diff --git a/modules/banquepopulaire/browser.py b/modules/banquepopulaire/browser.py
index b4d5ae43008f5170f6fc360cd6c20f2c62cf03b1..f4a2e8a112b9e9d39f4a7d3a37debcc621236e70 100644
--- a/modules/banquepopulaire/browser.py
+++ b/modules/banquepopulaire/browser.py
@@ -18,10 +18,11 @@
# along with weboob. If not, see .
+from collections import OrderedDict
import urllib
-from weboob.deprecated.browser import Browser, BrowserIncorrectPassword, BrokenPageError
-
+from weboob.exceptions import BrowserIncorrectPassword
+from weboob.browser import LoginBrowser, URL, need_login
from weboob.capabilities.bank import Account
from weboob.capabilities.base import NotAvailable
@@ -34,62 +35,69 @@
__all__ = ['BanquePopulaire']
-class BanquePopulaire(Browser):
- PROTOCOL = 'https'
- ENCODING = 'iso-8859-15'
- PAGES = {'https://[^/]+/auth/UI/Login.*': LoginPage,
- 'https://[^/]+/cyber/internet/Login.do': IndexPage,
- 'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=mesComptes.*': AccountsPage,
- 'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=maSyntheseGratuite.*': AccountsPage,
- 'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=accueilSynthese.*': AccountsPage,
- 'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=equipementComplet.*': AccountsPage,
- 'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=cyberIBAN.*': IbanPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=EQUIPEMENT_COMPLET.*': AccountsFullPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=VUE_COMPLETE.*': AccountsPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=ENCOURS_COMPTE.*': CardsPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=SELECTION_ENCOURS_CARTE.*': TransactionsPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=SOLDE.*': TransactionsPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=CONTRAT.*': TransactionsPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=DETAIL_IBAN_RIB.*': IbanPage,
- 'https://[^/]+/cyber/internet/Page.do\?.*': TransactionsPage,
- 'https://[^/]+/cyber/internet/Sort.do\?.*': TransactionsPage,
- 'https://[^/]+/cyber/internet/ContinueTask.do': ErrorPage,
- 'https://[^/]+/s3f-web/.*': UnavailablePage,
- 'https://[^/]+/static/errors/nondispo.html': UnavailablePage,
- 'https://[^/]+/portailinternet/_layouts/Ibp.Cyi.Layouts/RedirectSegment.aspx.*': RedirectPage,
- 'https://[^/]+/portailinternet/Catalogue/Segments/.*.aspx(\?vary=(?P.*))?': HomePage,
- 'https://[^/]+/portailinternet/Pages/.*.aspx\?vary=(?P.*)': HomePage,
- 'https://[^/]+/portailinternet/Pages/default.aspx': HomePage,
- 'https://[^/]+/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx': HomePage,
- 'https://[^/]+/WebSSO_BP/_(?P\d+)/index.html\?transactionID=(?P.*)': Login2Page,
- 'https://www.linebourse.fr/ReroutageSJR': LineboursePage,
- 'https://www.linebourse.fr/DetailMessage.*': MessagePage,
- 'https://www.linebourse.fr/Portefeuille': InvestmentLineboursePage,
- 'https://www.assurances.natixis.fr/espaceinternet-bp/views/common.*': NatixisPage,
- 'https://www.assurances.natixis.fr/espaceinternet-bp/views/contrat.*': InvestmentNatixisPage,
- 'https://www.assurances.natixis.fr/espaceinternet-bp/error-redirect.*': NatixisErrorPage,
- }
+class BrokenPageError(Exception):
+ pass
- def __init__(self, website, *args, **kwargs):
- self.DOMAIN = website
- self.token = None
- Browser.__init__(self, *args, **kwargs)
+class BanquePopulaire(LoginBrowser):
+ login_page = URL(r'https://[^/]+/auth/UI/Login.*', LoginPage)
+ index_page = URL(r'https://[^/]+/cyber/internet/Login.do', IndexPage)
+ accounts_page = URL(r'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=mesComptes.*',
+ r'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=maSyntheseGratuite.*',
+ r'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=accueilSynthese.*',
+ r'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=equipementComplet.*',
+ r'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=VUE_COMPLETE.*',
+ AccountsPage)
+
+ iban_page = URL(r'https://[^/]+/cyber/internet/StartTask.do\?taskInfoOID=cyberIBAN.*',
+ r'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=DETAIL_IBAN_RIB.*',
+ IbanPage)
+
+ accounts_full_page = URL(r'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=EQUIPEMENT_COMPLET.*',
+ AccountsFullPage)
+
+ cards_page = URL(r'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=ENCOURS_COMPTE.*', CardsPage)
+
+ transactions_page = URL(r'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=SELECTION_ENCOURS_CARTE.*',
+ r'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=SOLDE.*',
+ r'https://[^/]+/cyber/internet/ContinueTask.do\?.*dialogActionPerformed=CONTRAT.*',
+ r'https://[^/]+/cyber/internet/Page.do\?.*',
+ r'https://[^/]+/cyber/internet/Sort.do\?.*',
+ TransactionsPage)
- # XXX FUCKING HACK BECAUSE BANQUE POPULAIRE ARE FAGGOTS AND INCLUDE NULL
- # BYTES IN DOCUMENTS.
- def get_document(self, result, parser=None, encoding=None):
- from io import BytesIO
- buf = BytesIO(result.read().replace('\0', ''))
- return Browser.get_document(self, buf, parser, encoding)
+ error_page = URL(r'https://[^/]+/cyber/internet/ContinueTask.do', ErrorPage)
+ unavailable_page = URL(r'https://[^/]+/s3f-web/.*',
+ r'https://[^/]+/static/errors/nondispo.html', UnavailablePage)
+
+ redirect_page = URL(r'https://[^/]+/portailinternet/_layouts/Ibp.Cyi.Layouts/RedirectSegment.aspx.*', RedirectPage)
+ home_page = URL(r'https://[^/]+/portailinternet/Catalogue/Segments/.*.aspx(\?vary=(?P.*))?',
+ r'https://[^/]+/portailinternet/Pages/.*.aspx\?vary=(?P.*)',
+ r'https://[^/]+/portailinternet/Pages/default.aspx',
+ r'https://[^/]+/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx',
+ HomePage)
+
+ login2_page = URL(r'https://[^/]+/WebSSO_BP/_(?P\d+)/index.html\?transactionID=(?P.*)', Login2Page)
+
+ # linebourse
+ linebourse_page = URL(r'https://www.linebourse.fr/ReroutageSJR', LineboursePage)
+ message_page = URL(r'https://www.linebourse.fr/DetailMessage.*', MessagePage)
+ invest_linebourse_page = URL(r'https://www.linebourse.fr/Portefeuille', InvestmentLineboursePage)
+
+ # natixis
+ natixis_page = URL(r'https://www.assurances.natixis.fr/espaceinternet-bp/views/common.*', NatixisPage)
+ invest_natixis_page = URL(r'https://www.assurances.natixis.fr/espaceinternet-bp/views/contrat.*', InvestmentNatixisPage)
+ natixis_error_page = URL(r'https://www.assurances.natixis.fr/espaceinternet-bp/error-redirect.*', NatixisErrorPage)
+
+ def __init__(self, website, *args, **kwargs):
+ self.BASEURL = 'https://%s' % website
+ self.token = None
- def is_logged(self):
- return not self.is_on_page(LoginPage)
+ super(BanquePopulaire, self).__init__(*args, **kwargs)
- def home(self):
- self.login()
+ #def home(self):
+ # self.do_login()
- def login(self):
+ def do_login(self):
"""
Attempt to log in.
Note: this method does nothing if we are already logged in.
@@ -97,35 +105,38 @@ def login(self):
assert isinstance(self.username, basestring)
assert isinstance(self.password, basestring)
- if not self.is_on_page(LoginPage):
- self.location('%s://%s' % (self.PROTOCOL, self.DOMAIN), no_login=True)
+ if not self.login_page.is_here():
+ self.location(self.BASEURL)
self.page.login(self.username, self.password)
- if not self.is_logged():
+ if self.login_page.is_here():
raise BrowserIncorrectPassword()
ACCOUNT_URLS = ['mesComptes', 'mesComptesPRO', 'maSyntheseGratuite', 'accueilSynthese', 'equipementComplet']
+ @need_login
def go_on_accounts_list(self):
for taskInfoOID in self.ACCOUNT_URLS:
- self.location(self.buildurl('/cyber/internet/StartTask.do', taskInfoOID=taskInfoOID, token=self.token))
+ data = OrderedDict([('taskInfoOID', taskInfoOID), ('token', self.token)])
+ self.location('/cyber/internet/StartTask.do?%s' % urllib.urlencode(data))
if not self.page.is_error():
if self.page.pop_up():
self.logger.debug('Popup displayed, retry')
- self.location(self.buildurl('/cyber/internet/StartTask.do', taskInfoOID=taskInfoOID, token=self.token))
+ data = OrderedDict([('taskInfoOID', taskInfoOID), ('token', self.token)])
+ self.location('/cyber/internet/StartTask.do?%s' % urllib.urlencode(data))
self.ACCOUNT_URLS = [taskInfoOID]
break
else:
raise BrokenPageError('Unable to go on the accounts list page')
if self.page.is_short_list():
- self.select_form(nr=0)
- self.set_all_readonly(False)
- self['dialogActionPerformed'] = 'EQUIPEMENT_COMPLET'
- self['token'] = self.page.build_token(self['token'])
- self.submit()
+ form = self.page.get_form(nr=0)
+ form['dialogActionPerformed'] = 'EQUIPEMENT_COMPLET'
+ form['token'] = self.page.build_token(form['token'])
+ form.submit()
+ @need_login
def get_accounts_list(self, get_iban=True):
# We have to parse account list in 2 different way depending if we want the iban number or not
# thanks to stateful website
@@ -143,17 +154,17 @@ def get_accounts_list(self, get_iban=True):
while len(next_pages) > 0:
next_page = next_pages.pop()
- if not self.is_on_page(AccountsFullPage):
+ if not self.accounts_full_page.is_here():
self.go_on_accounts_list()
# If there is an action needed to go to the "next page", do it.
if 'prevAction' in next_page:
params = self.page.get_params()
params['dialogActionPerformed'] = next_page.pop('prevAction')
params['token'] = self.page.build_token(self.token)
- self.location('/cyber/internet/ContinueTask.do', urllib.urlencode(params))
+ self.location('/cyber/internet/ContinueTask.do', data=params)
next_page['token'] = self.page.build_token(self.token)
- self.location('/cyber/internet/ContinueTask.do', urllib.urlencode(next_page))
+ self.location('/cyber/internet/ContinueTask.do', data=next_page)
for a in self.page.iter_accounts(next_pages):
if get_iban:
@@ -166,6 +177,7 @@ def get_accounts_list(self, get_iban=True):
a.iban = self.get_iban_number(a)
yield a
+ @need_login
def get_iban_number(self, account):
self.location('/cyber/internet/StartTask.do?taskInfoOID=cyberIBAN&token=%s' % self.page.build_token(self.token))
# Sometimes we can't choose an account
@@ -173,6 +185,7 @@ def get_iban_number(self, account):
return NotAvailable
return self.page.get_iban(account.id)
+ @need_login
def get_account(self, id):
assert isinstance(id, basestring)
@@ -182,6 +195,7 @@ def get_account(self, id):
return None
+ @need_login
def get_history(self, account, coming=False):
account = self.get_account(account.id)
@@ -195,20 +209,20 @@ def get_history(self, account, coming=False):
params['token'] = self.page.build_token(params['token'])
- self.location('/cyber/internet/ContinueTask.do', urllib.urlencode(params))
+ self.location('/cyber/internet/ContinueTask.do', data=params)
if not self.page or self.page.no_operations():
return
# Sort by values dates (see comment in TransactionsPage.get_history)
- if len(self.page.document.xpath('//a[@id="tcl4_srt"]')) > 0:
- self.select_form(predicate=lambda form: form.attrs.get('id', '') == 'myForm')
- self.form.action = self.absurl('/cyber/internet/Sort.do?property=tbl1&sortBlocId=blc2&columnName=dateValeur')
+ if len(self.page.doc.xpath('//a[@id="tcl4_srt"]')) > 0:
+ form = self.page.get_form(id='myForm')
+ form.url = self.absurl('/cyber/internet/Sort.do?property=tbl1&sortBlocId=blc2&columnName=dateValeur')
params['token'] = self.page.build_token(params['token'])
- self.submit()
+ form.submit()
while True:
- assert self.is_on_page(TransactionsPage)
+ assert self.transactions_page.is_here()
for tr in self.page.get_history(account, coming):
yield tr
@@ -217,8 +231,9 @@ def get_history(self, account, coming=False):
if next_params is None:
return
- self.location(self.buildurl('/cyber/internet/Page.do', **next_params))
+ self.location('/cyber/internet/Page.do?%s' % urllib.urlencode(next_params))
+ @need_login
def get_investment(self, account):
if not account._invest_params:
raise NotImplementedError()
@@ -226,22 +241,22 @@ def get_investment(self, account):
account = self.get_account(account.id)
params = account._invest_params
params['token'] = self.page.build_token(params['token'])
- self.location('/cyber/internet/ContinueTask.do', urllib.urlencode(params))
+ self.location('/cyber/internet/ContinueTask.do', data=params)
- if self.is_on_page(ErrorPage):
+ if self.error_page.is_here():
raise NotImplementedError()
url, params = self.page.get_investment_page_params()
if params:
- self.location(url, urllib.urlencode(params))
- if self.is_on_page(LineboursePage):
+ self.location(url, data=params)
+ if self.linebourse_page.is_here():
self.location('https://www.linebourse.fr/Portefeuille')
- while self.is_on_page(MessagePage):
+ while self.message_page.is_here():
self.page.skip()
self.location('https://www.linebourse.fr/Portefeuille')
- elif self.is_on_page(NatixisPage):
+ elif self.natixis_page.is_here():
self.page.submit_form()
- if self.is_on_page(NatixisErrorPage):
+ if self.natixis_error_page.is_here():
return iter([])
return self.page.get_investments()
diff --git a/modules/banquepopulaire/module.py b/modules/banquepopulaire/module.py
index 4286ef3b1f9c46361551a2d06024d4d77845216c..ea70cbb2628b2099144a746f2e57dc2ff2bbe7a2 100644
--- a/modules/banquepopulaire/module.py
+++ b/modules/banquepopulaire/module.py
@@ -71,26 +71,20 @@ def create_default_browser(self):
self.config['password'].get())
def iter_accounts(self):
- with self.browser:
- return self.browser.get_accounts_list()
+ return self.browser.get_accounts_list()
def get_account(self, _id):
- with self.browser:
- account = self.browser.get_account(_id)
-
+ account = self.browser.get_account(_id)
if account:
return account
else:
raise AccountNotFound()
def iter_history(self, account):
- with self.browser:
- return self.browser.get_history(account)
+ return self.browser.get_history(account)
def iter_coming(self, account):
- with self.browser:
- return self.browser.get_history(account, coming=True)
+ return self.browser.get_history(account, coming=True)
def iter_investment(self, account):
- with self.browser:
- return self.browser.get_investment(account)
+ return self.browser.get_investment(account)
diff --git a/modules/banquepopulaire/pages.py b/modules/banquepopulaire/pages.py
index 9b9e5b2dda81574db90ce05c6266df83aff0473c..f7da6dafe6b331ae8fcd18b5e3ccf8a256922e59 100644
--- a/modules/banquepopulaire/pages.py
+++ b/modules/banquepopulaire/pages.py
@@ -23,19 +23,23 @@
from decimal import Decimal
import re
import urllib
-from mechanize import Cookie, FormNotFoundError
-
from weboob.browser.filters.standard import CleanText
-from weboob.browser.filters.html import Attr
+from weboob.browser.filters.html import Attr, Link
from weboob.exceptions import BrowserUnavailable, BrowserIncorrectPassword
-from weboob.deprecated.browser import Page as _BasePage, BrokenPageError
+
+from weboob.browser.pages import HTMLPage, LoggedPage, FormNotFound
+
from weboob.capabilities.bank import Account, Investment
from weboob.capabilities import NotAvailable
from weboob.tools.capabilities.bank.transactions import FrenchTransaction
from weboob.tools.json import json
+class BrokenPageError(Exception):
+ pass
+
+
class WikipediaARC4(object):
def __init__(self, key=None):
self.state = range(256)
@@ -60,16 +64,17 @@ def crypt(self, input):
return ''.join(output)
-class BasePage(_BasePage):
+class BasePage(object):
+ ENCODING = 'iso-8859-15'
+
def get_token(self):
- try:
- token = self.parser.select(self.document.getroot(), '//form//input[@name="token"]', 1, 'xpath').attrib['value']
- except BrokenPageError:
- token = self.parser.select(self.document.getroot(), '//body', 1, 'xpath').attrib['onload']
+ token = Attr('//form//input[@name="token"]', 'value')(self.doc)
+ if not token:
+ token = Attr('//body', 'onload')(self.doc)
token = re.search(r"saveToken\('(.*?)'", token).group(1)
return token
- def on_loaded(self):
+ def on_load(self):
if not self.is_error():
self.browser.token = self.get_token()
self.logger.debug('Update token to %s', self.browser.token)
@@ -90,7 +95,7 @@ def build_token(self, token):
char at this position at the end of the token.
"""
table = None
- for script in self.document.xpath('//script'):
+ for script in self.doc.xpath('//script'):
if script.text is None:
continue
m = re.search(r'ipthk\(([^\)]+)\)', script.text, flags=re.MULTILINE)
@@ -105,13 +110,13 @@ def build_token(self, token):
def get_params(self):
params = {}
- for field in self.document.xpath('//input'):
+ for field in self.doc.xpath('//input'):
params[field.attrib['name']] = field.attrib.get('value', '')
return params
def get_button_actions(self):
actions = {}
- for script in self.document.xpath('//script'):
+ for script in self.doc.xpath('//script'):
if script.text is None:
continue
@@ -122,7 +127,15 @@ def get_button_actions(self):
return actions
-class RedirectPage(BasePage):
+class MyHTMLPage(BasePage, HTMLPage):
+ def build_doc(self, data, *args, **kwargs):
+ # XXX FUCKING HACK BECAUSE BANQUE POPULAIRE ARE FAGGOTS AND INCLUDE NULL
+ # BYTES IN DOCUMENTS.
+ data = data.replace(b'\x00', b'')
+ return super(MyHTMLPage, self).build_doc(data, *args, **kwargs)
+
+
+class RedirectPage(LoggedPage, MyHTMLPage):
"""
var i = 'lyhrnu551jo42yfzx0jm0sqk';
setCookie('i', i);
@@ -153,6 +166,7 @@ class RedirectPage(BasePage):
"""
def add_cookie(self, name, value):
+ """
c = Cookie(0, name, value,
None, False,
'.' + self.browser.DOMAIN, True, True,
@@ -165,12 +179,15 @@ def add_cookie(self, name, value):
{})
cookiejar = self.browser._ua_handlers["_cookies"].cookiejar
cookiejar.set_cookie(c)
+ """
+ self.browser.logger.debug('adding cookie %s=%s', name, value)
+ self.browser.session.cookies.set(name, value)
- def on_loaded(self):
+ def on_load(self):
redirect_url = None
args = {}
RC4 = None
- for script in self.document.xpath('//script'):
+ for script in self.doc.xpath('//script'):
if script.text is None:
continue
@@ -195,20 +212,23 @@ def on_loaded(self):
RC4 = WikipediaARC4(args['i'])
if redirect_url is not None:
- self.browser.location(self.browser.request_class(self.browser.absurl(redirect_url), None, {'Referer': self.url}))
+ url = self.browser.absurl(redirect_url)
+ headers = {'Referer': self.url}
+ self.browser.logger.debug('redir...')
+ self.browser.location(url, headers=headers)
try:
- self.browser.select_form(name="CyberIngtegrationPostForm")
- except FormNotFoundError:
+ form = self.get_form(name="CyberIngtegrationPostForm")
+ except FormNotFound:
pass
else:
- self.browser.submit(nologin=True)
+ form.submit()
-class ErrorPage(BasePage):
+class ErrorPage(LoggedPage, MyHTMLPage):
def get_token(self):
try:
- buf = self.document.xpath('//body/@onload')[0]
+ buf = self.doc.xpath('//body/@onload')[0]
except IndexError:
return
else:
@@ -216,95 +236,93 @@ def get_token(self):
if m:
return m.group(1)
-class UnavailablePage(BasePage):
- def on_loaded(self):
- try:
- a = self.document.xpath('//a[@class="btn"]')[0]
- except IndexError:
+
+class UnavailablePage(LoggedPage, MyHTMLPage):
+ def on_load(self):
+ a = Link('//a[@class="btn"][1]')(self.doc)
+ if not a:
raise BrowserUnavailable()
- else:
- self.browser.location(a.attrib['href'], nologin=True)
+ self.browser.location(a)
-class LoginPage(BasePage):
- def on_loaded(self):
- try:
- h1 = self.parser.select(self.document.getroot(), 'h1', 1)
- except BrokenPageError:
- pass
+class LoginPage(MyHTMLPage):
+ def on_load(self):
+ h1 = CleanText('//h1[1]')(self.doc)
- if h1.text is not None and h1.text.startswith('Le service est moment'):
- try:
- raise BrowserUnavailable(self.document.xpath('//h4')[0].text)
- except KeyError:
- raise BrowserUnavailable(h1.text)
+ if h1.startswith('Le service est moment'):
+ text = CleanText('//h4[1]')(self.doc) or h1
+ raise BrowserUnavailable(text)
def login(self, login, passwd):
- self.browser.select_form(name='Login')
- self.browser['IDToken1'] = login.encode(self.browser.ENCODING)
- self.browser['IDToken2'] = passwd.encode(self.browser.ENCODING)
- self.browser.submit(nologin=True)
+ form = self.get_form(name='Login')
+ form['IDToken1'] = login.encode(self.ENCODING)
+ form['IDToken2'] = passwd.encode(self.ENCODING)
+ form.submit()
class Login2Page(LoginPage):
@property
def request_url(self):
- transactionID = self.group_dict['transactionID']
+ transactionID = self.params['transactionID']
+ assert transactionID
return 'https://www.icgauth.banquepopulaire.fr/dacswebssoissuer/api/v1u0/transaction/%s' % transactionID
- def on_loaded(self):
- r = self.browser.openurl(self.request_url)
- doc = json.load(r)
+ def on_load(self):
+ r = self.browser.open(self.request_url)
+ doc = json.loads(r.content)
self.form_id = doc['step']['validationUnits'][0]['PASSWORD_LOOKUP'][0]['id']
def login(self, login, password):
payload = {'validate': {'PASSWORD_LOOKUP': [{'id': self.form_id,
- 'login': login.encode(self.browser.ENCODING).upper(),
- 'password': password.encode(self.browser.ENCODING),
+ 'login': login.encode(self.ENCODING).upper(),
+ 'password': password.encode(self.ENCODING),
'type': 'PASSWORD_LOOKUP'
}]
}
}
- req = self.browser.request_class(self.request_url + '/step')
- req.add_header('Content-Type', 'application/json')
- r = self.browser.openurl(req, json.dumps(payload))
- doc = json.load(r)
- self.logger.debug(doc)
+ url = self.request_url + '/step'
+ headers = {'Content-Type': 'application/json'}
+ r = self.browser.open(url, data=json.dumps(payload), headers=headers)
+
+ doc = json.loads(r.content)
+ self.logger.debug('doc = %s', doc)
if 'phase' in doc and doc['phase']['state'] == 'TERMS_OF_USE':
# Got:
# {u'phase': {u'state': u'TERMS_OF_USE'}, u'validationUnits': [{u'LIST_OF_TERMS': [{u'type': u'TERMS', u'id': u'b7f28f91-7aa0-48aa-8028-deec13ae341b', u'reference': u'CGU_CYBERPLUS'}]}]}
if 'reference' in doc['validationUnits'][0]:
del doc['validationUnits'][0]['reference']
payload = {'validate': doc['validationUnits'][0]}
- req = self.browser.request_class(self.request_url + '/step')
- req.add_header('Content-Type', 'application/json')
- r = self.browser.openurl(req, json.dumps(payload))
- doc = json.load(r)
- self.logger.debug(doc)
+
+ url = self.request_url + '/step'
+ headers = {'Content-Type': 'application/json'}
+ r = self.browser.open(url, data=json.dumps(payload), headers=headers)
+ doc = json.loads(r.content)
+ self.logger.debug('doc = %s', doc)
if ('phase' in doc and doc['phase']['previousResult'] == 'FAILED_AUTHENTICATION') or \
doc['response']['status'] != 'AUTHENTICATION_SUCCESS':
raise BrowserIncorrectPassword()
- self.browser.location(doc['response']['saml2_post']['action'], urllib.urlencode({'SAMLResponse': doc['response']['saml2_post']['samlResponse']}))
+ data = {'SAMLResponse': doc['response']['saml2_post']['samlResponse']}
+ self.browser.location(doc['response']['saml2_post']['action'], data=data)
-class IndexPage(BasePage):
+class IndexPage(LoggedPage, MyHTMLPage):
def get_token(self):
- url = self.document.getroot().xpath('//frame[@name="portalHeader"]')[0].attrib['src']
+ url = self.doc.xpath('//frame[@name="portalHeader"]')[0].attrib['src']
v = urlsplit(url)
args = dict(parse_qsl(v.query))
return args['token']
-class HomePage(BasePage):
+class HomePage(LoggedPage, MyHTMLPage):
def get_token(self):
vary = None
- if self.group_dict.get('vary', None) is not None:
- vary = self.group_dict['vary']
+ if self.params.get('vary', None) is not None:
+ vary = self.params['vary']
else:
- for script in self.document.xpath('//script'):
+ for script in self.doc.xpath('//script'):
if script.text is None:
continue
@@ -313,10 +331,16 @@ def get_token(self):
vary = m.group(1)
break
- r = self.browser.openurl(self.browser.request_class(self.browser.buildurl(self.browser.absurl('/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx'), vary=vary), 'taskId=aUniversMesComptes', {'Referer': self.url}))
- if not int(r.info().get('Content-Length', '')):
- r = self.browser.openurl(self.browser.request_class(self.browser.buildurl(self.browser.absurl('/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx')), 'taskId=aUniversMesComptes', {'Referer': self.url}))
- doc = self.browser.get_document(r)
+ url = self.browser.absurl('/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx?%s' % urllib.urlencode({'vary': vary}))
+ headers = {'Referer': self.url}
+ r = self.browser.open(url, data='taskId=aUniversMesComptes', headers=headers)
+
+ if not int(r.headers.get('Content-Length', 0)):
+ url = self.browser.absurl('/portailinternet/Transactionnel/Pages/CyberIntegrationPage.aspx')
+ headers = {'Referer': self.url}
+ r = self.browser.open(url, data='taskId=aUniversMesComptes', headers=headers)
+
+ doc = r.page.doc
date = None
for script in doc.xpath('//script'):
if script.text is None:
@@ -326,13 +350,17 @@ def get_token(self):
if m:
date = m.group(1)
- r = self.browser.openurl(self.browser.request_class(self.browser.absurl('/cyber/ibp/ate/portal/integratedInternet.jsp'), 'session%%3Aate.lastConnectionDate=%s&taskId=aUniversMesComptes' % date, {'Referer': r.geturl()}))
- v = urlsplit(r.geturl())
+ url = self.browser.absurl('/cyber/ibp/ate/portal/integratedInternet.jsp')
+ data = 'session%%3Aate.lastConnectionDate=%s&taskId=aUniversMesComptes' % date
+ headers = {'Referer': r.url}
+ r = self.browser.open(url, data=data, headers=headers)
+
+ v = urlsplit(r.url)
args = dict(parse_qsl(v.query))
return args['token']
-class AccountsPage(BasePage):
+class AccountsPage(LoggedPage, MyHTMLPage):
ACCOUNT_TYPES = {u'Mes comptes d\'épargne': Account.TYPE_SAVINGS,
u'Mon épargne': Account.TYPE_SAVINGS,
u'Placements': Account.TYPE_SAVINGS,
@@ -355,7 +383,7 @@ class AccountsPage(BasePage):
(re.compile('^Fructi Pulse.*'), Account.TYPE_MARKET),
]
def is_error(self):
- for script in self.document.xpath('//script'):
+ for script in self.doc.xpath('//script'):
if script.text is not None and \
(u"Le service est momentanément indisponible" in script.text or
u"Votre abonnement ne vous permet pas d'accéder à ces services" in script.text):
@@ -364,12 +392,12 @@ def is_error(self):
return False
def pop_up(self):
- if self.document.xpath('//span[contains(text(), "du navigateur Internet.")]'):
+ if self.doc.xpath('//span[contains(text(), "du navigateur Internet.")]'):
return True
return False
def is_short_list(self):
- return len(self.document.xpath('//script[contains(text(), "EQUIPEMENT_COMPLET")]')) > 0
+ return len(self.doc.xpath('//script[contains(text(), "EQUIPEMENT_COMPLET")]')) > 0
COL_NUMBER = 0
COL_TYPE = 1
@@ -383,7 +411,7 @@ def iter_accounts(self, next_pages):
params = self.get_params()
actions = self.get_button_actions()
- for div in self.document.getroot().cssselect('div.btit'):
+ for div in self.doc.getroot().cssselect('div.btit'):
if div.text in (None, u'Synthèse'):
continue
account_type = self.ACCOUNT_TYPES.get(div.text.strip(), Account.TYPE_UNKNOWN)
@@ -466,19 +494,19 @@ def iter_accounts(self, next_pages):
yield account
# Needed to preserve navigation.
- btn = self.document.xpath('.//button/span[text()="Retour"]')
+ btn = self.doc.xpath('.//button/span[text()="Retour"]')
if len(btn) > 0:
btn = btn[0].getparent()
_params = params.copy()
_params.update(actions[btn.attrib['id']])
- self.browser.openurl('/cyber/internet/ContinueTask.do', urllib.urlencode(_params))
+ self.browser.open('/cyber/internet/ContinueTask.do', data=_params)
class AccountsFullPage(AccountsPage):
pass
-class CardsPage(BasePage):
+class CardsPage(LoggedPage, MyHTMLPage):
COL_ID = 0
COL_TYPE = 1
COL_LABEL = 2
@@ -490,15 +518,15 @@ def iter_accounts(self, next_pages):
account = None
currency = None
- for th in self.document.xpath('//table[@id="TabCtes"]//thead//th'):
+ for th in self.doc.xpath('//table[@id="TabCtes"]//thead//th'):
m = re.match('.*\((\w+)\)$', th.text)
if m and currency is None:
currency = Account.get_currency(m.group(1))
- for tr in self.document.xpath('//table[@id="TabCtes"]/tbody/tr'):
+ for tr in self.doc.xpath('//table[@id="TabCtes"]/tbody/tr'):
cols = tr.xpath('./td')
- id = self.parser.tocleanstring(cols[self.COL_ID])
+ id = CleanText(None).filter(cols[self.COL_ID])
if len(id) > 0:
if account is not None:
yield account
@@ -508,8 +536,8 @@ def iter_accounts(self, next_pages):
account.balance = account.coming = Decimal('0')
account._next_debit = datetime.date.today()
account._prev_debit = datetime.date(2000,1,1)
- account.label = u' '.join([self.parser.tocleanstring(cols[self.COL_TYPE]),
- self.parser.tocleanstring(cols[self.COL_LABEL])])
+ account.label = u' '.join([CleanText(None).filter(cols[self.COL_TYPE]),
+ CleanText(None).filter(cols[self.COL_LABEL])])
account.currency = currency
account._params = None
account._invest_params = None
@@ -523,7 +551,7 @@ def iter_accounts(self, next_pages):
account._params['dialogActionPerformed'] = 'SELECTION_ENCOURS_CARTE'
account._params['attribute($SEL_$%s)' % tr.attrib['id'].split('_')[0]] = tr.attrib['id'].split('_', 1)[1]
- date_col = self.parser.tocleanstring(cols[self.COL_DATE])
+ date_col = CleanText(None).filter(cols[self.COL_DATE])
m = re.search('(\d+)/(\d+)/(\d+)', date_col)
if not m:
self.logger.warning('Unable to parse date %r' % date_col)
@@ -533,7 +561,7 @@ def iter_accounts(self, next_pages):
if date.year < 100:
date = date.replace(year=date.year+2000)
- amount = Decimal(FrenchTransaction.clean_amount(self.parser.tocleanstring(cols[self.COL_AMOUNT])))
+ amount = Decimal(FrenchTransaction.clean_amount(CleanText(None).filter(cols[self.COL_AMOUNT])))
if not date_col.endswith('(1)'):
# debited
@@ -547,13 +575,13 @@ def iter_accounts(self, next_pages):
yield account
# Needed to preserve navigation.
- btn = self.document.xpath('.//button/span[text()="Retour"]')
+ btn = self.doc.xpath('.//button/span[text()="Retour"]')
if len(btn) > 0:
btn = btn[0].getparent()
actions = self.get_button_actions()
_params = params.copy()
_params.update(actions[btn.attrib['id']])
- self.browser.openurl('/cyber/internet/ContinueTask.do', urllib.urlencode(_params))
+ self.browser.open('/cyber/internet/ContinueTask.do', data=_params)
class Transaction(FrenchTransaction):
@@ -584,14 +612,14 @@ class Transaction(FrenchTransaction):
]
-class TransactionsPage(BasePage):
+class TransactionsPage(LoggedPage, MyHTMLPage):
def get_next_params(self):
- nxt = self.document.xpath('//li[contains(@id, "_nxt")]')
+ nxt = self.doc.xpath('//li[contains(@id, "_nxt")]')
if len(nxt) == 0 or nxt[0].attrib.get('class', '') == 'nxt-dis':
return None
params = {}
- for field in self.document.xpath('//input'):
+ for field in self.doc.xpath('//input'):
params[field.attrib['name']] = field.attrib.get('value', '')
params['validationStrategy'] = 'NV'
@@ -601,9 +629,9 @@ def get_next_params(self):
return params
def get_history(self, account, coming):
- if len(self.document.xpath('//table[@id="tbl1"]')) > 0:
+ if len(self.doc.xpath('//table[@id="tbl1"]')) > 0:
return self.get_account_history()
- if len(self.document.xpath('//table[@id="TabFact"]')) > 0:
+ if len(self.doc.xpath('//table[@id="TabFact"]')) > 0:
return self.get_card_history(account, coming)
raise NotImplementedError('Unable to find what kind of history it is.')
@@ -617,7 +645,7 @@ def get_history(self, account, coming):
COL_CREDIT = -1
def get_account_history(self):
- for tr in self.document.xpath('//table[@id="tbl1"]/tbody/tr'):
+ for tr in self.doc.xpath('//table[@id="tbl1"]/tbody/tr'):
tds = tr.findall('td')
if len(tds) < 5:
@@ -629,11 +657,12 @@ def get_account_history(self):
# necessary to use the *operation* one.
# Default sort on website is by compta date, so in browser.py we
# change the sort on value date.
- date = self.parser.tocleanstring(tds[self.COL_OP_DATE])
- vdate = self.parser.tocleanstring(tds[self.COL_VALUE_DATE])
- raw = self.parser.tocleanstring(tds[self.COL_LABEL])
- debit = self.parser.tocleanstring(tds[self.COL_DEBIT])
- credit = self.parser.tocleanstring(tds[self.COL_CREDIT])
+ cleaner = CleanText(None).filter
+ date = cleaner(tds[self.COL_OP_DATE])
+ vdate = cleaner(tds[self.COL_VALUE_DATE])
+ raw = cleaner(tds[self.COL_LABEL])
+ debit = cleaner(tds[self.COL_DEBIT])
+ credit = cleaner(tds[self.COL_CREDIT])
t.parse(date, re.sub(r'[ ]+', ' ', raw), vdate)
t.set_amount(credit, debit)
@@ -644,11 +673,11 @@ def get_account_history(self):
# XXX Fucking hack to include the check number not displayed in the full label.
if re.match("^CHEQUE |^CHQ VOTRE CHEQUE", t.label):
- t.raw = '%s No: %s' % (t.raw, self.parser.tocleanstring(tds[self.COL_REF]))
+ t.raw = '%s No: %s' % (t.raw, cleaner(tds[self.COL_REF]))
# In rare cases, label is empty ..
if not t.label:
- t.label = self.parser.tocleanstring(tds[self.COL_REF])
+ t.label = cleaner(tds[self.COL_REF])
yield t
@@ -669,12 +698,12 @@ def get_card_history(self, account, coming):
t.amount = -account._prev_balance
yield t
- currency = Account.get_currency(self.document\
+ currency = Account.get_currency(self.doc\
.xpath('//table[@id="TabFact"]/thead//th')[self.COL_CARD_AMOUNT]\
.text\
.replace('(', ' ')\
.replace(')', ' '))
- for i, tr in enumerate(self.document.xpath('//table[@id="TabFact"]/tbody/tr')):
+ for i, tr in enumerate(self.doc.xpath('//table[@id="TabFact"]/tbody/tr')):
tds = tr.findall('td')
if len(tds) < 3:
@@ -682,9 +711,10 @@ def get_card_history(self, account, coming):
t = Transaction()
- date = self.parser.tocleanstring(tds[self.COL_CARD_DATE])
- label = self.parser.tocleanstring(tds[self.COL_CARD_LABEL])
- amount = '-' + self.parser.tocleanstring(tds[self.COL_CARD_AMOUNT])
+ cleaner = CleanText(None).filter
+ date = cleaner(tds[self.COL_CARD_DATE])
+ label = cleaner(tds[self.COL_CARD_LABEL])
+ amount = '-' + cleaner(tds[self.COL_CARD_AMOUNT])
t.parse(debit_date, re.sub(r'[ ]+', ' ', label))
t.set_amount(amount)
@@ -693,15 +723,15 @@ def get_card_history(self, account, coming):
yield t
def no_operations(self):
- if len(self.document.xpath('//table[@id="tbl1" or @id="TabFact"]//td[@colspan]')) > 0:
+ if len(self.doc.xpath('//table[@id="tbl1" or @id="TabFact"]//td[@colspan]')) > 0:
return True
- if len(self.document.xpath(u'//div[contains(text(), "Accès à LineBourse")]')) > 0:
+ if len(self.doc.xpath(u'//div[contains(text(), "Accès à LineBourse")]')) > 0:
return True
return False
def get_investment_page_params(self):
- script = self.document.xpath('//body')[0].attrib['onload']
+ script = self.doc.xpath('//body')[0].attrib['onload']
url = None
m = re.search(r"','(.+?)',\[", script, re.MULTILINE)
if m:
@@ -712,11 +742,11 @@ def get_investment_page_params(self):
return url, params if url and params else None
-class LineboursePage(_BasePage):
+class LineboursePage(LoggedPage, HTMLPage):
pass
-class InvestmentLineboursePage(_BasePage):
+class InvestmentLineboursePage(LoggedPage, HTMLPage):
COL_LABEL = 0
COL_QUANTITY = 1
COL_UNITVALUE = 2
@@ -724,14 +754,17 @@ class InvestmentLineboursePage(_BasePage):
COL_UNITPRICE = 4
COL_PERF_PERCENT = 5
COL_PERF = 6
+
def get_investments(self):
- for line in self.document.xpath('//table[contains(@summary, "Contenu")]/tbody/tr[@class="color4"]'):
+ for line in self.doc.xpath('//table[contains(@summary, "Contenu")]/tbody/tr[@class="color4"]'):
cols1 = line.findall('td')
cols2 = line.xpath('./following-sibling::tr')[0].findall('td')
+ cleaner = CleanText(None).filter
+
inv = Investment()
- inv.label = self.parser.tocleanstring(cols1[self.COL_LABEL].xpath('.//span')[0])
- inv.code = self.parser.tocleanstring(cols1[self.COL_LABEL].xpath('./a')[0]).split(' ')[-1]
+ inv.label = cleaner(cols1[self.COL_LABEL].xpath('.//span')[0])
+ inv.code = cleaner(cols1[self.COL_LABEL].xpath('./a')[0]).split(' ')[-1]
inv.quantity = self.parse_decimal(cols2[self.COL_QUANTITY])
inv.unitprice = self.parse_decimal(cols2[self.COL_UNITPRICE])
inv.unitvalue = self.parse_decimal(cols2[self.COL_UNITVALUE])
@@ -741,33 +774,34 @@ def get_investments(self):
yield inv
def parse_decimal(self, string):
- value = self.parser.tocleanstring(string)
+ value = CleanText(None).filter(string)
if value == '':
return NotAvailable
return Decimal(Transaction.clean_amount(value))
-class NatixisPage(_BasePage):
+class NatixisPage(LoggedPage, HTMLPage):
def submit_form(self):
- self.browser.select_form(name="formRoutage")
- self.browser.submit(nologin=True)
+ form = self.get_form(name="formRoutage")
+ form.submit()
-class NatixisErrorPage(_BasePage):
+class NatixisErrorPage(LoggedPage, HTMLPage):
pass
-class InvestmentNatixisPage(_BasePage):
+class InvestmentNatixisPage(LoggedPage, HTMLPage):
COL_LABEL = 0
COL_QUANTITY = 2
COL_UNITVALUE = 3
COL_VALUATION = 4
+
def get_investments(self):
- for line in self.document.xpath('//div[@class="row-fluid table-contrat-supports"]/table/tbody[(@class)]/tr'):
+ for line in self.doc.xpath('//div[@class="row-fluid table-contrat-supports"]/table/tbody[(@class)]/tr'):
cols = line.findall('td')
inv = Investment()
- inv.label = self.parser.tocleanstring(cols[self.COL_LABEL]).replace('Cas sans risque ', '')
+ inv.label = CleanText(None).filter(cols[self.COL_LABEL]).replace('Cas sans risque ', '')
inv.quantity = self.parse_decimal(cols[self.COL_QUANTITY])
inv.unitvalue = self.parse_decimal(cols[self.COL_UNITVALUE])
inv.valuation = self.parse_decimal(cols[self.COL_VALUATION])
@@ -775,42 +809,41 @@ def get_investments(self):
yield inv
def parse_decimal(self, string):
- value = self.parser.tocleanstring(string).replace('Si famille fonds generaux, on affiche un tiret', '').replace('Cas sans risque', '').replace(' ', '')
+ value = CleanText(None).filter(string).replace('Si famille fonds generaux, on affiche un tiret', '').replace('Cas sans risque', '').replace(' ', '')
if value == '-':
return NotAvailable
return Decimal(Transaction.clean_amount(value))
-class MessagePage(_BasePage):
+class MessagePage(LoggedPage, HTMLPage):
def skip(self):
try:
- self.browser.select_form(name="leForm")
- except FormNotFoundError:
+ form = self.get_form(name="leForm")
+ except FormNotFound:
pass
else:
- self.browser.submit(nologin=True)
+ form.submit()
-class IbanPage(BasePage):
+class IbanPage(LoggedPage, MyHTMLPage):
def need_to_go(self):
- return len(self.document.xpath('//div[@class="grid"]/div/span[contains(text(), "IBAN")]')) == 0
+ return len(self.doc.xpath('//div[@class="grid"]/div/span[contains(text(), "IBAN")]')) == 0
def go_iban(self, account):
- for tr in self.document.xpath('//table[@id]/tbody/tr'):
+ for tr in self.doc.xpath('//table[@id]/tbody/tr'):
if account.type not in (Account.TYPE_LOAN, Account.TYPE_MARKET) and CleanText().filter(tr.xpath('./td[1]')) in account.id:
- self.browser.select_form(predicate=lambda form: form.attrs.get('id', '') == 'myForm')
- self.browser.set_all_readonly(False)
- self.browser['token'] = self.build_token(self.browser['token'])
- self.browser['dialogActionPerformed'] = "DETAIL_IBAN_RIB"
+ form = self.get_form(id='myForm')
+ form['token'] = self.build_token(form['token'])
+ form['dialogActionPerformed'] = "DETAIL_IBAN_RIB"
tr_id = Attr(None, 'id').filter(tr.xpath('.')).split('_')
- self.browser[u'attribute($SEL_$%s)' % tr_id[0]] = tr_id[1]
- self.browser.submit()
+ form[u'attribute($SEL_$%s)' % tr_id[0]] = tr_id[1]
+ form.submit()
return True
return False
def get_iban(self, acc_id):
iban_class = None
- for div in self.document.xpath('//div[@class="grid"]/div'):
+ for div in self.doc.xpath('//div[@class="grid"]/div'):
if not iban_class and "IBAN" in CleanText().filter(div.xpath('./span')):
iban_class = Attr(None, 'class').filter(div.xpath('.'))
elif iban_class is not None and iban_class == Attr(None, 'class').filter(div.xpath('.')):