# -*- coding: utf-8 -*-
# Copyright(C) 2012 Romain Bignon
#
# This file is part of a weboob module.
#
# This weboob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This weboob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this weboob module. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from binascii import hexlify
import datetime
from decimal import Decimal
import re
import sys
from io import BytesIO
from PIL import Image, ImageFilter
from weboob.browser.elements import method, DictElement, ItemElement
from weboob.browser.filters.standard import CleanText, CleanDecimal, Regexp, Eval, Date, Field
from weboob.browser.filters.html import Attr, Link, AttributeNotFound
from weboob.browser.filters.json import Dict
from weboob.exceptions import BrowserUnavailable, BrowserIncorrectPassword, ActionNeeded
from weboob.browser.pages import HTMLPage, LoggedPage, FormNotFound, JsonPage, RawPage, XMLPage
from weboob.capabilities.bank import Account, Investment
from weboob.capabilities.profile import Person
from weboob.capabilities.contact import Advisor
from weboob.capabilities import NotAvailable
from weboob.tools.capabilities.bank.transactions import FrenchTransaction
from weboob.tools.captcha.virtkeyboard import SplitKeyboard
from weboob.tools.decorators import retry
from weboob.tools.compat import urlsplit, parse_qsl
from weboob.tools.json import json
from weboob.tools.misc import to_unicode
from weboob.tools.pdf import get_pdf_rows
class LoggedOut(Exception):
    """Raised by the login pages when they are loaded while a session was expected."""
class BrokenPageError(Exception):
    """Raised when a page does not have the structure the scraper expects."""
class WikipediaARC4(object):
    """RC4 stream cipher working on byte strings.

    ``state`` is the 256-entry permutation and ``x``/``y`` are the two
    running indices.  The same keystream is used to encrypt and decrypt, so
    a fresh instance built with the same key reverses ``crypt()``.
    """

    def __init__(self, key=None):
        """Create the cipher and, when *key* is given, run the key schedule.

        :param key: key as ``bytes``, or None to build an unkeyed instance
                    (call :meth:`init` later).
        :raises TypeError: if *key* is neither None nor ``bytes``.
        """
        # Validate explicitly: an ``assert`` would reject the documented
        # ``None`` default and would be stripped under ``python -O``.
        if key is not None and not isinstance(key, bytes):
            raise TypeError('key must be bytes, not %s' % type(key).__name__)

        self.state = list(range(256))
        self.x = self.y = 0

        if key is not None:
            self.init(key)

    @staticmethod
    def ord(i):
        """Return the integer value of one element of a byte string.

        Iterating ``bytes`` yields 1-char strings on Python 2 and ints on
        Python 3.
        """
        if sys.version_info.major < 3:
            return ord(i)
        return i

    @staticmethod
    def chr(i):
        """Return the 1-byte string whose value is the integer *i*."""
        if sys.version_info.major < 3:
            return chr(i)
        return bytes([i])

    def init(self, key):
        """Key-scheduling: shuffle ``state`` according to *key* (bytes)."""
        key_length = len(key)
        for i in range(256):
            self.x = (self.ord(key[i % key_length]) + self.state[i] + self.x) & 0xFF
            self.state[i], self.state[self.x] = self.state[self.x], self.state[i]
        # ``x`` was only used as a scratch index during the schedule.
        self.x = 0

    def crypt(self, input):
        """XOR *input* (bytes) with the keystream and return the result as bytes.

        The internal state advances, so successive calls continue the same
        keystream.
        """
        output = []
        for char in input:
            self.x = (self.x + 1) & 0xFF
            self.y = (self.state[self.x] + self.y) & 0xFF
            self.state[self.x], self.state[self.y] = self.state[self.y], self.state[self.x]
            keystream_byte = self.state[(self.state[self.x] + self.state[self.y]) & 0xFF]
            output.append(self.chr(self.ord(char) ^ keystream_byte))
        return b''.join(output)
class BasePage(object):
    """Mixin with the token and form helpers shared by the site's HTML pages."""

    ENCODING = 'iso-8859-15'

    def get_token(self):
        """Return the navigation token found in the page.

        The ``token`` form input is tried first, then the ``saveToken('...')``
        call in the ``onload`` attribute of ``<body>``.  When neither is
        available a warning is logged and the not-available value is returned.
        """
        token = Attr('//form//input[@name="token"]', 'value', default=NotAvailable)(self.doc)
        if not token:
            try:
                token = Regexp(Attr('//body', 'onload'), r"saveToken\('(.*?)'")(self.doc)
            except AttributeNotFound:
                self.logger.warning('Unable to update token.')
        return token

    def on_load(self):
        """Store the page token on the browser when the page provides one."""
        token = self.get_token()
        if token:
            self.browser.token = token
            self.logger.debug('Update token to %s', self.browser.token)

    def is_error(self):
        """Return True when a ``<script>`` contains one of the known error messages."""
        messages = (
            u"Le service est momentanément indisponible",
            u"Le service est temporairement indisponible",
            u"Votre abonnement ne vous permet pas d'accéder à ces services",
            u'Merci de bien vouloir nous en excuser',
        )
        for script in self.doc.xpath('//script'):
            if script.text is None:
                continue
            if any(message in script.text for message in messages):
                return True
        return False

    def build_token(self, token):
        """
        Each time there is a call to SAB (selectActionButton), the token
        available in the form is modified with a key available in JS:

        ipsff(function(){TW().ipthk([12, 25, 17, 5, 23, 26, 15, 30, 6]);});

        Each value of the array is an index for the current token to append the
        char at this position at the end of the token.

        When no such array is found in the page scripts, *token* is returned
        unchanged.  If several scripts contain one, the last one wins.
        """
        table = None
        for script in self.doc.xpath('//script'):
            if script.text is None:
                continue
            m = re.search(r'ipthk\(([^\)]+)\)', script.text, flags=re.MULTILINE)
            if m:
                table = json.loads(m.group(1))
        if table is None:
            return token
        for i in table:
            # The index refers to the token as it currently is, appended
            # characters included.
            token += token[i]
        return token

    def get_params(self):
        """Return a dict mapping every ``<input>`` name to its value ('' when absent)."""
        params = {}
        for field in self.doc.xpath('//input'):
            params[field.attrib['name']] = field.attrib.get('value', '')
        return params

    def get_button_actions(self):
        """Return the click actions registered in the page scripts.

        The result maps a button id to a dict with the
        ``dialogActionPerformed`` and ``validationStrategy`` form values taken
        from the ``attEvt(window,"<id>","click","sab('<action>','<strategy>');"``
        calls.
        """
        actions = {}
        pattern = r'''attEvt\(window,"(?P<id>[^"]+)","click","sab\('(?P<action>[^']+)','(?P<strategy>[^']+)'\);"'''
        for script in self.doc.xpath('//script'):
            if script.text is None:
                continue
            for button_id, action, strategy in re.findall(pattern, script.text, re.MULTILINE):
                actions[button_id] = {'dialogActionPerformed': action,
                                      'validationStrategy': strategy,
                                     }
        return actions

    def get_back_button_params(self, params=None, actions=None):
        """Return the form parameters needed to click the "Retour" button.

        :param params: form parameters to start from (defaults to
                       :meth:`get_params`); the dict is updated in place.
        :param actions: button actions (defaults to :meth:`get_button_actions`).
        :returns: the updated parameters, or None when the page has no
                  "Retour" button.
        """
        btn = self.doc.xpath('.//button[span[text()="Retour"]]')
        if not btn:
            return None

        params = params or self.get_params()
        actions = actions or self.get_button_actions()
        key = btn[0].attrib['id']
        assert actions.get(key), "Key %s not found in actions %s" % (key, actions)  # Currently it never happens
        params.update(actions[key])
        params['token'] = self.build_token(params['token'])
        return params
