Newer
Older
# -*- coding: utf-8 -*-
# Copyright(C) 2012 Romain Bignon
#
# This file is part of a weboob module.
# This weboob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This weboob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this weboob module. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from hashlib import sha256
from uuid import uuid4
from collections import OrderedDict
from decimal import Decimal
from dateutil import parser
from weboob.browser import LoginBrowser, need_login, StatesMixin
from weboob.browser.switch import SiteSwitch
from weboob.capabilities.bank import (
Account, AddRecipientStep, Recipient, TransferBankError, Transaction, TransferStep,
Guillaume Risbourg
committed
AddRecipientBankError,
from weboob.capabilities.base import NotAvailable, find_object
from weboob.capabilities.bill import Subscription
from weboob.capabilities.profile import Profile
from weboob.browser.exceptions import BrowserHTTPNotFound, ClientError, ServerError
from weboob.exceptions import (
BrowserIncorrectPassword, BrowserUnavailable, BrowserHTTPError, BrowserPasswordExpired,
AuthMethodNotImplemented, AppValidation, AppValidationExpired,
)
from weboob.tools.capabilities.bank.transactions import (
sorted_transactions, FrenchTransaction, keep_only_card_transactions,
omit_deferred_transactions,
)
Quentin Defenouillere
committed
from weboob.tools.capabilities.bank.investments import create_french_liquidity
from weboob.tools.compat import urljoin, urlparse, parse_qsl, parse_qs, urlencode, urlunparse
from weboob.tools.json import json
from weboob.tools.value import Value
from weboob.tools.decorators import retry
IndexPage, ErrorPage, MarketPage, LifeInsurance, LifeInsuranceHistory, LifeInsuranceInvestments,
GarbagePage, MessagePage, LoginPage,
TransferPage, ProTransferPage, TransferConfirmPage, TransferSummaryPage, ProTransferConfirmPage,
ProTransferSummaryPage, ProAddRecipientOtpPage, ProAddRecipientPage,
SmsPage, ValidationPageOption, AuthentPage, RecipientPage, CanceledAuth,
CaissedepargneKeyboard, CaissedepargneNewKeyboard,
TransactionsDetailsPage, LoadingPage, ConsLoanPage, MeasurePage,
NatixisLIHis, NatixisLIInv, NatixisRedirectPage,
SubscriptionPage, CreditCooperatifMarketPage, UnavailablePage,
CardsPage, CardsComingPage, CardsOldWebsitePage, TransactionPopupPage,
OldLeviesPage, NewLeviesPage, NewLoginPage, JsFilePage, AuthorizePage,
AuthenticationMethodPage, VkImagePage, AuthenticationStepPage, LoginTokensPage,
AppValidationPage,
from .transfer_pages import CheckingPage, TransferListPage
Quentin Defenouillere
committed
from .linebourse_browser import LinebourseAPIBrowser
__all__ = ['CaisseEpargne']
def decode_utf8_cookie(data):
    """Return the cookie value *data* as correctly decoded UTF-8 text.

    caissedepargne/palatine cookies may contain non-ASCII bytes, which is
    ill-defined. In practice they are UTF-8, but since that is not standard,
    requests/urllib read them as latin-1. This helper undoes that.

    :param data: cookie value, either raw bytes or text mis-decoded as latin-1
    :return: the text obtained by reading the underlying bytes as UTF-8; a
        text value that cannot be a latin-1 mis-decoding of UTF-8 (because it
        was already decoded correctly) is returned unchanged
    :raises UnicodeDecodeError: if *data* is bytes that are not valid UTF-8
    """
    if isinstance(data, bytes):
        # Raw bytes (always the case for Python 2 ``str``): decode directly.
        # The original only did this on Python 2 and crashed on Python 3 bytes.
        return data.decode('utf-8')

    try:
        # Undo the latin-1 interpretation and read the bytes as UTF-8.
        return data.encode('latin-1').decode('utf-8')
    except (UnicodeEncodeError, UnicodeDecodeError):
        # Sometimes the cookie is already proper unicode (urllib? requests?):
        # round-tripping through latin-1 is then impossible or meaningless,
        # so keep the value as it is instead of raising.
        return data
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def monkeypatch_for_lowercase_percent(session):
    """Make *session* send lowercase percent-escapes on the OTP servlet URL.

    In the transfer flow, the main site (something like
    net123.caisse-epargne.fr) redirects to the OTP site (something like
    www.icgauth.caisse-epargne.fr). Per RFC3986, RFC2396 and RFC1630, %2F and
    %2f are equivalent and case normalisation is allowed, which is what
    requests and urllib3 do. The OTP site does not follow the RFCs on this
    point, so the HTTPS pool manager of the session is patched to rewrite
    the escapes just before the request is sent.

    The patch is installed only once per object: the original callable is
    kept under ``_old_<name>`` and its presence disables re-patching.
    """
    def install_once(target, name, replacement):
        backup_name = '_old_%s' % name
        if not hasattr(target, backup_name):
            setattr(target, backup_name, getattr(target, name))
            setattr(target, name, replacement)

    def lowercase_escape(match):
        return match.group(0).lower()

    pool_manager = session.adapters['https://'].poolmanager

    def connection_from_host(*args, **kwargs):
        pool = pool_manager._old_connection_from_host(*args, **kwargs)

        def make_request(conn, method, url, *request_args, **request_kwargs):
            # restrict this hazardous change to otp urls
            if url.startswith('/dacswebssoissuer/AuthnRequestServlet'):
                url = re.sub(r'%[0-9A-F]{2}', lowercase_escape, url)
            return pool._old__make_request(conn, method, url, *request_args, **request_kwargs)

        install_once(pool, '_make_request', make_request)
        return pool

    install_once(pool_manager, 'connection_from_host', connection_from_host)
class CaisseEpargne(LoginBrowser, StatesMixin):
# Class-level configuration followed by the URL patterns handled by the
# browser, each bound to the page class given as last argument of URL(...).
# Default site root; __init__ replaces it with the `domain` keyword when given.
BASEURL = "https://www.caisse-epargne.fr"
# presumably the maximum number of history pages fetched — usage not visible here
HISTORY_MAX_PAGE = 200
Quentin Defenouillere
committed
# Class instantiated by __init__ to build `self.linebourse`.
LINEBOURSE_BROWSER = LinebourseAPIBrowser
login = URL(
r'/authentification/manage\?step=identification&identifiant=(?P<login>.*)',
r'https://.*/login.aspx',
LoginPage
)
new_login = URL(r'/se-connecter/sso', NewLoginPage)
js_file = URL(r'/se-connecter/main-.*.js$', JsFilePage)
authorize = URL(r'https://www.as-ex-ath-groupe.caisse-epargne.fr/api/oauth/v2/authorize', AuthorizePage)
login_tokens = URL(r'https://www.as-ex-ath-groupe.caisse-epargne.fr/api/oauth/v2/consume', LoginTokensPage)
# Login and transfer authentication
authentication_step = URL(
r'https://(?P<domain>www.icgauth.[^/]+)/dacsrest/api/v1u0/transaction/(?P<validation_id>[^/]+)/step',
AuthenticationStepPage
)
# NOTE(review): the first pattern has an empty `validation_id` group, so it
# presumably only serves to build URLs; the second one matches real ones — confirm.
authentication_method_page = URL(
r'https://(?P<domain>www.icgauth.[^/]+)/dacsrest/api/v1u0/transaction/(?P<validation_id>)',
r'https://www.icgauth.caisse-epargne.fr/dacsrest/api/v1u0/transaction/.*',
AuthenticationMethodPage,
# NOTE(review): the closing parenthesis of the URL(...) call above is not visible here.
vk_image = URL(
r'https://(?P<domain>www.icgauth.[^/]+)/dacs-rest-media/api/v1u0/medias/mappings/[a-z0-9-]+/images',
VkImagePage,
)
# eg of both possible regexes:
# https://www.icgauth.caisse-epargne.fr/dacstemplate-SOL/index.html?transactionID=CtxDACSP[a-f0-9]+
# https://www.icgauth.caisse-epargne.fr/dacstemplate-SOL/_12579/index.html?transactionID=CtxDACSP[a-f0-9]+
validation_option = URL(
r'https://(?P<domain>www.icgauth.[^/]+)/dacstemplate-SOL/(?:[^/]+/)?index.html\?transactionID=.*',
ValidationPageOption
)
sms = URL(r'https://(?P<domain>www.icgauth.[^/]+)/dacswebssoissuer/AuthnRequestServlet', SmsPage)
app_validation = URL(r'https://(?P<domain>www.icgauth.[^/]+)/dacsrest/WaitingCallbackHandler', AppValidationPage)
account_login = URL(
r'/authentification/manage\?step=account&identifiant=(?P<login>.*)&account=(?P<accountType>.*)',
LoginPage
)
loading = URL(r'https://.*/CreditConso/ReroutageCreditConso.aspx', LoadingPage)
cons_loan = URL(
r'https://www.credit-conso-cr.caisse-epargne.fr/websavcr-web/rest/contrat/getContrat\?datePourIe=(?P<datepourie>)',
ConsLoanPage
)
# All the attributes below share the exact same `Portail.aspx` pattern.
# NOTE(review): the URL alone cannot tell these pages apart; presumably each
# page class decides by itself whether it applies to the response — confirm.
transaction_detail = URL(r'https://.*/Portail.aspx.*', TransactionsDetailsPage)
recipient = URL(r'https://.*/Portail.aspx.*', RecipientPage)
checking = URL(r'https://.*/Portail.aspx.*', CheckingPage)
transfer_list = URL(r'https://.*/Portail.aspx.*', TransferListPage)
transfer = URL(r'https://.*/Portail.aspx.*', TransferPage)
transfer_summary = URL(r'https://.*/Portail.aspx.*', TransferSummaryPage)
transfer_confirm = URL(r'https://.*/Portail.aspx.*', TransferConfirmPage)
pro_transfer = URL(r'https://.*/Portail.aspx.*', ProTransferPage)
pro_transfer_confirm = URL(r'https://.*/Portail.aspx.*', ProTransferConfirmPage)
pro_transfer_summary = URL(r'https://.*/Portail.aspx.*', ProTransferSummaryPage)
pro_add_recipient_otp = URL(r'https://.*/Portail.aspx.*', ProAddRecipientOtpPage)
pro_add_recipient = URL(r'https://.*/Portail.aspx.*', ProAddRecipientPage)
measure_page = URL(r'https://.*/Portail.aspx.*', MeasurePage)
cards_old = URL(r'https://.*/Portail.aspx.*', CardsOldWebsitePage)
cards = URL(r'https://.*/Portail.aspx.*', CardsPage)
cards_coming = URL(r'https://.*/Portail.aspx.*', CardsComingPage)
old_checkings_levies = URL(r'https://.*/Portail.aspx.*', OldLeviesPage)
new_checkings_levies = URL(r'https://.*/Portail.aspx.*', NewLeviesPage)
authent = URL(r'https://.*/Portail.aspx.*', AuthentPage)
subscription = URL(r'https://.*/Portail.aspx\?tache=(?P<tache>).*', SubscriptionPage)
transaction_popup = URL(r'https://.*/Portail.aspx.*', TransactionPopupPage)
home = URL(r'https://.*/Portail.aspx.*', IndexPage)
home_tache = URL(r'https://.*/Portail.aspx\?tache=(?P<tache>).*', IndexPage)
error = URL(
r'https://.*/login.aspx',
r'https://.*/Pages/logout.aspx.*',
r'https://.*/particuliers/Page_erreur_technique.aspx.*',
ErrorPage
)
market = URL(
r'https://.*/Pages/Bourse.*',
r'https://www.caisse-epargne.offrebourse.com/ReroutageSJR',
r'https://www.caisse-epargne.offrebourse.com/fr/6CE.*',
MarketPage
)
unavailable_page = URL(r'https://www.caisse-epargne.fr/.*/au-quotidien', UnavailablePage)
creditcooperatif_market = URL(r'https://www.offrebourse.com/.*', CreditCooperatifMarketPage) # just to catch the landing page of the Credit Cooperatif's Linebourse
natixis_redirect = URL(
r'/NaAssuranceRedirect/NaAssuranceRedirect.aspx',
r'https://www.espace-assurances.caisse-epargne.fr/espaceinternet-ce/views/common/routage-itce.xhtml\?windowId=automatedEntryPoint',
NatixisRedirectPage
)
life_insurance_history = URL(
r'https://www.extranet2.caisse-epargne.fr/cin-front/contrats/evenements',
LifeInsuranceHistory
)
life_insurance_investments = URL(
r'https://www.extranet2.caisse-epargne.fr/cin-front/contrats/details',
LifeInsuranceInvestments
)
life_insurance = URL(
r'https://.*/Assurance/Pages/Assurance.aspx',
r'https://www.extranet2.caisse-epargne.fr.*',
LifeInsurance
)
natixis_life_ins_his = URL(
r'https://www.espace-assurances.caisse-epargne.fr/espaceinternet-ce/rest/v2/contratVie/load-operation/(?P<id1>\w+)/(?P<id2>\w+)/(?P<id3>)',
NatixisLIHis
)
natixis_life_ins_inv = URL(
r'https://www.espace-assurances.caisse-epargne.fr/espaceinternet-ce/rest/v2/contratVie/load/(?P<id1>\w+)/(?P<id2>\w+)/(?P<id3>)',
NatixisLIInv
)
Quentin Defenouillere
committed
message = URL(r'https://www.caisse-epargne.offrebourse.com/DetailMessage\?refresh=O', MessagePage)
garbage = URL(
r'https://www.caisse-epargne.offrebourse.com/Portefeuille',
r'https://www.caisse-epargne.fr/particuliers/.*/emprunter.aspx',
r'https://.*/particuliers/emprunter.*',
r'https://.*/particuliers/epargner.*',
GarbagePage
)
Quentin Defenouillere
committed
# Names of the attributes listed for the StatesMixin state storage.
# NOTE(review): the closing parenthesis of this tuple is not visible here.
__states__ = (
'BASEURL', 'multi_type', 'typeAccount', 'is_cenet_website', 'recipient_form',
'is_send_sms', 'is_app_validation', 'otp_validation',
# Accounts managed in life insurance space (not in linebourse)
insurance_accounts = (
'AIKIDO',
'ASSURECUREUIL',
'ECUREUIL PROJET',
'GARANTIE RETRAITE EU',
'INITIATIVES PLUS',
'INITIATIVES TRANSMIS',
'LIVRET ASSURANCE VIE',
'OCEOR EVOLUTION',
'PATRIMONIO CRESCENTE',
'PEP TRANSMISSION',
'PERP',
'PERSPECTIVES ECUREUI',
'POINTS RETRAITE ECUR',
'RICOCHET',
'SOLUTION PERP',
'TENDANCES',
'YOGA',
)
def __init__(self, nuser, *args, **kwargs):
    """Initialise the browser state and its companion Linebourse browser.

    :param nuser: additional user identifier; it is sent as ``nuusager`` by
        the old login and appended to the login hint by the new one. When it
        is empty, only the ``WE`` connection type is accepted.
    :param kwargs: ``domain`` (optional) overrides ``BASEURL`` and
        ``market_url`` (optional) overrides the Linebourse root; both are
        consumed here. ``weboob`` is required; it is read here and also
        forwarded, with the remaining arguments, to the parent constructor.
    """
    self.BASEURL = kwargs.pop('domain', self.BASEURL)
    if not self.BASEURL.startswith('https://'):
        self.BASEURL = 'https://%s' % self.BASEURL

    # `is_cenet_website`, `recipient_form` and `is_app_validation` are listed
    # in `__states__` but were never initialised; give them a neutral default
    # so that reading them cannot fail on a fresh browser.
    self.is_cenet_website = False
    self.new_website = True
    self.multi_type = False
    self.accounts = None
    self.loans = None
    self.typeAccount = None
    self.inexttype = 0  # keep track of index in the connection type's list
    # The parameter was previously dropped although the login code reads
    # `self.nuser`.
    self.nuser = nuser
    self.recipient_form = None
    self.is_send_sms = None
    self.is_app_validation = None
    self.otp_validation = None

    self.weboob = kwargs['weboob']
    self.market_url = kwargs.pop(
        'market_url',
        'https://www.caisse-epargne.offrebourse.com',
    )
    self.has_subscription = True

    super(CaisseEpargne, self).__init__(*args, **kwargs)

    # The Linebourse browser stores its responses in a sub-directory of ours,
    # when response saving is enabled at all.
    dirname = self.responses_dirname
    if dirname:
        dirname += '/bourse'

    self.linebourse = self.LINEBOURSE_BROWSER(
        self.market_url,
        logger=self.logger,
        responses_dirname=dirname,
        weboob=self.weboob,
        proxy=self.PROXIES,
    )

    monkeypatch_for_lowercase_percent(self.session)
def deleteCTX(self):
    """Remove the ``CTX`` cookie when the session holds it more than once.

    For connection to offrebourse and natixis, the duplicated ``CTX`` cookie
    has to be deleted first.
    """
    ctx_count = sum(1 for name in self.session.cookies.keys() if name == 'CTX')
    if ctx_count > 1:
        del self.session.cookies['CTX']
if state.get('expire') and parser.parse(state['expire']) < datetime.datetime.now():
return self.logger.info('State expired, not reloading it from storage')
transfer_states = ('recipient_form', 'is_app_validation', 'is_send_sms', 'otp_validation')
for transfer_state in transfer_states:
if transfer_state in state and state[transfer_state] is not None:
super(CaisseEpargne, self).load_state(state)
self.logged = True
break
def locate_browser(self, state):
    """Deliberately do nothing when a stored state is restored.

    In case of transfer/add recipient, going back to the previous page makes
    the site crash, so *state* is ignored and no page is requested.
    """
    return None
Damien Mat Jedrzejewski
committed
data = self.get_connection_data()
accounts_types = data.get('account')
if data.get('authMode', '') == 'redirect': # the connection type EU could also be used as a criteria
raise SiteSwitch('cenet')
type_account = data['account'][0]
if self.multi_type:
assert type_account == self.typeAccount
if 'keyboard' in data:
self.do_old_login(data, type_account, accounts_types)
else:
# New virtual keyboard
self.do_new_login(data)
def get_connection_data(self):
    """Fetch the connection data needed to start a login.

    The login page is requested for ``self.username``; its decoded response
    is handed to ``check_connection_data`` and the result is returned.

    :return: the connection data returned by ``check_connection_data``
    :raises BrowserIncorrectPassword: if the username or password is empty
    """
    # One of the login parameters is a connection type (called typeAccount)
    # which takes one of these values:
    #   WE: espace particulier
    #   WP: espace pro
    #   WM: personnes protégées
    #   EU: Cenet
    #
    # A connection has one or several connection types. With several of
    # them, the right one cannot be guessed in advance: each has to be tried
    # until the login succeeds (sometimes all of them work, sometimes only
    # one does).
    #
    # To keep things simple, they are tried from first to last, in the order
    # of the list returned by the first request.
    #
    # Combinations seen so far:
    #   [WE]
    #   [WP]
    #   [WE, WP]
    #   [WE, WP, WM]
    #   [WP, WM]
    #   [EU]
    #   [EU, WE] (EU tends to come first when present)
    if not (self.username and self.password):
        raise BrowserIncorrectPassword()

    @retry(ValueError)
    def fetch_login_page():
        """
        Occasionally the response is not the expected JSON although the
        status code is 200, and parsing it raises a JSONDecodeError.
        Retrying is enough and avoids raising a BrowserUnavailable.
        """
        return self.login.go(login=self.username)

    # The response lists the connection types:
    # - a single type: all the information is already there
    # - several types: an additional request is needed
    login_page = fetch_login_page()
    connection_data = self.check_connection_data(login_page.get_response())
    assert connection_data is not None
    return connection_data
def check_connection_data(self, data):
    """Select the connection type to use and return the matching data.

    With a single connection type, *data* is returned untouched. With
    several, the type at index ``self.inexttype`` is selected (skipping 'EU'
    when no nuser is set, or 'WE' when one is set), the index is advanced
    for the next attempt, and the data of an extra request made for the
    selected type is returned.

    :param data: decoded response of the first login request; its optional
        ``account`` entry is the list of connection types
    :return: *data* itself, or the response of the per-type request
    :raises BrowserIncorrectPassword: if no nuser is set and 'WE' is not
        among the connection types
    :raises AssertionError: if every connection type has already been tried
    """
    accounts_types = data.get('account', [])
    if not self.nuser and 'WE' not in accounts_types:
        raise BrowserIncorrectPassword("Utilisez Caisse d'Épargne Professionnels et renseignez votre nuser pour connecter vos comptes sur l'epace Professionels ou Entreprises.")

    if len(accounts_types) > 1:
        # Additional request when there is more than one connection type
        # to "choose" from the list of connection types
        self.multi_type = True

        if self.inexttype < len(accounts_types):
            if accounts_types[self.inexttype] == 'EU' and not self.nuser:
                # when EU is present and not alone, it tends to come first
                # if nuser is unset though, user probably doesn't want 'EU'
                self.inexttype += 1
            elif accounts_types[self.inexttype] == 'WE' and self.nuser:
                # User is probably a netpro user and want to access their
                # professional accounts
                self.inexttype += 1

        # Checked after the optional skip: skipping the last entry used to
        # index past the end of the list and raise an IndexError.
        if self.inexttype >= len(accounts_types):
            raise AssertionError('should have logged in with at least one connection type')

        self.typeAccount = accounts_types[self.inexttype]
        self.inexttype += 1

        data = self.account_login.go(login=self.username, accountType=self.typeAccount).get_response()

    return data
def do_old_login(self, data, type_account, accounts_types):
# Log in through the old virtual keyboard: the password is converted to a
# keyboard code, sent to data['url'], and BASEURL is finally set to the root
# of that URL.
# NOTE(review): several lines of this method are not visible in this view
# (opening of the `payload` dict, `try:`, the retry with the next connection
# type); the notes below mark the gaps and must be checked against the full file.
# Old virtual keyboard
id_token_clavier = data['keyboard']['Id']
vk = CaissedepargneKeyboard(data['keyboard']['ImageClavier'], data['keyboard']['Num']['string'])
newCodeConf = vk.get_string_code(self.password)
# NOTE(review): these are entries of the `payload` dict passed to
# self.location() below; its opening line and closing brace are missing here.
'auth_mode': 'ajax',
'nuusager': self.nuser.encode('utf-8'),
'typeAccount': type_account,
'ctx': 'typsrv={}'.format(type_account),
# NOTE(review): the `try:` matching the `except ValueError` below is missing here.
res = self.location(data['url'], params=payload)
except ValueError:
raise BrowserUnavailable()
if not res.page:
raise BrowserUnavailable()
response = res.page.get_response()
# NOTE(review): presumably the checks below only run when the response
# reports an error — the enclosing condition is not visible; confirm.
if response['error'] == 'Veuillez changer votre mot de passe':
raise BrowserPasswordExpired(response['error'])
# the only possible way to log in w/o nuser is on WE. if we're here no need to go further.
if not self.nuser and self.typeAccount == 'WE':
Damien Mat Jedrzejewski
committed
raise BrowserIncorrectPassword(self.page.get_wrongpass_message())
Damien Mat Jedrzejewski
committed
# all typeAccount tested and still not logged
# next iteration will throw the AssertionError if we don't raise an error here
if self.inexttype == len(accounts_types):
raise BrowserIncorrectPassword(self.page.get_wrongpass_message())
if self.multi_type:
# try to log in with the next connection type's value
# NOTE(review): the statement performing that retry is not visible here.
Damien Mat Jedrzejewski
committed
raise BrowserIncorrectPassword(self.page.get_wrongpass_message())
# Keep only the scheme and host of the login URL as the new site root.
self.BASEURL = urljoin(data['url'], '/')
Guillaume Risbourg
committed
def get_auth_mechanisms_validation_info(self):
    """ First step of strong authentication validation

    Collect everything the validation form needs and store it in
    `self.otp_validation`: the authentication method info returned by the
    page, completed with `validation_unit_id`, `validation_id` and `domain`.

    Warning: need to be on `validation_option` page to get the "transaction ID".
    """
    match = re.search(r'transactionID=(.*)', self.page.url)
    if not match:
        raise AssertionError('Transfer transaction id was not found in url')
    transaction_id = match.group(1)

    otp_validation_domain = urlparse(self.url).netloc
    self.authentication_method_page.go(
        domain=otp_validation_domain,
        validation_id=transaction_id
    )

    # An error can show up at this first authentication request.
    # It is then not an invalid otp error, so it is reported as a wrongpass.
    self.page.check_errors(feature='login')

    self.otp_validation = self.page.get_authentication_method_info()

    method_type = self.otp_validation['type']
    if method_type not in ('SMS', 'CLOUDCARD', 'PASSWORD'):
        self.logger.warning('Not handled authentication method : "%s"' % self.otp_validation['type'])
        raise AuthMethodNotImplemented()

    self.otp_validation.update({
        'validation_unit_id': self.page.validation_unit_id,
        'validation_id': transaction_id,
        'domain': otp_validation_domain,
    })
def do_otp_sms_authentication(self, **params):
    """ Second step of sms authentication validation

    Send the OTP received by SMS to the authentication step, then clear
    `self.otp_validation`.

    Warning:
    * need to be used through `do_authentication_validation` method
      in order to handle authentication response
    * the first step must have filled `self.otp_validation`
    * the `otp_sms` parameter must be given

    Parameters:
    otp_sms (str): the OTP received by SMS
    """
    assert self.otp_validation
    assert 'otp_sms' in params

    domain = self.otp_validation['domain']
    validation_id = self.otp_validation['validation_id']
    payload = {
        'validate': {
            self.otp_validation['validation_unit_id']: [{
                'id': self.otp_validation['id'],
                'otp_sms': params['otp_sms'],
                'type': 'SMS',
            }],
        },
    }
    self.authentication_step.go(domain=domain, validation_id=validation_id, json=payload)

    self.otp_validation = None
def do_cloudcard_authentication(self, **params):
    """ Second step of cloudcard authentication validation

    Poll the application validation status every 2 seconds, for at most
    300 seconds. Once the status is 'valid', submit the authentication step
    and clear `self.otp_validation`; if it never is, raise
    `AppValidationExpired`.

    Warning:
    * need to be used through `do_authentication_validation` method
      in order to handle authentication response
    * the first step must have filled `self.otp_validation`
    """
    assert self.otp_validation

    deadline = time.time() + 300.0
    referer_url = self.authentication_method_page.build(
        domain=self.otp_validation['domain'],
        validation_id=self.otp_validation['validation_id'],
    )

    while time.time() < deadline:
        self.app_validation.go(
            domain=self.otp_validation['domain'],
            headers={'Referer': referer_url},
        )
        status = self.page.get_status()

        if status != 'valid':
            assert status == 'progress', 'Unhandled CloudCard status : "%s"' % status
            time.sleep(2)
            continue

        # 'valid' is also reported when the user cancels on the application;
        # the `authentication_step` response then carries the
        # AUTHENTICATION_CANCELED status.
        cloudcard_answer = {
            'validate': {
                self.otp_validation['validation_unit_id']: [{
                    'id': self.otp_validation['id'],
                    'type': 'CLOUDCARD',
                }],
            },
        }
        self.authentication_step.go(
            domain=self.otp_validation['domain'],
            validation_id=self.otp_validation['validation_id'],
            json=cloudcard_answer,
        )
        break
    else:
        raise AppValidationExpired()

    self.otp_validation = None
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
def do_vk_authentication(self, **params):
    """ Authentication with virtual keyboard

    Read the validation identifiers and the virtual keyboard description
    from the current page, fetch the keyboard images, convert
    `self.password` to the keyboard code and submit it to the
    authentication step.

    Warning: need to be used through `do_authentication_validation` method
    in order to handle authentication response
    """
    # An error can show up at this first authentication request.
    # It is then not a vk error, so it is reported as a wrongpass.
    self.page.check_errors(feature='login')

    # Everything below must be read before leaving the current page.
    validation_id = self.page.get_validation_id()
    validation_unit_id = self.page.validation_unit_id
    keyboard_info = self.page.get_authentication_method_info()
    keyboard_id = keyboard_info['id']
    keyboard_images_url = keyboard_info['virtualKeyboard']['externalRestMediaApiUrl']
    otp_validation_domain = urlparse(self.url).netloc

    self.location(keyboard_images_url)
    images_url = self.page.get_all_images_data()
    code = CaissedepargneNewKeyboard(self, images_url).get_string_code(self.password)

    password_answer = {
        'validate': {
            validation_unit_id: [{
                'id': keyboard_id,
                'password': code,
                'type': 'PASSWORD',
            }],
        },
    }
    request_headers = {
        'Referer': self.BASEURL,
        'Accept': 'application/json, text/plain, */*',
    }
    self.authentication_step.go(
        domain=otp_validation_domain,
        validation_id=validation_id,
        json=password_answer,
        headers=request_headers,
    )
def do_authentication_validation(self, authentication_method, feature, **params):
    """ Handle all sort of authentication with `icgauth`

    Run the handler of the requested authentication method, check the
    resulting authentication step for errors, then post the SAML response
    to the redirect action it provides.
    This method is used for login or transfer/new recipient authentication.

    Parameters:
    authentication_method (str): authentication method in ('SMS', 'CLOUDCARD', 'PASSWORD')
    feature (str): action that need authentication in ('login', 'transfer', 'recipient')
    """
    handlers = {
        'SMS': self.do_otp_sms_authentication,
        'CLOUDCARD': self.do_cloudcard_authentication,
        'PASSWORD': self.do_vk_authentication,
    }
    handler = handlers[authentication_method]
    handler(**params)

    assert self.authentication_step.is_here()
    self.page.check_errors(feature=feature)

    redirect_data = self.page.get_redirect_data()
    assert redirect_data, 'redirect_data must not be empty'

    saml_form = {'SAMLResponse': redirect_data['samlResponse']}
    request_headers = {
        'Referer': self.BASEURL,
        'Accept': 'application/json, text/plain, */*',
    }
    self.location(redirect_data['action'], data=saml_form, headers=request_headers)
def do_new_login(self, data):
# Log in through the new (OAuth-style) flow: post to the redirect URL found
# in `data`, read the client id and nonce from the site's main js file, run
# the `icgauth` authentication for the 'login' feature, post the obtained
# tokens to the `continue` URL, and finally set BASEURL from the landing URL.
# NOTE(review): some lines of this method are not visible in this view; the
# notes below mark the gaps and must be checked against the full file.
# NOTE(review): `connection_type` is not used in the visible lines.
connection_type = self.page.get_connection_type()
csid = str(uuid4())
redirect_url = data['url']
# Add the freshly generated `csid` to the query string of the redirect URL.
parts = list(urlparse(redirect_url))
url_params = parse_qs(urlparse(redirect_url).query)
qs = OrderedDict(parse_qsl(parts[4]))
qs.update({'csid': csid})
parts[4] = urlencode(qs)
url = urlunparse(parts)
continue_url = url_params['continue'][0]
# `continue_parameters` is a JSON string here; it is decoded further below.
continue_parameters = data['continueParameters']
# snid is either present in continue_parameters (creditcooperatif / banquebcp)
# or in url_params (caissedepargne / other children)
snid = json.loads(continue_parameters).get('snid') or url_params['snid'][0]
self.location(
url,
method='POST',
params={
'continue_parameters': continue_parameters,
},
)
main_js_file = self.page.get_main_js_file_url()
self.location(main_js_file)
client_id = self.page.get_client_id()
nonce = self.page.get_nonce() # Hardcoded in their js...
# On the website, this sends back json because of the header
# 'Accept': 'application/json'. If we do not add this header, we
# instead have a form that we can directly send to complete
# the login.
claims = {
'userinfo': {
'cdetab': None,
'authMethod': None,
'authLevel': None,
},
'id_token': {
'auth_time': {"essential": True},
"last_login": None,
},
}
# NOTE(review): the end of this dict (possibly more keys, and its closing
# brace) is not visible here.
bpcesta = {
"csid": csid,
"typ_app": "rest",
"enseigne": "ce",
"typ_sp": "out-band",
"typ_act": "auth",
"snid": snid,
"cdetab": url_params['cdetab'][0],
params = {
'nonce': nonce,
'scope': 'openid readUser',
'response_type': 'id_token token',
'response_mode': 'form_post',
'cdetab': url_params['cdetab'][0],
'login_hint': self.username,
'display': 'page',
'client_id': client_id,
# don't know if the separators= is really needed
'claims': json.dumps(claims, separators=(',', ':')),
'bpcesta': json.dumps(bpcesta, separators=(',', ':')),
}
if self.nuser:
params['login_hint'] += ' %s' % self.nuser
# NOTE(review): `params` is not sent anywhere in the visible lines; the
# request using it presumably sits right before send_form() — confirm.
self.page.send_form()
if self.response.headers.get('Page_Erreur', '') == 'INDISPO':
raise BrowserUnavailable()
pre_login_status = self.page.get_wrong_pre_login_status()
if pre_login_status == 'AUTHENTICATION_FAILED':
# failing at this step means no password has been submitted yet
# and the auth method type cannot be recovered
# corresponding to 'erreur technique' on website
raise BrowserUnavailable()
authentication_method = self.page.get_authentication_method_type()
self.do_authentication_validation(
authentication_method=authentication_method,
feature='login'
)
access_token = self.page.get_access_token()
id_token = self.page.get_id_token()
# Decode the JSON string to forward some of its entries with the tokens.
continue_parameters = json.loads(continue_parameters)
self.location(
continue_url,
data={
'id_token': id_token,
'access_token': access_token,
'ctx': continue_parameters['ctx'],
'redirectUrl': continue_parameters['redirectUrl'],
'ctx_routage': continue_parameters['ctx_routage'],
},
)
# Url look like this : https://www.net382.caisse-epargne.fr/Portail.aspx
# We only want the https://www.net382.caisse-epargne.fr part,
# so it is rebuilt from the network location of the parsed URL.
parsed_url = urlparse(self.url)
self.BASEURL = 'https://' + parsed_url.netloc
def loans_conso(self):
# Return what the current page's get_conso() gives, or None when the home
# page reports that loans are unavailable (the message is logged).
# English day and month abbreviations, indexed by weekday() / month - 1.
days = ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')
month = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
now = datetime.datetime.today()
# for non-DST
# d = '%s %s %s %s %s:%s:%s GMT+0100 (heure normale d’Europe centrale)' % (days[now.weekday()], now.day, month[now.month - 1], now.year, now.hour, format(now.minute, "02"), now.second)
# TODO use babel library to simplify this code
# Date string with a hard-coded summer-time (GMT+0200) suffix; only the
# minutes are zero-padded, hours and seconds are not.
# NOTE(review): `d` is not used in the visible lines; presumably it feeds the
# `datepourie` parameter of a `cons_loan` request missing from this view — confirm.
d = '%s %s %s %s %s:%s:%s GMT+0200 (heure d’été d’Europe centrale)' % (
days[now.weekday()], now.day, month[now.month - 1], now.year,
now.hour, format(now.minute, "02"), now.second,
)
if self.home.is_here():
msg = self.page.loan_unavailable_msg()
if msg:
self.logger.warning('%s' % msg)
return None
return self.page.get_conso()
self.home.go()
if not self.measure_page.is_here():
raise AssertionError('Should be on measure_page')
self.page.go_measure_list()
for _ in range(page_num):
self.page.goto_next_page()
Damien Mat Jedrzejewski
committed
def get_owner_name(self):
    """Return the upper-cased owner name taken from the profile.

    The profile name is used to check who owns the accounts. It is split at
    its first space: with two parts (first and last name) the second part is
    returned, otherwise the only part is returned.
    """
    name_parts = self.get_profile().name.upper().split(' ', 1)
    # One or two parts at most: the last one is the second when there is
    # one, and the whole name otherwise.
    return name_parts[-1]
@need_login
def get_measure_accounts_list(self):
"""
On home page there is a list of "measure" links, each one leading to one person accounts list.
Iter over each 'measure' and navigate to it to get all accounts

Returns `self.accounts`.
"""
# NOTE(review): indentation is lost in this view, so the exact nesting of the
# loops below has to be checked against the full file.
self.home.go()
owner_name = self.get_owner_name()
# Make sure we are on list of measures page
if self.measure_page.is_here():
self.page.check_no_accounts()
self.accounts = []
# At most 20 pages of measures are visited.
for page_num in range(20):
for measure_id in self.page.get_measure_ids():
self.page.go_measure_accounts_list(measure_id)
if self.page.check_measure_accounts():
for account in self.page.get_list(owner_name):
# Remember where the account was found, to navigate back to it later.
account._info['measure_id'] = measure_id
account._info['measure_id_page_num'] = page_num
self.accounts.append(account)
# Go back to the page of the measure list being iterated.
self.go_measure_list(page_num)
if not self.page.has_next_page():
break
self.page.goto_next_page()
# Second pass: life insurance balances are read from the account history.
for account in self.accounts:
if 'acc_type' in account._info and account._info['acc_type'] == Account.TYPE_LIFE_INSURANCE:
self.go_measure_list(account._info['measure_id_page_num'])
self.page.go_measure_accounts_list(account._info['measure_id'])
self.page.go_history(account._info)
# When the message page shows up instead, submit it and ask for the history again.
if self.message.is_here():
self.page.submit()
self.page.go_history(account._info)
balance = self.page.get_measure_balance(account)
account.balance = Decimal(FrenchTransaction.clean_amount(balance))
account.currency = account.get_currency(balance)
return self.accounts
def update_linebourse_token(self):
    """Copy the session cookies and the XSRF token to the Linebourse browser.

    The token is the ``XSRF-TOKEN`` cookie of the domain of the current URL;
    it is set as the ``X-XSRF-TOKEN`` header of the Linebourse session.
    """
    assert self.linebourse is not None, "linebourse browser should already exist"
    bourse_session = self.linebourse.session
    bourse_session.cookies.update(self.session.cookies)

    # The domain has to be fetched dynamically: for caissedepargne it is
    # 'www.caisse-epargne.offrebourse.com' whereas for creditcooperatif it
    # is 'www.offrebourse.com'
    current_domain = urlparse(self.url).netloc
    xsrf_token = self.session.cookies.get('XSRF-TOKEN', domain=current_domain)
    bourse_session.headers['X-XSRF-TOKEN'] = xsrf_token
Damien Mat Jedrzejewski
committed
def add_linebourse_accounts_data(self):
# For each market or PEA account, navigate to the Linebourse space and store
# the valuation diff read from the Linebourse portfolio page on the account.
# NOTE(review): indentation is lost in this view and at least one line seems
# to be missing (see below); check the nesting against the full file.
for account in self.accounts:
self.deleteCTX()
if account.type in (Account.TYPE_MARKET, Account.TYPE_PEA):
self.home_tache.go(tache='CPTSYNT0')
self.page.go_history(account._info)
Damien Mat Jedrzejewski
committed
# When the message page shows up instead, submit it and ask for the history again.
if self.message.is_here():
self.page.submit()
self.page.go_history(account._info)
Damien Mat Jedrzejewski
committed
# Some users may not have access to this.
if not self.market.is_here():
continue
self.page.submit()
Damien Mat Jedrzejewski
committed
if 'offrebourse.com' in self.url:
# Some users may not have access to this.
Damien Mat Jedrzejewski
committed
# NOTE(review): the statement executed when the page is in error is not
# visible here (presumably the account is skipped) — confirm.
if self.page.is_error():
Damien Mat Jedrzejewski
committed
self.update_linebourse_token()
page = self.linebourse.go_portfolio(account.id)
assert self.linebourse.portfolio.is_here()
# We must declare "page" because this URL also matches MarketPage
account.valuation_diff = page.get_valuation_diff()
Damien Mat Jedrzejewski
committed
# We need to go back to the synthesis, else we can not go home later
self.home_tache.go(tache='CPTSYNT0')
else:
raise AssertionError("new domain that hasn't been seen so far?")
Damien Mat Jedrzejewski
committed
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
def add_card_accounts(self):
"""
Card cases are really tricky on the new website.
There are 2 kinds of page where we can find cards information
- CardsPage: List some of the PSU cards
- CardsComingPage: On the coming transaction page (for a specific checking account),
we can find all cards related to this checking account. Information to reach this
CC is in the home page
We have to go through this both kind of page for those reasons:
- If there is no coming yet, the card will not be found in the home page and we will not
be able to reach the CardsComingPage. But we can find it on CardsPage
- Some cards are only on the CardsComingPage and not the CardsPage
- In CardsPage, there are cards (with "Business" in the label) without checking account on the
website (neither history nor coming), so we skip them.
- Some card on the CardsPage that have a checking account parent, but if we follow the link to
reach it with CardsComingPage, we find an other card that is not in CardsPage.
"""
if self.new_website:
for account in self.accounts:
# Adding card's account that we find in CardsComingPage of each Checking account
if account._card_links:
self.home.go()
self.page.go_history(account._card_links)
for card in self.page.iter_cards():
card.parent = account
card._coming_info = self.page.get_card_coming_info(card.number, card.parent._card_links.copy())
card.ownership = account.ownership
self.accounts.append(card)
Damien Mat Jedrzejewski
committed
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
self.home.go()
self.page.go_list()
self.page.go_cards()
# We are on the new website. We already added some card, but we can find more of them on the CardsPage
if self.cards.is_here():
for card in self.page.iter_cards():
card.parent = find_object(self.accounts, number=card._parent_id)
assert card.parent, 'card account parent %s was not found' % card
# If we already added this card, we don't have to add it a second time
if find_object(self.accounts, number=card.number):
continue
info = card.parent._card_links
# If card.parent._card_links is not filled, it mean this checking account
# has no coming transactions.
card._coming_info = None
card.ownership = card.parent.ownership
if info:
self.page.go_list()
self.page.go_history(info)
card._coming_info = self.page.get_card_coming_info(card.number, info.copy())
if not card._coming_info:
self.logger.warning('Skip card %s (not found on checking account)', card.number)
Damien Mat Jedrzejewski
committed
self.accounts.append(card)
# We are on the old website. We add all card that we can find on the CardsPage
elif self.cards_old.is_here():
for card in self.page.iter_cards():
card.parent = find_object(self.accounts, number=card._parent_id)
assert card.parent, 'card account parent %s was not found' % card.number