Newer
Older
# -*- coding: utf-8 -*-
# Copyright(C) 2012 Romain Bignon
#
# This file is part of a weboob module.
# This weboob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This weboob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this weboob module. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from decimal import Decimal
import re
from io import BytesIO
from PIL import Image, ImageFilter
from weboob.browser.elements import method, DictElement, ItemElement
from weboob.browser.filters.standard import CleanText, CleanDecimal, Regexp, Eval, Date, Field
from weboob.browser.filters.html import Attr, Link, AttributeNotFound
from weboob.browser.filters.json import Dict
Jerome Berthier
committed
from weboob.exceptions import BrowserUnavailable, BrowserIncorrectPassword, ActionNeeded, BrowserPasswordExpired
from weboob.browser.pages import HTMLPage, LoggedPage, FormNotFound, JsonPage, RawPage, XMLPage
from weboob.capabilities.bank import Account, Investment
from weboob.capabilities.profile import Person
from weboob.capabilities.contact import Advisor
from weboob.capabilities import NotAvailable
from weboob.tools.capabilities.bank.transactions import FrenchTransaction
from weboob.tools.captcha.virtkeyboard import SplitKeyboard
from weboob.tools.decorators import retry
from weboob.tools.compat import urlsplit, parse_qsl
from weboob.tools.json import json
from weboob.tools.misc import to_unicode
from weboob.tools.pdf import get_pdf_rows
class LoggedOut(Exception):
pass
class BrokenPageError(Exception):
pass
assert isinstance(key, bytes)
self.state = list(range(256))
self.x = self.y = 0
if key is not None:
self.init(key)
@staticmethod
def ord(i):
if sys.version_info.major < 3:
return ord(i)
return i
@staticmethod
def chr(i):
if sys.version_info.major < 3:
return chr(i)
return bytes([i])
def init(self, key):
for i in range(256):
self.x = (self.ord(key[i % len(key)]) + self.state[i] + self.x) & 0xFF
self.state[i], self.state[self.x] = self.state[self.x], self.state[i]
self.x = 0
def crypt(self, input):
output = [None]*len(input)
self.x = (self.x + 1) & 0xFF
self.y = (self.state[self.x] + self.y) & 0xFF
self.state[self.x], self.state[self.y] = self.state[self.y], self.state[self.x]
output[i] = self.chr((self.ord(input[i]) ^ self.state[(self.state[self.x] + self.state[self.y]) & 0xFF]))
return b''.join(output)
class BasePage(object):
ENCODING = 'iso-8859-15'
token = Attr('//form//input[@name="token"]', 'value', default=NotAvailable)(self.doc)
try:
token = Regexp(Attr('//body', 'onload'), "saveToken\('(.*?)'")(self.doc)
except AttributeNotFound:
self.logger.warning('Unable to update token.')
token = self.get_token()
if token:
self.browser.token = token
self.logger.debug('Update token to %s', self.browser.token)
for script in self.doc.xpath('//script'):
if script.text is not None and \
(u"Le service est momentanément indisponible" in script.text or
Quentin Defenouillere
committed
u"Le service est temporairement indisponible" in script.text or
u"Votre abonnement ne vous permet pas d'accéder à ces services" in script.text or
u'Merci de bien vouloir nous en excuser' in script.text):
return True
def build_token(self, token):
"""
Each time there is a call to SAB (selectActionButton), the token
available in the form is modified with a key available in JS:
ipsff(function(){TW().ipthk([12, 25, 17, 5, 23, 26, 15, 30, 6]);});
Each value of the array is an index for the current token to append the
char at this position at the end of the token.
"""
table = None
for script in self.doc.xpath('//script'):
if script.text is None:
continue
m = re.search(r'ipthk\(([^\)]+)\)', script.text, flags=re.MULTILINE)
if m:
table = json.loads(m.group(1))
if table is None:
return token
for i in table:
token += token[i]
return token
def get_params(self):
params = {}
for field in self.doc.xpath('//input'):
params[field.attrib['name']] = field.attrib.get('value', '')
return params
def get_button_actions(self):
actions = {}
for script in self.doc.xpath('//script'):
if script.text is None:
continue
for id, action, strategy in re.findall(r'''attEvt\(window,"(?P<id>[^"]+)","click","sab\('(?P<action>[^']+)','(?P<strategy>[^']+)'\);"''', script.text, re.MULTILINE):
actions[id] = {'dialogActionPerformed': action,
'validationStrategy': strategy,
}
return actions
class MyHTMLPage(BasePage, HTMLPage):
def build_doc(self, data, *args, **kwargs):
# XXX FUCKING HACK BECAUSE BANQUE POPULAIRE ARE NASTY AND INCLUDE NULL
# BYTES IN DOCUMENTS.
data = data.replace(b'\x00', b'')
return super(MyHTMLPage, self).build_doc(data, *args, **kwargs)
class RedirectPage(LoggedPage, MyHTMLPage):
ENCODING = None
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
var i = 'lyhrnu551jo42yfzx0jm0sqk';
setCookie('i', i);
var welcomeMessage = decodeURI('M MACHIN');
var lastConnectionDate = decodeURI('17 Mai 2013');
var lastConnectionTime = decodeURI('14h27');
var userId = '12345678';
var userCat = '1';
setCookie('uwm', $.rc4EncryptStr(welcomeMessage, i));
setCookie('ulcd', $.rc4EncryptStr(lastConnectionDate, i));
setCookie('ulct', $.rc4EncryptStr(lastConnectionTime, i));
setCookie('uid', $.rc4EncryptStr(userId, i));
setCookie('uc', $.rc4EncryptStr(userCat, i));
var agentCivility = 'Mlle';
var agentFirstName = decodeURI('Jeanne');
var agentLastName = decodeURI('Machin');
var agentMail = decodeURI('gary@example.org');
setCookie('ac', $.rc4EncryptStr(agentCivility, i));
setCookie('afn', $.rc4EncryptStr(agentFirstName, i));
setCookie('aln', $.rc4EncryptStr(agentLastName, i));
setCookie('am', $.rc4EncryptStr(agentMail, i));
var agencyLabel = decodeURI('DTC');
var agencyPhoneNumber = decodeURI('0123456789');
setCookie('al', $.rc4EncryptStr(agencyLabel, i));
setCookie('apn', $.rc4EncryptStr(agencyPhoneNumber, i));
Note: that cookies are useless to login on website
"""
def add_cookie(self, name, value):
# httplib/cookielib don't seem to like unicode cookies...
if sys.version_info.major < 3:
name = to_unicode(name).encode('utf-8')
value = to_unicode(value).encode('utf-8')
self.browser.logger.debug('adding cookie %r=%r', name, value)
self.browser.session.cookies.set(name, value, domain=urlsplit(self.url).hostname)
redirect_url = None
args = {}
RC4 = None
for script in self.doc.xpath('//script'):
if script.text is None:
continue
m = re.search('window.location=\'([^\']+)\'', script.text, flags=re.MULTILINE)
if m:
redirect_url = m.group(1)
for line in script.text.split('\r\n'):
m = re.match("^var (\w+) ?= ?[^']*'([^']*)'.*", line)
if m:
args[m.group(1)] = m.group(2)
m = re.match("^setCookie\('([^']+)', (\w+)\);", line)
if m:
self.add_cookie(m.group(1), args[m.group(2)])
m = re.match("^setCookie\('([^']+)', .*rc4EncryptStr\((\w+), \w+\)", line)
if m:
enc = RC4.crypt(args[m.group(2)].encode('ascii'))
self.add_cookie(m.group(1), hexlify(enc).decode('ascii'))
if RC4 is None and 'i' in args:
RC4 = WikipediaARC4(args['i'].encode('ascii'))
if redirect_url is not None:
url = self.browser.absurl(redirect_url)
headers = {'Referer': self.url}
self.browser.logger.debug('redir...')
self.browser.location(url, headers=headers)
form = self.get_form(name="CyberIngtegrationPostForm")
except FormNotFound:
class ErrorPage(LoggedPage, MyHTMLPage):
def on_load(self):
if CleanText('//script[contains(text(), "momentanément indisponible")]')(self.doc):
raise BrowserUnavailable(u"Le service est momentanément indisponible")
elif CleanText('//h1[contains(text(), "Cette page est indisponible")]')(self.doc):
raise BrowserUnavailable('Cette page est indisponible')
return super(ErrorPage, self).on_load()
buf = self.doc.xpath('//body/@onload')[0]
except IndexError:
return
else:
m = re.search("saveToken\('([^']+)'\)", buf)
if m:
return m.group(1)
class UnavailablePage(LoggedPage, MyHTMLPage):
def on_load(self):
h1 = CleanText('//h1[1]')(self.doc)
if "est indisponible" in h1:
raise BrowserUnavailable(h1)
body = CleanText(".")(self.doc)
if "An unexpected error has occurred." in body or "Une erreur s'est produite" in body:
raise BrowserUnavailable(body)
a = Link('//a[@class="btn"][1]', default=None)(self.doc)
self.browser.location(a)
class LoginPage(MyHTMLPage):
def on_load(self):
h1 = CleanText('//h1[1]')(self.doc)
if h1.startswith('Le service est moment'):
text = CleanText('//h4[1]')(self.doc) or h1
raise BrowserUnavailable(text)
if not self.browser.no_login:
raise LoggedOut()
def login(self, login, passwd):
form = self.get_form(name='Login')
form['IDToken1'] = login.encode(self.ENCODING)
form['IDToken2'] = passwd.encode(self.ENCODING)
form.submit()
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class MyVirtKeyboard(SplitKeyboard):
char_to_hash = {
'0': '6a2cb38bcfc27781faaec727ad304ce2',
'1': '296140f37a22b5e2b4871272aed22444',
'2': 'c1318fd381665a97e1052f85213867a7',
'3': 'fe19d2cc8f8d09b818b05c2a10218233',
'4': 'd5a03e69857bf01fc373cedbe2530ca9',
'5': '289ae90e4adfa58ef4767d9151c96348',
'6': '88938bbbb6b81ee2a32568f7081be488',
'7': '96499777fb95974ee651f19181de6c01',
'8': '6e2e052c9301d1f381155912ad4d3874',
'9': '5958d54d88bfaa172305575164b39a8d',
}
codesep = ' '
def convert(self, buffer):
im = Image.open(BytesIO(buffer))
im = im.resize((5, 8), Image.BILINEAR)
im = im.filter(ImageFilter.UnsharpMask(radius=2,
percent=110,
threshold=3))
im = im.convert("P", dither=Image.NONE)
im = Image.eval(im, lambda x: 0 if x < 160 else 255)
s = BytesIO()
im.save(s, 'png')
return s.getvalue()
class Login2Page(LoginPage):
@property
def request_url(self):
transactionID = self.params['transactionID']
assert transactionID
return self.browser.redirect_url + transactionID
if not self.browser.no_login:
raise LoggedOut()
r = self.browser.open(self.request_url)
self.form_id, = [(k, v[0]['id'], v[0]['type']) for k, v in doc['step']['validationUnits'][0].items() if v[0]['type'] in ('PASSWORD_LOOKUP', 'IDENTIFIER')]
def virtualkeyboard(self, vk_obj, password):
imgs = {}
lst_img = self.browser.location(vk_obj['externalRestMediaApiUrl']).json()
for img_info in lst_img:
value = img_info['value']
url = img_info['uri']
resp = self.browser.location(url)
imgs[value] = resp.content
return MyVirtKeyboard(imgs).get_string_code(password)
def login(self, login, password):
payload = {
'validate': {
self.form_id[0]: [ {
'id': self.form_id[1],
'login': login.upper(),
'password': password,
'type': 'PASSWORD_LOOKUP',
} ]
}
}
url = self.request_url + '/step'
if self.form_id[2] == 'IDENTIFIER':
del payload['validate'][self.form_id[0]][0]['password']
payload['validate'][self.form_id[0]][0]['type'] = 'IDENTIFIER'
doc = self.browser.open(url, json=payload).json()
for k, v in doc['validationUnits'][0].items():
if v[0]['type'] in ('PASSWORD',):
form_id = (k, v[0]['id'], v[0]['type'])
if v[0].get('virtualKeyboard'):
Jerome Berthier
committed
if not password.isdigit():
# Users who get virtualkeyboard must change their passwords
# If there are letters in the password it means they did not do it.
raise BrowserPasswordExpired()
password = self.virtualkeyboard(vk_obj=v[0]['virtualKeyboard'],
Jerome Berthier
committed
password=password)
payload = {
'validate': {
form_id[0]: [{
'password': password,
'type': 'PASSWORD',
}]
}
}
r = self.browser.open(url, json=payload)
self.logger.debug('doc = %s', doc)
if 'phase' in doc and doc['phase']['state'] == 'TERMS_OF_USE':
# Got:
# {u'phase': {u'state': u'TERMS_OF_USE'}, u'validationUnits': [{u'LIST_OF_TERMS': [{u'type': u'TERMS', u'id': u'b7f28f91-7aa0-48aa-8028-deec13ae341b', u'reference': u'CGU_CYBERPLUS'}]}]}
if 'reference' in doc['validationUnits'][0]:
del doc['validationUnits'][0]['reference']
Etienne Lachere
committed
elif 'reference' in doc['validationUnits'][0]['LIST_OF_TERMS'][0]:
del doc['validationUnits'][0]['LIST_OF_TERMS'][0]['reference']
payload = {'validate': doc['validationUnits'][0]}
url = self.request_url + '/step'
r = self.browser.open(url, json=payload)
self.logger.debug('doc = %s', doc)
if 'phase' in doc and doc['phase']['state'] == "ENROLLMENT":
raise ActionNeeded()
if (('phase' in doc and doc['phase']['previousResult'] == 'FAILED_AUTHENTICATION') or
doc['response']['status'] != 'AUTHENTICATION_SUCCESS'):
raise BrowserIncorrectPassword()
data = {'SAMLResponse': doc['response']['saml2_post']['samlResponse']}
self.browser.location(doc['response']['saml2_post']['action'], data=data)
class AlreadyLoginPage(LoggedPage, MyHTMLPage):
def is_here(self):
try:
doc = json.loads(self.response.text)
if 'response' in doc:
return doc['response']['status'] == 'AUTHENTICATION_SUCCESS' and 'saml2_post' in doc['response']
# not a json page
# so it should be Login2Page
return False
return False
class IndexPage(LoggedPage, MyHTMLPage):
url = self.doc.xpath('//frame[@name="portalHeader"]')[0].attrib['src']
v = urlsplit(url)
args = dict(parse_qsl(v.query))
return args['token']
class HomePage(LoggedPage, MyHTMLPage):
# Sometimes, the page is empty but nothing is scrapped on it.
def build_doc(self, data, *args, **kwargs):
if not data:
return None
return super(MyHTMLPage, self).build_doc(data, *args, **kwargs)
@retry(KeyError)
# sometime the server redirects to a bad url, not containing token.
# therefore "return args['token']" crashes with a KeyError
if self.params.get('vary', None) is not None:
vary = self.params['vary']
for script in self.doc.xpath('//script'):
if script.text is None:
continue
m = re.search("'vary', '([\d-]+)'\)", script.text)
if m:
vary = m.group(1)
break
url = self.browser.absurl('/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx')
headers = {'Referer': self.url}
# Sometime, the page is a 302 and redirect to a page where there are no information that we need,
# so we try with 2 others url to further fetch token when empty page
r = self.browser.open(url, data='taskId=aUniversMesComptes', params={'vary': vary}, headers=headers)
if not int(r.headers.get('Content-Length', 0)):
r = self.browser.open(url, data='taskId=aUniversMesComptes', headers=headers)
if not int(r.headers.get('Content-Length', 0)):
r = self.browser.open(url, data={'taskId': 'equipementDom'}, params={'vary': vary}, headers=headers)
date = None
for script in doc.xpath('//script'):
if script.text is None:
continue
m = re.search('lastConnectionDate":"([^"]*)"', script.text)
if m:
date = m.group(1)
url = self.browser.absurl('/cyber/ibp/ate/portal/integratedInternet.jsp')
data = 'session%%3Aate.lastConnectionDate=%s&taskId=aUniversMesComptes' % date
headers = {'Referer': r.url}
r = self.browser.open(url, data=data, headers=headers)
v = urlsplit(r.url)
args = dict(parse_qsl(v.query))
return args['token']
class AccountsPage(LoggedPage, MyHTMLPage):
ACCOUNT_TYPES = {u'Mes comptes d\'épargne': Account.TYPE_SAVINGS,
u'Mon épargne': Account.TYPE_SAVINGS,
u'Placements': Account.TYPE_SAVINGS,
u'Liste complète de mon épargne': Account.TYPE_SAVINGS,
u'Mes comptes': Account.TYPE_CHECKING,
u'Comptes en euros': Account.TYPE_CHECKING,
u'Mes comptes en devises': Account.TYPE_CHECKING,
u'Liste complète de mes comptes': Account.TYPE_CHECKING,
u'Mes emprunts': Account.TYPE_LOAN,
u'Liste complète de mes emprunts': Account.TYPE_LOAN,
u'Financements': Account.TYPE_LOAN,
u'Liste complète de mes engagements': Account.TYPE_LOAN,
u'Mes services': None, # ignore this kind of accounts (no bank ones)
u'Équipements': None, # ignore this kind of accounts (no bank ones)
u'Synthèse': None, # ignore this title
PATTERN = [(re.compile('.*Titres Pea.*'), Account.TYPE_PEA),
(re.compile(".*Plan D'epargne En Actions.*"), Account.TYPE_PEA),
(re.compile(".*Compte Especes Pea.*"), Account.TYPE_PEA),
(re.compile('.*Plan Epargne Retraite.*'), Account.TYPE_PERP),
(re.compile('.*Titres.*'), Account.TYPE_MARKET),
(re.compile('.*Selection Vie.*'),Account.TYPE_LIFE_INSURANCE),
(re.compile('^Fructi Pulse.*'), Account.TYPE_MARKET),
(re.compile('^(Quintessa|Solevia).*'), Account.TYPE_MARKET),
(re.compile('^Plan Epargne Enfant Mul.*'), Account.TYPE_MARKET),
(re.compile('^Alc Premium'), Account.TYPE_MARKET),
(re.compile('^Plan Epargne Enfant Msu.*'), Account.TYPE_LIFE_INSURANCE),
def pop_up(self):
if self.doc.xpath('//span[contains(text(), "du navigateur Internet.")]'):
return True
return False
return len(self.doc.xpath('//script[contains(text(), "EQUIPEMENT_COMPLET")]')) > 0
COL_NUMBER = 0
COL_TYPE = 1
COL_LABEL = 2
COL_BALANCE = 3
COL_COMING = 4
def iter_accounts(self, next_pages):
account_type = Account.TYPE_UNKNOWN
params = self.get_params()
actions = self.get_button_actions()
for div in self.doc.xpath('//div[has-class("btit")]'):
if div.text in (None, u'Synthèse'):
account_type = self.ACCOUNT_TYPES.get(div.text.strip(), Account.TYPE_UNKNOWN)
if account_type is None:
# ignore services accounts
self.logger.debug('Ignore account type %s', div.text.strip())
# Go to the full list of this kind of account, if any.
btn = div.getparent().xpath('.//button[span[text()="Suite"]]')
if len(btn) > 0:
_params = params.copy()
_params.update(actions[btn[0].attrib['id']])
next_pages.append(_params)
continue
currency = None
for th in div.getnext().xpath('.//thead//th'):
m = re.match('.*\((\w+)\)$', th.text)
if m and currency is None:
currency = Account.get_currency(m.group(1))
for tr in div.getnext().xpath('.//tbody/tr'):
if 'id' not in tr.attrib:
args = dict(parse_qsl(tr.attrib['id']))
tds = tr.findall('td')
if len(tds) < 4 or 'identifiant' not in args:
self.logger.warning('Unable to parse an account')
continue
account.label = u' '.join([u''.join([txt.strip() for txt in tds[1].itertext()]),
u''.join([txt.strip() for txt in tds[2].itertext()])]).strip()
for pattern, _type in self.PATTERN:
match = pattern.match(account.label)
if match:
account.type = _type
break
else:
account.type = account_type
balance_text = u''.join([txt.strip() for txt in tds[3].itertext()])
balance = FrenchTransaction.clean_amount(balance_text)
account.balance = Decimal(balance or '0.0')
account.currency = currency or Account.get_currency(balance_text)
if account.type == account.TYPE_LOAN:
account.balance = - abs(account.balance)
account._prev_debit = None
account._next_debit = None
account._params = None
account._coming_params = None
if balance != u'' and len(tds[3].xpath('.//a')) > 0:
account._params = params.copy()
account._params['dialogActionPerformed'] = 'SOLDE'
account._params['attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]] = tr.attrib['id'].split('_', 1)[1]
if len(tds) >= 5 and len(tds[self.COL_COMING].xpath('.//a')) > 0:
_params = account._params.copy()
_params['dialogActionPerformed'] = 'ENCOURS_COMPTE'
# If there is an action needed before going to the cards page, save it.
m = re.search('dialogActionPerformed=([\w_]+)', self.url)
if m and m.group(1) != 'EQUIPEMENT_COMPLET':
_params['prevAction'] = m.group(1)
account._invest_params = params.copy()
account._invest_params['dialogActionPerformed'] = 'CONTRAT'
account._invest_params['attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]] = tr.attrib['id'].split('_', 1)[1]
# Needed to preserve navigation.
btn = self.doc.xpath('.//button[span[text()="Retour"]]')
if len(btn) > 0:
_params = params.copy()
_params.update(actions[btn[0].attrib['id']])
self.browser.open('/cyber/internet/ContinueTask.do', data=_params)
class AccountsFullPage(AccountsPage):
pass
class CardsPage(LoggedPage, MyHTMLPage):
COL_ID = 1
COL_TYPE = 2
COL_LABEL = 3
COL_DATE = 4
COL_AMOUNT = 5
def iter_accounts(self, next_pages):
params = self.get_params()
for th in self.doc.xpath('//table[@id="tbl1"]//thead//th'):
m = re.match('.*\((\w+)\)$', th.text)
if m and currency is None:
currency = Account.get_currency(m.group(1))
if currency is None:
currency = Account.get_currency(CleanText('//td[@id="tbl1_0_5_Cell"]//span')(self.doc))
for tr in self.doc.xpath('//table[@id="tbl1"]/tbody/tr'):
if len(cols) == 1 and CleanText('.')(cols[0]) == 'pas de carte':
self.logger.debug('there are no cards on this page')
continue
id = CleanText(None).filter(cols[self.COL_ID])
if len(id) > 0:
if account is not None:
yield account
account = Account()
account.type = Account.TYPE_CARD
account.balance = account.coming = Decimal('0')
account._next_debit = datetime.date.today()
account._prev_debit = datetime.date(2000,1,1)
account.label = u' '.join([CleanText(None).filter(cols[self.COL_TYPE]),
CleanText(None).filter(cols[self.COL_LABEL])])
account._params = None
account._coming_params = params.copy()
account._coming_params['dialogActionPerformed'] = 'SELECTION_ENCOURS_CARTE'
account._coming_params['attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]] = tr.attrib['id'].split('_', 1)[1]
account._coming_count = len(self.doc.xpath('//table[@id="tbl1"]/tbody/tr/td[5]/span[not(contains(text(), "(1)"))]'))
elif account is None:
raise BrokenPageError('Unable to find accounts on cards page')
else:
account._params = params.copy()
account._params['dialogActionPerformed'] = 'SELECTION_ENCOURS_CARTE'
account._params['attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]] = tr.attrib['id'].split('_', 1)[1]
date_col = CleanText(None).filter(cols[self.COL_DATE])
m = re.search('(\d+)/(\d+)/(\d+)', date_col)
if not m:
self.logger.warning('Unable to parse date %r' % date_col)
continue
date = datetime.date(*[int(c) for c in m.groups()][::-1])
if date.year < 100:
date = date.replace(year=date.year+2000)
amount = Decimal(FrenchTransaction.clean_amount(CleanText(None).filter(cols[self.COL_AMOUNT])))
if not date_col.endswith('(1)'):
# debited
account.coming += - abs(amount)
account._next_debit = date
elif date > account._prev_debit:
account._prev_balance = - abs(amount)
account._prev_debit = date
if account is not None:
yield account
# Needed to preserve navigation.
btn = self.doc.xpath('.//button[span[text()="Retour"]]')
if len(btn) > 0:
actions = self.get_button_actions()
_params = params.copy()
_params.update(actions[btn[0].attrib['id']])
self.browser.open('/cyber/internet/ContinueTask.do', data=_params)
class Transaction(FrenchTransaction):
PATTERNS = [(re.compile('^RET DAB (?P<text>.*?) RETRAIT (DU|LE) (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d+).*'),
FrenchTransaction.TYPE_WITHDRAWAL),
(re.compile('^RET DAB (?P<text>.*?) CARTE ?:.*'),
FrenchTransaction.TYPE_WITHDRAWAL),
(re.compile('^(?P<text>.*) RETRAIT DU (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2}) .*'),
FrenchTransaction.TYPE_WITHDRAWAL),
(re.compile('^(RETRAIT CARTE )?RET(RAIT)? DAB (?P<text>.*)'),
FrenchTransaction.TYPE_WITHDRAWAL),
(re.compile('((\w+) )?(?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2}) CB[:\*][^ ]+ (?P<text>.*)'),
FrenchTransaction.TYPE_CARD),
(re.compile('^VIR(EMENT)? (?P<text>.*)'), FrenchTransaction.TYPE_TRANSFER),
(re.compile('^(PRLV|PRELEVEMENT) (?P<text>.*)'),
FrenchTransaction.TYPE_ORDER),
(re.compile('^(?P<text>CHEQUE .*)'), FrenchTransaction.TYPE_CHECK),
(re.compile('^(AGIOS /|FRAIS) (?P<text>.*)', re.IGNORECASE),
FrenchTransaction.TYPE_BANK),
(re.compile('^(CONVENTION \d+ )?COTIS(ATION)? (?P<text>.*)', re.IGNORECASE),
FrenchTransaction.TYPE_BANK),
(re.compile('^REMISE (?P<text>.*)'), FrenchTransaction.TYPE_DEPOSIT),
(re.compile('^(?P<text>ECHEANCE PRET .*)'), FrenchTransaction.TYPE_LOAN_PAYMENT),
(re.compile('^(?P<text>.*)( \d+)? QUITTANCE .*'),
FrenchTransaction.TYPE_ORDER),
(re.compile('^.* LE (?P<dd>\d{2})/(?P<mm>\d{2})/(?P<yy>\d{2})$'),
FrenchTransaction.TYPE_UNKNOWN),
(re.compile(r'^RELEVE CARTE'), FrenchTransaction.TYPE_CARD_SUMMARY),
class TransactionsPage(LoggedPage, MyHTMLPage):
nxt = self.doc.xpath('//li[contains(@id, "_nxt")]')
if len(nxt) == 0 or nxt[0].attrib.get('class', '') == 'nxt-dis':
return None
params = {}
for field in self.doc.xpath('//input'):
params[field.attrib['name']] = field.attrib.get('value', '')
params['validationStrategy'] = 'NV'
params['pagingDirection'] = 'NEXT'
params['pagerName'] = nxt[0].attrib['id'].split('_', 1)[0]
if len(self.doc.xpath('//table[@id="tbl1"]')) > 0:
if len(self.doc.xpath('//table[@id="TabFact"]')) > 0:
Romain Bignon
committed
raise NotImplementedError('Unable to find what kind of history it is.')
COL_COMPTA_DATE = 0
COL_LABEL = 1
COL_REF = 2 # optional
COL_OP_DATE = -4
COL_VALUE_DATE = -3
COL_DEBIT = -2
COL_CREDIT = -1
# Keep track of the order in the transaction list, so details can be retrieve
# Because each transaction row has a different id
# in the html page for each request of the TransactionPage
for tr in self.doc.xpath('//table[@id="tbl1"]/tbody/tr'):
if len(tds) < 5:
continue
# get the column index of the link to access transaction details
# (only used for GoCardLess transactions so far)
Quentin Defenouillere
committed
t._has_link = bool(tds[self.COL_DEBIT].findall('a') or tds[self.COL_CREDIT].findall('a'))
# XXX We currently take the *value* date, but it will probably
# necessary to use the *operation* one.
# Default sort on website is by compta date, so in browser.py we
# change the sort on value date.
cleaner = CleanText(None).filter
date = cleaner(tds[self.COL_OP_DATE])
vdate = cleaner(tds[self.COL_VALUE_DATE])
raw = cleaner(tds[self.COL_LABEL])
debit = cleaner(tds[self.COL_DEBIT])
credit = cleaner(tds[self.COL_CREDIT])
t._amount_type = 'debit' if t.amount == debit else 'credit'
# Strip the balance displayed in transaction labels
t.label = re.sub('solde en valeur : .*', '', t.label)
t.raw = re.sub('solde en valeur : .*', '', t.raw)
# XXX Fucking hack to include the check number not displayed in the full label.
if re.match("^CHEQUE |^CHQ VOTRE CHEQUE", t.label):
t.raw = '%s No: %s' % (t.raw, cleaner(tds[self.COL_REF]))
# In rare cases, label is empty ..
if not t.label:
t.label = cleaner(tds[self.COL_REF])
# To be able to find by ref on the transaction page
t._ref = cleaner(tds[self.COL_REF])
COL_CARD_DATE = 0
COL_CARD_LABEL = 1
COL_CARD_AMOUNT = 2
def get_card_history(self, account, coming):
if coming:
debit_date = account._next_debit
elif not hasattr(account, '_prev_balance'):
return
else:
debit_date = account._prev_debit
if 'ContinueTask.do' in self.url:
t.parse(debit_date, 'RELEVE CARTE')
t.amount = -account._prev_balance
yield t
currency = Account.get_currency(self.doc\
.xpath('//table[@id="TabFact"]/thead//th')[self.COL_CARD_AMOUNT]\
.text\
.replace('(', ' ')\
.replace(')', ' '))
for i, tr in enumerate(self.doc.xpath('//table[@id="TabFact"]/tbody/tr')):
tds = tr.findall('td')
if len(tds) < 3:
continue
cleaner = CleanText(None).filter
date = cleaner(tds[self.COL_CARD_DATE])
label = cleaner(tds[self.COL_CARD_LABEL])
amount = '-' + cleaner(tds[self.COL_CARD_AMOUNT])
t.parse(debit_date, re.sub(r'[ ]+', ' ', label))
t.set_amount(amount)
t.rdate = t.parse_date(date)
if not t.type:
t.type = Transaction.TYPE_DEFERRED_CARD
if len(self.doc.xpath('//table[@id="tbl1" or @id="TabFact"]//td[@colspan]')) > 0:
if len(self.doc.xpath(u'//div[contains(text(), "Accès à LineBourse")]')) > 0:
script = self.doc.xpath('//body')[0].attrib['onload']
m = re.search(r"','([^']+?)',\[", script, re.MULTILINE)
params = {}
for key, value in re.findall(r"key:'(?P<key>SJRToken)'\,value:'(?P<value>.*?)'}", script, re.MULTILINE):
params[key] = value
return url, params if url and params else None
def get_transaction_table_id(self, ref):
tr = self.doc.xpath('//table[@id="tbl1"]/tbody/tr[.//span[contains(text(), "%s")]]' % ref)[0]
key = 'attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]
value = tr.attrib['id'].split('_', 1)[1]
return key, value
def get_gocardless_strategy_param(self, transaction):
# A form is filled and send with javascript
# the 'validationStrategy' parameter value only depends on the column
# index in which the link lies
#
# To get more details about how things are done, see the following javascript functions:
#- attachTableRowEvents (atre)
#- attachActiveSelectionEventsOnRow
#- astr
#- updateSelection (uds)
#- selectActionButton (sab)
#- a script element embedded in the html page (search for "tcl5", "tcl6")
assert transaction._has_link
if transaction._amount_type == 'debit':
return 'AV'
elif transaction._amount_type == 'credit':
return 'NV'
class NatixisChoicePage(LoggedPage, HTMLPage):
def on_load(self):
message = CleanText('//span[@class="rf-msgs-sum"]', default='')(self.doc)
if re.search(r"Le service de consultation de votre contrat \w+ est momentanément indisponible.", message):
raise BrowserUnavailable()
# TODO handle when there are multiple accounts on this page
account_tr, = self.doc.xpath('//tbody[@id="list:dataVie:tb"]/tr')
self.logger.info('opening automatically account %s', CleanText('./td[1]')(account_tr))
self.browser.location(Link('./td[1]/a')(account_tr))
class NatixisPage(LoggedPage, HTMLPage):
form = self.get_form(name="formRoutage")
form['javax.faces.source'] = 'formRoutageButton'
form['javax.faces.partial.execute'] = 'formRoutageButton @component'
form['javax.faces.partial.render'] = '@component'
form['AJAX:EVENTS_COUNT'] = '1'
form['javax.faces.partial.ajax'] = 'true'
form['javax.faces.partial.event'] = 'click'
form['org.richfaces.ajax.component'] = 'formRoutageButton'
form['formRoutageButton'] = 'formRoutageButton'
form.request.headers['Faces-Request'] = 'partial/ajax'
class TransactionsBackPage(TransactionsPage):
def is_here(self):
return self.doc.xpath('//div[text()="Liste des écritures"]')
class NatixisRedirect(LoggedPage, XMLPage):
def get_redirect(self):
url = self.doc.xpath('/partial-response/redirect/@url')[0]
return url.replace('http://', 'https://') # why do they use http on a bank site???
class NatixisErrorPage(LoggedPage, HTMLPage):
class IbanPage(LoggedPage, MyHTMLPage):
def need_to_go(self):
return len(self.doc.xpath('//div[@class="grid"]/div/span[contains(text(), "IBAN")]')) == 0
def go_iban(self, account):
for tr in self.doc.xpath('//table[@id]/tbody/tr'):
if account.type not in (Account.TYPE_LOAN, Account.TYPE_MARKET) and CleanText().filter(tr.xpath('./td[1]')) in account.id:
form = self.get_form(id='myForm')
form['token'] = self.build_token(form['token'])
form['dialogActionPerformed'] = "DETAIL_IBAN_RIB"
tr_id = Attr(None, 'id').filter(tr.xpath('.')).split('_')
form[u'attribute($SEL_$%s)' % tr_id[0]] = tr_id[1]
form.submit()