class MyHTMLPage(BasePage, HTMLPage):
    """HTML page that strips NUL bytes from the raw data before parsing it."""

    def build_doc(self, data, *args, **kwargs):
        # The site sometimes embeds NUL bytes in its documents; drop them
        # before handing the data over to the regular parser.
        cleaned = data.replace(b'\x00', b'')
        parent = super(MyHTMLPage, self)
        return parent.build_doc(cleaned, *args, **kwargs)
class RedirectPage(LoggedPage, MyHTMLPage):
    """Intermediate page whose javascript sets cookies and then redirects.

    Example of the script being replayed::

        var i = 'lyhrnu551jo42yfzx0jm0sqk';
        setCookie('i', i);
        var welcomeMessage = decodeURI('M MACHIN');
        var lastConnectionDate = decodeURI('17 Mai 2013');
        var lastConnectionTime = decodeURI('14h27');
        var userId = '12345678';
        var userCat = '1';
        setCookie('uwm', $.rc4EncryptStr(welcomeMessage, i));
        setCookie('ulcd', $.rc4EncryptStr(lastConnectionDate, i));
        setCookie('ulct', $.rc4EncryptStr(lastConnectionTime, i));
        setCookie('uid', $.rc4EncryptStr(userId, i));
        setCookie('uc', $.rc4EncryptStr(userCat, i));
        var agentCivility = 'Mlle';
        var agentFirstName = decodeURI('Jeanne');
        var agentLastName = decodeURI('Machin');
        var agentMail = decodeURI('gary@example.org');
        setCookie('ac', $.rc4EncryptStr(agentCivility, i));
        setCookie('afn', $.rc4EncryptStr(agentFirstName, i));
        setCookie('aln', $.rc4EncryptStr(agentLastName, i));
        setCookie('am', $.rc4EncryptStr(agentMail, i));
        var agencyLabel = decodeURI('DTC');
        var agencyPhoneNumber = decodeURI('0123456789');
        setCookie('al', $.rc4EncryptStr(agencyLabel, i));
        setCookie('apn', $.rc4EncryptStr(agencyPhoneNumber, i));

    Note: that cookies are useless to login on website
    """

    ENCODING = None

    def add_cookie(self, name, value):
        """Set cookie *name* to *value* on the browser session for this page's host."""
        # httplib/cookielib don't seem to like unicode cookies...
        if sys.version_info.major < 3:
            name = to_unicode(name).encode('utf-8')
            value = to_unicode(value).encode('utf-8')
        self.browser.logger.debug('adding cookie %r=%r', name, value)
        self.browser.session.cookies.set(name, value, domain=urlsplit(self.url).hostname)

    def on_load(self):
        """Replay the page scripts, then follow the redirection or submit the form.

        For each script: remember ``window.location='...'`` as the redirect
        target, record ``var name = '...'`` assignments, and reproduce the
        ``setCookie`` calls (plain ones and RC4-encrypted ones, keyed by the
        ``i`` variable).
        """
        redirect_url = None
        args = {}
        RC4 = None
        for script in self.doc.xpath('//script'):
            if script.text is None:
                continue

            m = re.search(r"window.location='([^']+)'", script.text, flags=re.MULTILINE)
            if m:
                redirect_url = m.group(1)

            for line in script.text.split('\r\n'):
                m = re.match(r"^var (\w+) ?= ?[^']*'([^']*)'.*", line)
                if m:
                    args[m.group(1)] = m.group(2)

                m = re.match(r"^setCookie\('([^']+)', (\w+)\);", line)
                if m:
                    self.add_cookie(m.group(1), args[m.group(2)])

                m = re.match(r"^setCookie\('([^']+)', .*rc4EncryptStr\((\w+), \w+\)", line)
                if m:
                    if RC4 is None:
                        # The key variable ``i`` has not been seen yet, so the
                        # value cannot be encrypted; skip the cookie instead
                        # of failing on ``None.crypt``.
                        self.browser.logger.warning('no RC4 key yet, skipping cookie %r', m.group(1))
                    else:
                        enc = RC4.crypt(args[m.group(2)].encode('ascii'))
                        self.add_cookie(m.group(1), hexlify(enc).decode('ascii'))

                # The cipher is created once, as soon as the key is known, and
                # its keystream is shared by all following encrypted cookies.
                if RC4 is None and 'i' in args:
                    RC4 = WikipediaARC4(args['i'].encode('ascii'))

        if redirect_url is not None:
            url = self.browser.absurl(redirect_url)
            headers = {'Referer': self.url}
            self.browser.logger.debug('redir...')
            self.browser.location(url, headers=headers)

        try:
            form = self.get_form(name="CyberIngtegrationPostForm")
        except FormNotFound:
            # No integration form on this page: nothing more to do.
            pass
        else:
            form.submit()
class ErrorPage(LoggedPage, MyHTMLPage):
    """Page that may carry an "unavailable" message instead of real content."""

    def on_load(self):
        """Raise BrowserUnavailable for known outage messages, else load as usual."""
        if CleanText('//script[contains(text(), "momentanément indisponible")]')(self.doc):
            raise BrowserUnavailable(u"Le service est momentanément indisponible")
        elif CleanText('//h1[contains(text(), "Cette page est indisponible")]')(self.doc):
            raise BrowserUnavailable('Cette page est indisponible')
        return super(ErrorPage, self).on_load()

    def get_token(self):
        """Return the token passed to ``saveToken('...')`` in ``<body onload>``.

        Returns None when the body has no ``onload`` attribute or when the
        attribute holds no ``saveToken`` call.
        """
        try:
            buf = self.doc.xpath('//body/@onload')[0]
        except IndexError:
            return None

        m = re.search(r"saveToken\('([^']+)'\)", buf)
        if m:
            return m.group(1)
        return None
class UnavailablePage(LoggedPage, MyHTMLPage):
    """Page shown when the service is down or when an error occurred."""

    def on_load(self):
        """Raise BrowserUnavailable unless the page offers a "btn" link to follow."""
        title = CleanText('//h1[1]')(self.doc)
        if "est indisponible" in title:
            raise BrowserUnavailable(title)

        page_text = CleanText(".")(self.doc)
        error_markers = ("An unexpected error has occurred.", "Une erreur s'est produite")
        if any(marker in page_text for marker in error_markers):
            raise BrowserUnavailable(page_text)

        target = Link('//a[@class="btn"][1]', default=None)(self.doc)
        if not target:
            raise BrowserUnavailable()
        self.browser.location(target)
class LoginPage(MyHTMLPage):
    """Login form page."""

    def on_load(self):
        """Report an outage, or signal a lost session when no login is in progress."""
        title = CleanText('//h1[1]')(self.doc)

        if title.startswith('Le service est moment'):
            # Prefer the detailed message when the page has one.
            details = CleanText('//h4[1]')(self.doc)
            raise BrowserUnavailable(details or title)

        if self.browser.no_login:
            return
        raise LoggedOut()

    def login(self, login, passwd):
        """Fill the 'Login' form with the credentials and submit it."""
        login_form = self.get_form(name='Login')
        credentials = (('IDToken1', login), ('IDToken2', passwd))
        for field, value in credentials:
            login_form[field] = value.encode(self.ENCODING)
        login_form.submit()
class MyVirtKeyboard(SplitKeyboard):
    """Virtual keyboard whose key images are normalised before being hashed."""

    # Digit -> hash(es) of the normalised key image.
    char_to_hash = {
        '0': 'cce0f72c47c74a3dde57c4fdbcda1db4',
        '1': 'f5d22afb3ece4dec73bd8a2a4c2844da',
        '2': '6d3e5db2ccac3f2c13c1f0ba22571857',
        '3': ('c8e4f6addac4d322f0f9668d472a146c', '34d0566ea3f2330c675365da3178f6ab'),
        '4': '8a8c769418ec829c208ed442fbf5fe77',
        '5': '2c3ae480bc91f73b431b048b584026c7',
        '6': 'a80d639443818e838b434c36dd518df5',
        '7': '8e59048702e4c5f89bbbc1a598d06d1e',
        '8': '46bc59a5b288c63477ff52811a3961c5',
        '9': 'a7bf34568154ef91e990aa5bade3e946',
    }

    codesep = ' '

    def convert(self, buffer):
        """Return *buffer* (image bytes) as a 5x8 black-and-white PNG (bytes).

        The image is shrunk, sharpened, converted to greyscale and finally
        thresholded at 160.
        """
        picture = Image.open(BytesIO(buffer))
        picture = picture.resize((5, 8), Image.BICUBIC)
        sharpen = ImageFilter.UnsharpMask(radius=2, percent=110, threshold=3)
        picture = picture.filter(sharpen)
        picture = picture.convert("L", dither=Image.NONE)
        picture = Image.eval(picture, lambda level: 255 if level >= 160 else 0)

        png = BytesIO()
        picture.save(png, 'png')
        return png.getvalue()
class Login2Page(LoginPage):
    """Login page driven by a JSON "step" API instead of an HTML form."""

    @property
    def request_url(self):
        """URL of the authentication transaction: browser.redirect_url + transactionID."""
        transactionID = self.params['transactionID']
        assert transactionID
        return self.browser.redirect_url + transactionID

    def on_load(self):
        """Fetch the transaction description and remember the first form to fill.

        ``self.form_id`` is set to a ``(unit key, id, type)`` tuple; exactly
        one unit of type PASSWORD_LOOKUP or IDENTIFIER is expected (the
        single-element unpacking fails otherwise).
        """
        if not self.browser.no_login:
            raise LoggedOut()

        r = self.browser.open(self.request_url)
        doc = r.json()

        self.form_id, = [(k, v[0]['id'], v[0]['type']) for k, v in doc['step']['validationUnits'][0].items() if v[0]['type'] in ('PASSWORD_LOOKUP', 'IDENTIFIER')]

    def virtualkeyboard(self, vk_obj, password):
        """Translate *password* into the code expected by the virtual keyboard.

        The key images listed at ``vk_obj['externalRestMediaApiUrl']`` are
        downloaded and matched by MyVirtKeyboard.
        """
        imgs = {}
        lst_img = self.browser.location(vk_obj['externalRestMediaApiUrl']).json()
        for img_info in lst_img:
            value = img_info['value']
            url = img_info['uri']
            resp = self.browser.location(url)
            imgs[value] = resp.content

        return MyVirtKeyboard(imgs).get_string_code(password)

    def login(self, login, password):
        """Run the authentication steps and post the resulting SAML response.

        Raises BrowserIncorrectPassword when authentication fails (or when a
        virtual keyboard is required and the password is not numeric) and
        ActionNeeded when the server answers with the ENROLLMENT phase.
        """
        # Default payload: login and password sent together.
        payload = {
            'validate': {
                self.form_id[0]: [ {
                    'id': self.form_id[1],
                    'login': login.upper(),
                    'password': password,
                    'type': 'PASSWORD_LOOKUP',
                } ]
            }
        }

        url = self.request_url + '/step'

        if self.form_id[2] == 'IDENTIFIER':
            # Two-step flow: send the identifier alone first, then the
            # password in a second payload built from the server's answer.
            del payload['validate'][self.form_id[0]][0]['password']
            payload['validate'][self.form_id[0]][0]['type'] = 'IDENTIFIER'
            doc = self.browser.open(url, json=payload).json()
            for k, v in doc['validationUnits'][0].items():
                if v[0]['type'] in ('PASSWORD',):
                    form_id = (k, v[0]['id'], v[0]['type'])
                    if v[0].get('virtualKeyboard'):
                        if not password.isdigit():
                            # Users who get the virtual keyboard needs a password with digits only
                            raise BrowserIncorrectPassword()
                        password = self.virtualkeyboard(vk_obj=v[0]['virtualKeyboard'],
                                                        password=password)

            # NOTE(review): form_id is only bound when a PASSWORD unit was
            # found above; otherwise this raises NameError.
            payload = {
                'validate': {
                    form_id[0]: [{
                        'id': form_id[1],
                        'password': password,
                        'type': 'PASSWORD',
                    }]
                }
            }

        r = self.browser.open(url, json=payload)
        doc = r.json()
        self.logger.debug('doc = %s', doc)
        if 'phase' in doc and doc['phase']['state'] == 'TERMS_OF_USE':
            # Got:
            # {u'phase': {u'state': u'TERMS_OF_USE'}, u'validationUnits': [{u'LIST_OF_TERMS': [{u'type': u'TERMS', u'id': u'b7f28f91-7aa0-48aa-8028-deec13ae341b', u'reference': u'CGU_CYBERPLUS'}]}]}
            # The validation unit is sent back without its 'reference' key.
            if 'reference' in doc['validationUnits'][0]:
                del doc['validationUnits'][0]['reference']
            elif 'reference' in doc['validationUnits'][0]['LIST_OF_TERMS'][0]:
                del doc['validationUnits'][0]['LIST_OF_TERMS'][0]['reference']
            payload = {'validate': doc['validationUnits'][0]}

            url = self.request_url + '/step'
            r = self.browser.open(url, json=payload)
            doc = r.json()
            self.logger.debug('doc = %s', doc)

        if 'phase' in doc and doc['phase']['state'] == "ENROLLMENT":
            raise ActionNeeded()

        if (('phase' in doc and doc['phase']['previousResult'] == 'FAILED_AUTHENTICATION') or
            doc['response']['status'] != 'AUTHENTICATION_SUCCESS'):
            raise BrowserIncorrectPassword()

        # Authentication succeeded: post the SAML response to the given action URL.
        data = {'SAMLResponse': doc['response']['saml2_post']['samlResponse']}
        self.browser.location(doc['response']['saml2_post']['action'], data=data)
class AlreadyLoginPage(LoggedPage, MyHTMLPage):
    """JSON answer stating that the authentication already succeeded."""

    def is_here(self):
        """Tell whether the response is a successful JSON authentication answer."""
        try:
            payload = json.loads(self.response.text)
        except ValueError:
            # not a json page
            # so it should be Login2Page
            return False

        if 'response' not in payload:
            return False

        answer = payload['response']
        return answer['status'] == 'AUTHENTICATION_SUCCESS' and 'saml2_post' in answer
class IndexPage(LoggedPage, MyHTMLPage):
    """Frameset page; the token is carried by the "portalHeader" frame URL."""

    def get_token(self):
        """Return the ``token`` query argument of the portalHeader frame source."""
        header_frame = self.doc.xpath('//frame[@name="portalHeader"]')[0]
        query = urlsplit(header_frame.attrib['src']).query
        return dict(parse_qsl(query))['token']
class HomePage(LoggedPage, MyHTMLPage):
    """Home page; the token is obtained by replaying the portal requests."""

    # Sometimes, the page is empty but nothing is scrapped on it.
    def build_doc(self, data, *args, **kwargs):
        """Return None for an empty body, else the document built by MyHTMLPage."""
        if not data:
            return None
        # Go through MyHTMLPage.build_doc so that NUL bytes are stripped here
        # as on every other page.
        return super(HomePage, self).build_doc(data, *args, **kwargs)

    # sometime the server redirects to a bad url, not containing token.
    # therefore "return args['token']" crashes with a KeyError
    @retry(KeyError)
    def get_token(self):
        """Return the token found in the URL the integration page redirects to.

        Raises KeyError (retried by the decorator) when the final URL has no
        ``token`` argument.
        """
        vary = None
        if self.params.get('vary', None) is not None:
            vary = self.params['vary']
        elif self.doc is not None:
            # build_doc() returns None for an empty page: nothing to search then.
            for script in self.doc.xpath('//script'):
                if script.text is None:
                    continue

                m = re.search(r"'vary', '([\d-]+)'\)", script.text)
                if m:
                    vary = m.group(1)
                    break

        url = self.browser.absurl('/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx')
        headers = {'Referer': self.url}

        # Sometime, the page is a 302 and redirect to a page where there are no information that we need,
        # so we try with 2 others url to further fetch token when empty page
        r = self.browser.open(url, data='taskId=aUniversMesComptes', params={'vary': vary}, headers=headers)

        if not int(r.headers.get('Content-Length', 0)):
            r = self.browser.open(url, data='taskId=aUniversMesComptes', headers=headers)

        if not int(r.headers.get('Content-Length', 0)):
            r = self.browser.open(url, data={'taskId': 'equipementDom'}, params={'vary': vary}, headers=headers)

        doc = r.page.doc
        date = None
        for script in doc.xpath('//script'):
            if script.text is None:
                continue

            m = re.search('lastConnectionDate":"([^"]*)"', script.text)
            if m:
                date = m.group(1)

        url = self.browser.absurl('/cyber/ibp/ate/portal/integratedInternet.jsp')
        # '%%3A' is a literal, URL-encoded ':' in the posted field name.
        data = 'session%%3Aate.lastConnectionDate=%s&taskId=aUniversMesComptes' % date
        headers = {'Referer': r.url}
        r = self.browser.open(url, data=data, headers=headers)

        v = urlsplit(r.url)
        args = dict(parse_qsl(v.query))
        return args['token']
class AccountsPage(LoggedPage, MyHTMLPage):
    """Accounts summary page: one titled section per kind of account."""

    # Section title -> account type; None means the section is skipped.
    ACCOUNT_TYPES = {u'Mes comptes d\'épargne': Account.TYPE_SAVINGS,
                     u'Mon épargne': Account.TYPE_SAVINGS,
                     u'Placements': Account.TYPE_SAVINGS,
                     u'Liste complète de mon épargne': Account.TYPE_SAVINGS,
                     u'Mes comptes': Account.TYPE_CHECKING,
                     u'Comptes en euros': Account.TYPE_CHECKING,
                     u'Mes comptes en devises': Account.TYPE_CHECKING,
                     u'Liste complète de mes comptes': Account.TYPE_CHECKING,
                     u'Mes emprunts': Account.TYPE_LOAN,
                     u'Liste complète de mes emprunts': Account.TYPE_LOAN,
                     u'Financements': Account.TYPE_LOAN,
                     u'Liste complète de mes engagements': Account.TYPE_LOAN,
                     u'Mes services': None,  # ignore this kind of accounts (no bank ones)
                     u'Équipements': None,  # ignore this kind of accounts (no bank ones)
                     u'Synthèse': None,  # ignore this title
                    }

    # Label patterns that override the section type; the first match wins.
    PATTERN = [(re.compile('.*Titres Pea.*'), Account.TYPE_PEA),
               (re.compile(".*Plan D'epargne En Actions.*"), Account.TYPE_PEA),
               (re.compile(".*Compte Especes Pea.*"), Account.TYPE_PEA),
               (re.compile('.*Plan Epargne Retraite.*'), Account.TYPE_PERP),
               (re.compile('.*Titres.*'), Account.TYPE_MARKET),
               (re.compile('.*Selection Vie.*'), Account.TYPE_LIFE_INSURANCE),
               (re.compile('^Fructi Pulse.*'), Account.TYPE_MARKET),
               (re.compile('^(Quintessa|Solevia).*'), Account.TYPE_MARKET),
               (re.compile('^Plan Epargne Enfant Mul.*'), Account.TYPE_MARKET),
               (re.compile('^Alc Premium'), Account.TYPE_MARKET),
               (re.compile('^Plan Epargne Enfant Msu.*'), Account.TYPE_LIFE_INSURANCE),
              ]

    COL_NUMBER = 0
    COL_TYPE = 1
    COL_LABEL = 2
    COL_BALANCE = 3
    COL_COMING = 4

    def pop_up(self):
        """Return True when the page shows the "du navigateur Internet." message."""
        return bool(self.doc.xpath('//span[contains(text(), "du navigateur Internet.")]'))

    def is_short_list(self):
        """Return True when a script of the page mentions EQUIPEMENT_COMPLET."""
        return len(self.doc.xpath('//script[contains(text(), "EQUIPEMENT_COMPLET")]')) > 0

    def iter_accounts(self, next_pages):
        """Yield the accounts listed on the page.

        :param next_pages: list to which the form parameters of further pages
                           to visit are appended (full lists behind a "Suite"
                           button, card outstanding pages).

        Each account carries private navigation attributes (``_params``,
        ``_invest_params``, ...).  Once every section has been handled, the
        browser is asked to follow the back button, if any.
        """
        account_type = Account.TYPE_UNKNOWN

        params = self.get_params()
        actions = self.get_button_actions()

        for div in self.doc.xpath('//div[has-class("btit")]'):
            if div.text in (None, u'Synthèse'):
                continue
            title = div.text.strip()
            account_type = self.ACCOUNT_TYPES.get(title, Account.TYPE_UNKNOWN)

            if account_type is None:
                # ignore services accounts
                self.logger.debug('Ignore account type %s', title)
                continue

            # Go to the full list of this kind of account, if any.
            btn = div.getparent().xpath('.//button[span[text()="Suite"]]')
            if len(btn) > 0:
                _params = params.copy()
                _params.update(actions[btn[0].attrib['id']])
                next_pages.append(_params)
                continue

            currency = None
            for th in div.getnext().xpath('.//thead//th'):
                # A header cell holding only child elements has no text.
                m = re.match(r'.*\((\w+)\)$', th.text or '')
                if m and currency is None:
                    currency = Account.get_currency(m.group(1))

            for tr in div.getnext().xpath('.//tbody/tr'):
                if 'id' not in tr.attrib:
                    continue

                args = dict(parse_qsl(tr.attrib['id']))
                tds = tr.findall('td')

                if len(tds) < 4 or 'identifiant' not in args:
                    self.logger.warning('Unable to parse an account')
                    continue

                account = Account()
                account.id = args['identifiant'].replace(' ', '')
                account.label = u' '.join([u''.join([txt.strip() for txt in tds[1].itertext()]),
                                           u''.join([txt.strip() for txt in tds[2].itertext()])]).strip()

                for pattern, _type in self.PATTERN:
                    if pattern.match(account.label):
                        account.type = _type
                        break
                else:
                    account.type = account_type

                balance_text = u''.join([txt.strip() for txt in tds[3].itertext()])
                balance = FrenchTransaction.clean_amount(balance_text)
                account.balance = Decimal(balance or '0.0')
                account.currency = currency or Account.get_currency(balance_text)

                if account.type == account.TYPE_LOAN:
                    account.balance = - abs(account.balance)

                account._prev_debit = None
                account._next_debit = None
                account._params = None
                account._coming_params = None
                account._coming_count = None
                account._invest_params = None

                # Form parameter selecting this row, derived from the row id
                # ("<prefix>_<rest>").
                row_id = tr.attrib['id']
                sel_key = 'attribute($SEL_$%s)' % row_id.split('_')[0]
                sel_value = row_id.split('_', 1)[1]

                if balance != u'' and len(tds[3].xpath('.//a')) > 0:
                    account._params = params.copy()
                    account._params['dialogActionPerformed'] = 'SOLDE'
                    account._params[sel_key] = sel_value

                if len(tds) >= 5 and len(tds[self.COL_COMING].xpath('.//a')) > 0:
                    # account._params is None when the balance cell has no
                    # link; both keys it would add are overwritten below, so
                    # starting from the page params is equivalent.
                    _params = (account._params or params).copy()
                    _params['dialogActionPerformed'] = 'ENCOURS_COMPTE'
                    _params[sel_key] = sel_value

                    # If there is an action needed before going to the cards page, save it.
                    m = re.search(r'dialogActionPerformed=([\w_]+)', self.url)
                    if m and m.group(1) != 'EQUIPEMENT_COMPLET':
                        _params['prevAction'] = m.group(1)
                    next_pages.append(_params)

                if not account._params:
                    account._invest_params = params.copy()
                    account._invest_params['dialogActionPerformed'] = 'CONTRAT'
                    account._invest_params[sel_key] = sel_value

                yield account

        # Needed to preserve navigation.
        self.browser.follow_back_button_if_any(params=params.copy(), actions=actions)
class AccountsFullPage(AccountsPage):
    """Same parsing as AccountsPage; only the page class differs."""
class CardsPage(LoggedPage, MyHTMLPage):
    """Cards outstanding page: one or several rows per deferred card."""

    COL_ID = 1
    COL_TYPE = 2
    COL_LABEL = 3
    COL_DATE = 4
    COL_AMOUNT = 5

    def iter_accounts(self, next_pages):
        """Yield one card account per card found in table ``tbl1``.

        A row with a card number starts a new account; following rows without
        a number belong to the same card.  Each row's date and amount feed
        either ``coming`` or ``_prev_balance``, depending on whether the date
        cell ends with ``(1)``.

        :param next_pages: unused here; kept for signature parity with
                           AccountsPage.iter_accounts.
        :raises BrokenPageError: when a row without card number comes before
                                 any card.
        """
        params = self.get_params()

        account = None
        currency = None
        for th in self.doc.xpath('//table[@id="tbl1"]//thead//th'):
            # A header cell holding only child elements has no text.
            m = re.match(r'.*\((\w+)\)$', th.text or '')
            if m and currency is None:
                currency = Account.get_currency(m.group(1))

        if currency is None:
            currency = Account.get_currency(CleanText('//td[@id="tbl1_0_5_Cell"]//span')(self.doc))

        for tr in self.doc.xpath('//table[@id="tbl1"]/tbody/tr'):
            cols = tr.xpath('./td')

            if len(cols) == 1 and CleanText('.')(cols[0]) == 'pas de carte':
                self.logger.debug('there are no cards on this page')
                continue

            # We are processing another card, so reset account
            if CleanText('.')(cols[0]) and account is not None:
                yield account
                account = None

            card_id = CleanText(None).filter(cols[self.COL_ID])
            if len(card_id) > 0:
                if account is not None:
                    yield account
                account = Account()
                account.id = card_id.replace(' ', '')
                account.type = Account.TYPE_CARD
                account.balance = account.coming = Decimal('0')
                account._next_debit = datetime.date.today()
                account._prev_debit = datetime.date(2000, 1, 1)
                account.label = u' '.join([CleanText(None).filter(cols[self.COL_TYPE]),
                                           CleanText(None).filter(cols[self.COL_LABEL])])
                account.currency = currency
                account._params = None
                account._invest_params = None
                account._coming_params = params.copy()
                account._coming_params['dialogActionPerformed'] = 'SELECTION_ENCOURS_CARTE'
                account._coming_params['attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]] = tr.attrib['id'].split('_', 1)[1]
                # select current row and next rows till parent name is empty
                account._coming_start = int(tr.attrib['id'].split('_', 1)[1])
                account._coming_count = 1
                for row in tr.xpath('./following-sibling::tr[./td[5]/span[not(contains(text(), "(1)"))]]'):
                    if CleanText('./td[2]')(row):
                        break
                    account._coming_count += 1
            elif account is None:
                raise BrokenPageError('Unable to find accounts on cards page')
            else:
                account._params = params.copy()
                account._params['dialogActionPerformed'] = 'SELECTION_ENCOURS_CARTE'
                account._params['attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]] = tr.attrib['id'].split('_', 1)[1]

            date_col = CleanText(None).filter(cols[self.COL_DATE])
            m = re.search(r'(\d+)/(\d+)/(\d+)', date_col)
            if not m:
                self.logger.warning('Unable to parse date %r', date_col)
                continue

            # The groups are day/month/year; reverse them for datetime.date.
            date = datetime.date(*[int(c) for c in m.groups()][::-1])
            if date.year < 100:
                # Two-digit year.
                date = date.replace(year=date.year + 2000)

            amount = Decimal(FrenchTransaction.clean_amount(CleanText(None).filter(cols[self.COL_AMOUNT])))

            if not date_col.endswith('(1)'):
                # No "(1)" marker: the amount counts towards the coming total.
                account.coming += - abs(amount)
                account._next_debit = date
            elif date > account._prev_debit:
                # Keep the most recent "(1)" row as the previous debit.
                account._prev_balance = - abs(amount)
                account._prev_debit = date

        if account is not None:
            yield account

        # Needed to preserve navigation.
        self.browser.follow_back_button_if_any(params=params.copy())
class Transaction(FrenchTransaction):
    """Transaction whose type and label are derived from the raw label.

    PATTERNS is a list of (compiled regex, transaction type) pairs.  The
    named groups follow the names the patterns are written with here:
    ``text`` for the cleaned label and ``dd``/``mm``/``yy`` for an operation
    date embedded in the label.
    """

    PATTERNS = [(re.compile(r'^RET DAB (?P<text>.*?) RETRAIT (DU|LE) (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d+).*'),
                 FrenchTransaction.TYPE_WITHDRAWAL),
                (re.compile(r'^RET DAB (?P<text>.*?) CARTE ?:.*'),
                 FrenchTransaction.TYPE_WITHDRAWAL),
                (re.compile(r'^(?P<text>.*) RETRAIT DU (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2}) .*'),
                 FrenchTransaction.TYPE_WITHDRAWAL),
                (re.compile(r'^(RETRAIT CARTE )?RET(RAIT)? DAB (?P<text>.*)'),
                 FrenchTransaction.TYPE_WITHDRAWAL),
                (re.compile(r'((\w+) )?(?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2}) CB[:\*][^ ]+ (?P<text>.*)'),
                 FrenchTransaction.TYPE_CARD),
                (re.compile(r'^VIR(EMENT)? (?P<text>.*)'), FrenchTransaction.TYPE_TRANSFER),
                (re.compile(r'^(PRLV|PRELEVEMENT) (?P<text>.*)'),
                 FrenchTransaction.TYPE_ORDER),
                (re.compile(r'^(?P<text>CHEQUE .*)'), FrenchTransaction.TYPE_CHECK),
                (re.compile(r'^(AGIOS /|FRAIS) (?P<text>.*)', re.IGNORECASE),
                 FrenchTransaction.TYPE_BANK),
                (re.compile(r'^(CONVENTION \d+ )?COTIS(ATION)? (?P<text>.*)', re.IGNORECASE),
                 FrenchTransaction.TYPE_BANK),
                (re.compile(r'^REMISE (?P<text>.*)'), FrenchTransaction.TYPE_DEPOSIT),
                (re.compile(r'^(?P<text>ECHEANCE PRET .*)'), FrenchTransaction.TYPE_LOAN_PAYMENT),
                (re.compile(r'^(?P<text>.*)( \d+)? QUITTANCE .*'),
                 FrenchTransaction.TYPE_ORDER),
                (re.compile(r'^.* LE (?P<dd>\d{2})/(?P<mm>\d{2})/(?P<yy>\d{2})$'),
                 FrenchTransaction.TYPE_UNKNOWN),
                (re.compile(r'^RELEVE CARTE'), FrenchTransaction.TYPE_CARD_SUMMARY),
               ]
class TransactionsPage(LoggedPage, MyHTMLPage):
    """History page, either for an account (table ``tbl1``) or a card (``TabFact``)."""

    # Account history columns; the last four are counted from the end
    # because the reference column is optional.
    COL_COMPTA_DATE = 0
    COL_LABEL = 1
    COL_REF = 2  # optional
    COL_OP_DATE = -4
    COL_VALUE_DATE = -3
    COL_DEBIT = -2
    COL_CREDIT = -1

    # Card history columns.
    COL_CARD_DATE = 0
    COL_CARD_LABEL = 1
    COL_CARD_AMOUNT = 2

    def get_next_params(self):
        """Return the form parameters requesting the next page, or None on the last one."""
        nxt = self.doc.xpath('//li[contains(@id, "_nxt")]')
        if len(nxt) == 0 or nxt[0].attrib.get('class', '') == 'nxt-dis':
            return None

        params = self.get_params()
        params['validationStrategy'] = 'NV'
        params['pagingDirection'] = 'NEXT'
        params['pagerName'] = nxt[0].attrib['id'].split('_', 1)[0]
        return params

    def get_history(self, account, coming):
        """Return an iterator over the transactions, picking the parser from the table id.

        :raises NotImplementedError: when neither known table is present.
        """
        if len(self.doc.xpath('//table[@id="tbl1"]')) > 0:
            return self.get_account_history()
        if len(self.doc.xpath('//table[@id="TabFact"]')) > 0:
            return self.get_card_history(account, coming)

        raise NotImplementedError('Unable to find what kind of history it is.')

    def get_account_history(self):
        """Yield the transactions of table ``tbl1`` (rows with fewer than 5 cells are skipped)."""
        # Keep track of the order in the transaction list, so details can be retrieve
        # Because each transaction row has a different id
        # in the html page for each request of the TransactionPage
        for tr in self.doc.xpath('//table[@id="tbl1"]/tbody/tr'):
            tds = tr.findall('td')

            if len(tds) < 5:
                continue

            t = Transaction()

            # get the column index of the link to access transaction details
            # (only used for GoCardLess transactions so far)
            t._has_link = bool(tds[self.COL_DEBIT].findall('a') or tds[self.COL_CREDIT].findall('a'))

            # XXX We currently take the *value* date, but it will probably
            # necessary to use the *operation* one.
            # Default sort on website is by compta date, so in browser.py we
            # change the sort on value date.
            cleaner = CleanText(None).filter
            date = cleaner(tds[self.COL_OP_DATE])
            vdate = cleaner(tds[self.COL_VALUE_DATE])
            raw = cleaner(tds[self.COL_LABEL])
            debit = cleaner(tds[self.COL_DEBIT])
            credit = cleaner(tds[self.COL_CREDIT])

            t.bdate = Date(dayfirst=True).filter(cleaner(tds[self.COL_COMPTA_DATE]))
            t.parse(date, re.sub(r'[ ]+', ' ', raw), vdate)
            t.set_amount(credit, debit)
            # NOTE(review): ``debit`` is the cleaned cell text; if set_amount()
            # stores a numeric amount this comparison can never be true and
            # the type is always 'credit' -- confirm the intent.
            t._amount_type = 'debit' if t.amount == debit else 'credit'

            # Strip the balance displayed in transaction labels
            t.label = re.sub('solde en valeur : .*', '', t.label)
            t.raw = re.sub('solde en valeur : .*', '', t.raw)

            # XXX Hack to include the check number not displayed in the full label.
            if re.match("^CHEQUE |^CHQ VOTRE CHEQUE", t.label):
                t.raw = '%s No: %s' % (t.raw, cleaner(tds[self.COL_REF]))

            # In rare cases, label is empty ..
            if not t.label:
                t.label = cleaner(tds[self.COL_REF])

            # To be able to find by ref on the transaction page
            t._ref = cleaner(tds[self.COL_REF])

            yield t

    def get_card_history(self, account, coming):
        """Yield the card transactions of table ``TabFact``.

        All transactions are dated at the card debit date: ``_next_debit``
        when *coming* is true, else ``_prev_debit``.  Nothing is yielded for
        past history when the account has no ``_prev_balance``.  On a
        'ContinueTask.do' URL a 'RELEVE CARTE' summary transaction is yielded
        first.
        """
        if coming:
            debit_date = account._next_debit
        elif not hasattr(account, '_prev_balance'):
            return
        else:
            debit_date = account._prev_debit
            if 'ContinueTask.do' in self.url:
                t = Transaction()
                t.parse(debit_date, 'RELEVE CARTE')
                t.amount = -account._prev_balance
                yield t

        # The currency is written between parentheses in the amount header.
        amount_header = self.doc.xpath('//table[@id="TabFact"]/thead//th')[self.COL_CARD_AMOUNT]
        currency = Account.get_currency(amount_header.text.replace('(', ' ').replace(')', ' '))

        for tr in self.doc.xpath('//table[@id="TabFact"]/tbody/tr'):
            tds = tr.findall('td')

            if len(tds) < 3:
                continue

            t = Transaction()

            cleaner = CleanText(None).filter
            date = cleaner(tds[self.COL_CARD_DATE])
            label = cleaner(tds[self.COL_CARD_LABEL])
            amount = '-' + cleaner(tds[self.COL_CARD_AMOUNT])

            t.parse(debit_date, re.sub(r'[ ]+', ' ', label))
            t.set_amount(amount)
            t.rdate = t.bdate = t.parse_date(date)
            t.original_currency = currency

            if not t.type:
                t.type = Transaction.TYPE_DEFERRED_CARD
            yield t

    def no_operations(self):
        """Return True when the page shows no operation.

        That is: a cell with a ``colspan`` attribute in a history table, or a
        block mentioning the LineBourse access.
        """
        if len(self.doc.xpath('//table[@id="tbl1" or @id="TabFact"]//td[@colspan]')) > 0:
            return True
        if len(self.doc.xpath(u'//div[contains(text(), "Accès à LineBourse")]')) > 0:
            return True
        return False

    def get_investment_page_params(self):
        """Return ``(url, params)`` extracted from the ``<body onload>`` script.

        ``url`` is None when not found.  ``params`` holds the SJRToken value
        and is None unless both the URL and the token were found.  A 2-tuple
        is always returned.
        """
        script = self.doc.xpath('//body')[0].attrib['onload']
        url = None
        m = re.search(r"','([^']+?)',\[", script, re.MULTILINE)
        if m:
            url = m.group(1)

        params = {}
        for key, value in re.findall(r"key:'(?P<key>SJRToken)'\,value:'(?P<value>.*?)'}", script, re.MULTILINE):
            params[key] = value

        # Parenthesised to make the precedence explicit: the conditional
        # applies to the second element only.
        return url, (params if url and params else None)

    def get_transaction_table_id(self, ref):
        """Return the (key, value) form parameter selecting the row whose text contains *ref*.

        Raises IndexError when no row of ``tbl1`` matches.
        """
        tr = self.doc.xpath('//table[@id="tbl1"]/tbody/tr[.//span[contains(text(), "%s")]]' % ref)[0]

        key = 'attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]
        value = tr.attrib['id'].split('_', 1)[1]

        return key, value

    def get_gocardless_strategy_param(self, transaction):
        """Return the 'validationStrategy' value for the details link of *transaction*.

        'AV' for a debit, 'NV' for a credit, None for any other amount type.
        """
        # A form is filled and send with javascript
        # the 'validationStrategy' parameter value only depends on the column
        # index in which the link lies
        #
        # To get more details about how things are done, see the following javascript functions:
        #- attachTableRowEvents (atre)
        #- attachActiveSelectionEventsOnRow
        #- astr
        #- updateSelection (uds)
        #- selectActionButton (sab)
        #- a script element embedded in the html page (search for "tcl5", "tcl6")
        assert transaction._has_link

        if transaction._amount_type == 'debit':
            return 'AV'
        elif transaction._amount_type == 'credit':
            return 'NV'
        return None
class NatixisChoicePage(LoggedPage, HTMLPage):
    """Contract selection page; the single listed contract is opened automatically."""

    def on_load(self):
        """Raise BrowserUnavailable on the outage message, else open the listed account."""
        summary = CleanText('//span[@class="rf-msgs-sum"]', default='')(self.doc)
        outage = re.search(r"Le service de consultation de votre contrat \w+ est momentanément indisponible.", summary)
        if outage:
            raise BrowserUnavailable()

        # TODO handle when there are multiple accounts on this page
        rows = self.doc.xpath('//tbody[@id="list:dataVie:tb"]/tr')
        # Exactly one row is expected; the unpacking fails otherwise.
        (account_tr,) = rows
        account_name = CleanText('./td[1]')(account_tr)
        self.logger.info('opening automatically account %s', account_name)
        account_url = Link('./td[1]/a')(account_tr)
        self.browser.location(account_url)
class NatixisPage(LoggedPage, HTMLPage):
    """Routing page: its "formRoutage" form is submitted as a partial AJAX request."""

    def on_load(self):
        """Fill the routing form like a click on its button would, then submit it."""
        routing_form = self.get_form(name="formRoutage")
        # Fields are set in this exact order.
        ajax_fields = (
            ('javax.faces.source', 'formRoutageButton'),
            ('javax.faces.partial.execute', 'formRoutageButton @component'),
            ('javax.faces.partial.render', '@component'),
            ('AJAX:EVENTS_COUNT', '1'),
            ('javax.faces.partial.ajax', 'true'),
            ('javax.faces.partial.event', 'click'),
            ('org.richfaces.ajax.component', 'formRoutageButton'),
            ('formRoutageButton', 'formRoutageButton'),
        )
        for field_name, field_value in ajax_fields:
            routing_form[field_name] = field_value
        routing_form.request.headers['Faces-Request'] = 'partial/ajax'
        routing_form.submit()
class TransactionsBackPage(TransactionsPage):
    """Transactions page recognised by its "Liste des écritures" block."""

    def is_here(self):
        """Return the matching title nodes (an empty, falsy list when absent)."""
        title_nodes = self.doc.xpath('//div[text()="Liste des écritures"]')
        return title_nodes
class NatixisRedirect(LoggedPage, XMLPage):
    """XML partial response carrying a redirection URL."""

    def get_redirect(self):
        """Return the redirect URL with its scheme forced to https."""
        targets = self.doc.xpath('/partial-response/redirect/@url')
        target = targets[0]
        # why do they use http on a bank site???
        return target.replace('http://', 'https://')
class NatixisErrorPage(LoggedPage, HTMLPage):
    """Marker page class: it defines no parsing logic of its own."""
class IbanPage(LoggedPage, MyHTMLPage):
    """Page listing the accounts and, once an account is selected, its IBAN."""

    def need_to_go(self):
        """Return True when no IBAN is displayed yet on the page."""
        return len(self.doc.xpath('//div[@class="grid"]/div/span[contains(text(), "IBAN")]')) == 0

    def go_iban(self, account):
        """Submit the form opening the IBAN details of *account*.

        :returns: True when the form was submitted, False when the account is
                  a loan or market account, when the page does not offer the
                  IBAN/RIB printing, or when no row matches the account id.
        """
        # These two conditions do not depend on the row: check them once.
        if account.type in (Account.TYPE_LOAN, Account.TYPE_MARKET):
            return False
        if not self.doc.xpath('//div[contains(text(), "Impression IBAN/RIB")]'):
            return False

        for tr in self.doc.xpath('//table[@id]/tbody/tr'):
            if CleanText().filter(tr.xpath('./td[1]')) not in account.id:
                continue

            form = self.get_form(id='myForm')
            form['token'] = self.build_token(form['token'])
            form['dialogActionPerformed'] = "DETAIL_IBAN_RIB"
            tr_id = Attr(None, 'id').filter(tr.xpath('.')).split('_')
            form[u'attribute($SEL_$%s)' % tr_id[0]] = tr_id[1]
            form.submit()
            return True
        return False

    def get_iban(self, acc_id):
        """Return the IBAN (spaces removed) containing the digits of *acc_id*.

        The block following the "IBAN" title and sharing its CSS class is
        taken as the value.  Returns NotAvailable when nothing matches.
        """
        account_digits = re.sub(r'\D', '', acc_id)
        iban_class = None
        for div in self.doc.xpath('//div[@class="grid"]/div'):
            if not iban_class and "IBAN" in CleanText().filter(div.xpath('./span')):
                iban_class = Attr(None, 'class').filter(div.xpath('.'))
            elif iban_class is not None and iban_class == Attr(None, 'class').filter(div.xpath('.')):
                iban = CleanText().filter(div.xpath('.')).replace(' ', '')
                if account_digits in iban:
                    return iban
        return NotAvailable
class EtnaPage(LoggedPage, MyHTMLPage):
    """Logged-in page with no parsing logic of its own."""
def float_to_decimal(f):
    """Convert ``f`` to a ``Decimal`` through its ``str()`` representation.

    ``Decimal(float_value)`` gives horrible results, hence the detour
    through ``str``.
    """
    as_text = str(f)
    return Decimal(as_text)
class NatixisInvestPage(LoggedPage, JsonPage):
    """JSON page listing the entries found under ``detailContratVie/valorisation/supports``."""

    @method
    class get_investments(DictElement):
        """Build one Investment per entry of the ``supports`` list."""

        item_xpath = 'detailContratVie/valorisation/supports'

        class item(ItemElement):
            klass = Investment

            obj_label = CleanText(Dict('nom'))
            obj_code = CleanText(Dict('codeIsin'))

            def obj_vdate(self):
                """Return the entry's own date, else the document-level valuation date."""
                own_date = Dict('dateValeurUniteCompte', default=None)(self)
                if own_date is not None:
                    return Date().filter(own_date)

                valuation = self.page.doc['detailContratVie']['valorisation']
                return Date().filter(valuation['date'])

            obj_valuation = Eval(float_to_decimal, Dict('montant'))
            obj_quantity = Eval(float_to_decimal, Dict('nombreUnitesCompte'))
            obj_unitvalue = Eval(float_to_decimal, Dict('valeurUniteCompte'))
            # The "repartition" value is divided by 100 before being stored.
            obj_portfolio_share = Eval(lambda share: float_to_decimal(share) / 100, Dict('repartition'))
class NatixisHistoryPage(LoggedPage, JsonPage):
    """JSON page whose top-level items are turned into transactions."""

    @method
    class get_history(DictElement):
        """Build one Transaction per item of the document."""

        # No sub-path: items are taken from the document itself.
        item_xpath = None

        class item(ItemElement):
            klass = Transaction

            obj_amount = Eval(float_to_decimal, Dict('montantNet'))
            obj_raw = CleanText(Dict('libelle', default=''))
            obj_vdate = Date(Dict('dateValeur', default=NotAvailable), default=NotAvailable)
            obj_date = Date(Dict('dateEffet', default=NotAvailable), default=NotAvailable)
            obj_investments = NotAvailable
            obj_type = Transaction.TYPE_BANK

            def validate(self, obj):
                """Keep only transactions that have both a raw label and a date."""
                if not obj.raw:
                    return False
                return bool(obj.date)
def use_invest_date(tr):
    """Set ``tr.date`` to the ``vdate`` shared by all of ``tr.investments``.

    Nothing is changed when the transaction has no investment. An
    AssertionError is raised when the investments do not all carry the same
    ``vdate``.
    """
    vdates = [inv.vdate for inv in tr.investments]
    if vdates:
        reference = vdates[0]
        assert all(vdate == reference for vdate in vdates)
        tr.date = reference
class NatixisDetailsPage(LoggedPage, RawPage):
    """Raw page parsed with ``get_pdf_rows`` to rebuild transactions and their investments.

    ``self.doc`` is the list returned by ``build_doc``; ``get_history``
    walks it as a sequence of pages, each page being a sequence of rows.
    """

    def build_doc(self, data):
        """Parse ``data`` with ``get_pdf_rows`` and return the result as a list."""
        return list(get_pdf_rows(data))

    # Indexes of the cells read by get_history in a 7-cell row. Each cell is
    # handled as a sequence of text fragments (they get joined before use).
    COL_LABEL = 0
    COL_DATE = 1
    COL_TR_AMOUNT = 2
    COL_VALUATION = 3
    COL_UNITVALUE = 4
    COL_QUANTITY = 5

    # warning: tr amount is "brut", unlike invest amounts ("net")...
    def get_history(self):
        """Yield the transactions found in the document, with their investments.

        Only rows of exactly 7 cells are considered. Three kinds of rows are
        distinguished:

        * a row with a transaction amount starts a new transaction (the
          previous one, if any, is yielded first);
        * a row with neither amount nor date gives the direction of the
          current transaction and fixes the sign of its amount;
        * any other row is an investment line attached to the current
          transaction.

        Raises AssertionError on a direction row with an unknown label or
        with content outside of the label cell.
        """
        # Transaction being built; None until a labelled amount row is met.
        tr = None
        for page in self.doc:
            for n, row in enumerate(page):
                if len(row) != 7:
                    continue

                label = ' '.join(row[self.COL_LABEL])

                if row[self.COL_TR_AMOUNT]:
                    if tr is not None:
                        # Same label repeated on the first row of a page: the
                        # current transaction goes on, the row is skipped.
                        if n == 0 and label == tr.label:
                            self.logger.debug('%r seems to continue on next page', tr)
                            continue

                        # A new transaction starts: emit the finished one.
                        yield tr
                        tr = None

                    if not label:
                        # this pdf is really cryptic...
                        # we assume blue rows are a new transaction
                        # but if no label, it doesn't appear in the website json
                        continue

                    tr = Transaction()
                    tr.type = Transaction.TYPE_BANK
                    tr.raw = tr.label = label
                    tr.amount = CleanDecimal(replace_dots=True).filter(''.join(row[self.COL_TR_AMOUNT]))
                elif not row[self.COL_DATE]:
                    if not tr:
                        # ignore transactions with the empty label, see above
                        continue

                    # Direction row: it decides the sign of the amount.
                    if label == 'Investissement':
                        tr.amount = abs(tr.amount)
                    elif label == 'Désinvestissement':
                        tr.amount = -abs(tr.amount)
                    else:
                        assert False, 'unhandled line %s' % label

                    assert not any(len(cell) for cell in row[self.COL_LABEL+1:]), 'there should be only the label'
                else:
                    if not tr:
                        # Investment line of a transaction skipped above.
                        continue

                    inv = Investment()
                    inv.label = label
                    # NOTE(review): unlike the other cells, this one is passed
                    # without ''.join -- presumably CleanDecimal copes with a
                    # list of fragments; confirm.
                    inv.valuation = CleanDecimal(replace_dots=True).filter(row[self.COL_VALUATION])
                    # Valuation and quantity take the sign of the transaction.
                    if tr.amount < 0:
                        inv.valuation = -inv.valuation
                    inv.vdate = Date(dayfirst=True).filter(''.join(row[self.COL_DATE]))
                    # The transaction date is the date of its latest parsed
                    # investment line.
                    tr.date = inv.vdate

                    inv.quantity = CleanDecimal(replace_dots=True, default=NotAvailable).filter(''.join(row[self.COL_QUANTITY]))
                    if inv.quantity and tr.amount < 0:
                        inv.quantity = -inv.quantity
                    inv.unitvalue = CleanDecimal(replace_dots=True, default=NotAvailable).filter(''.join(row[self.COL_UNITVALUE]))

                    tr.investments.append(inv)

        # Emit the transaction still pending once every page has been read.
        if tr:
            yield tr
class AdvisorPage(LoggedPage, MyHTMLPage):
    """Page giving the advisor, the agency contact details and the user profile."""

    @method
    class get_advisor(ItemElement):
        """Build the Advisor from the "Votre conseiller" / "Votre agence" blocks."""

        klass = Advisor

        # Only produce an advisor when a name is found on the page.
        condition = lambda self: Field('name')(self)

        obj_name = CleanText(u'//div[label[contains(text(), "Votre conseiller")]]/span')
        obj_agency = CleanText(u'//div[label[contains(text(), "Votre agence")]]/span')
        obj_email = obj_mobile = NotAvailable

    @method
    class update_agency(ItemElement):
        """Fill the phone, fax and address fields from the agency block."""

        # Dots are removed from the phone and fax numbers.
        obj_phone = CleanText(u'//div[label[contains(text(), "Téléphone")]]/span', replace=[('.', '')])
        obj_fax = CleanText(u'//div[label[contains(text(), "Fax")]]/span', replace=[('.', '')])
        obj_address = CleanText(u'//div[div[contains(text(), "Votre agence")]]/following-sibling::div[1]//div[not(label)]/span')

    def get_profile(self):
        """Return a Person built from the welcome message and the e-mail shown on the page.

        ``name`` and ``company_name`` are left untouched when the welcome
        message cannot be parsed; ``email`` is always set.
        """
        profile = Person()

        # The name is only available in a welcome message. Sometimes the
        # message looks like "Bienvenue M NAME - COMPANY", sometimes just
        # "Bienvenue M NAME", or even "Bienvenue COMPANY".
        # We need to detect whether the company name is there, and where it
        # begins. Relying on the dash only is dangerous, as people may have
        # dashes in their name and so may companies, but we can detect the
        # company name from a dash between spaces: we consider it impossible
        # to be called "jean - charles", only "jean-charles".
        welcome_msg = CleanText('//div[@id="BlcBienvenue"]/div[@class="btit"]')(self.doc)
        full_name_re = re.search(r'Bienvenue\s(((?! - ).)*)( - )?(.*)', welcome_msg)

        if full_name_re is None:
            # Missing or unexpected welcome block: do not crash on
            # ``None.group``. The message itself is not logged since it may
            # hold personal data.
            self.logger.warning('unable to parse the welcome message, name and company are left unset')
        else:
            # NOTE(review): this search is not anchored, so it can also match
            # an "M " located inside a company name -- confirm whether the
            # title should be required at the start.
            name_re = re.search(r'M(?:me|lle)? (.*)', full_name_re.group(1))
            if name_re:
                profile.name = name_re.group(1)
                if full_name_re.group(4):
                    profile.company_name = full_name_re.group(4)
            else:
                profile.company_name = full_name_re.group(1)

        # A former lookup on span "fld8" was always overwritten by this one
        # and has been dropped.
        profile.email = CleanText('//span[contains(text(), "@")]')(self.doc)

        return profile
class TransactionDetailPage(LoggedPage, MyHTMLPage):
    """Logged-in page from which the reference of a transaction is read."""

    def get_reference(self):
        """Return the cleaned text of the ``<div>`` holding the reference label."""
        reference_filter = CleanText('//div[label[contains(text(), "Référence")]]//text()')
        return reference_filter(self.doc)
class LineboursePage(LoggedPage, HTMLPage):
    """Logged-in HTML page with no parsing logic of its own."""