diff --git a/modules/cmso/par/browser.py b/modules/cmso/par/browser.py index 0a5e707a5be31e03718fd0194733b43644fbe4a9..2fae62d0ef696ba4bcf7bf560fc600b5d385d7b8 100644 --- a/modules/cmso/par/browser.py +++ b/modules/cmso/par/browser.py @@ -41,7 +41,7 @@ from .pages import ( LogoutPage, AccountsPage, HistoryPage, LifeinsurancePage, MarketPage, AdvisorPage, LoginPage, ProfilePage, RedirectInsurancePage, SpacesPage, - ChangeSpacePage, + ChangeSpacePage, ConsentPage, ) from .transfer_pages import TransferInfoPage, RecipientsListPage, TransferPage, AllowedRecipientsPage @@ -83,8 +83,8 @@ def cb(): class CmsoParBrowser(TwoFactorBrowser): - __states__ = ('headers',) - STATE_DURATION = 1 + __states__ = ('headers', 'login_session_id', 'login_verifier', ) + STATE_DURATION = 5 # SMS validity headers = None HAS_CREDENTIALS_ONLY = True NEW_PROFILE = True @@ -104,6 +104,7 @@ class CmsoParBrowser(TwoFactorBrowser): spaces = URL(r'/domiapi/oauth/json/accesAbonnement', SpacesPage) change_space = URL(r'/securityapi/changeSpace', ChangeSpacePage) + consent = URL(r'/consentapi/tpp/consents', ConsentPage) accounts = URL(r'/domiapi/oauth/json/accounts/synthese(?P.*)', AccountsPage) history = URL(r'/domiapi/oauth/json/accounts/(?P.*)', HistoryPage) @@ -140,9 +141,9 @@ class CmsoParBrowser(TwoFactorBrowser): json_headers = {'Content-Type': 'application/json'} - authorization_uri = 'https://api.cmso.com/oauth/authorize' - access_token_uri = 'https://api.cmso.com/oauth/token' - authorization_codegen_uri = 'https://api.cmso.com/oauth/authorization-code' + authorization_uri = URL(r'/oauth/authorize') + access_token_uri = URL(r'/oauth/token') + authorization_codegen_uri = URL(r'/oauth/authorization-code') redirect_uri = 'https://mon.cmso.com/auth/checkuser' error_uri = 'https://mon.cmso.com/auth/errorauthn' client_uri = 'com.arkea.cmso.siteaccessible' @@ -164,6 +165,9 @@ def __init__(self, website, config, *args, **kwargs): self.website = website self.accounts_list = [] self.logged = False + self.login_session_id = None + self.login_verifier = None + self.login_challenge = None self.AUTHENTICATION_METHODS = { 'code': self.handle_sms, @@ -189,25 +193,45 @@ def init_login(self): self.accounts_list = [] # authorization request - verifier, challenge = self.get_pkce_codes() + self.login_verifier, self.login_challenge = self.get_pkce_codes() params = { 'redirect_uri': self.redirect_uri, 'client_id': self.arkea_client_id, 'response_type': 'code', 'error_uri': self.error_uri, 'code_challenge_method': 'S256', - 'code_challenge': challenge, + 'code_challenge': self.login_challenge, } - response = self.location(self.authorization_uri, params=params) + response = self.authorization_uri.go(params=params) # get session_id in param location url location_params = dict(parse_qsl(urlparse(self.url).fragment)) + self.login_session_id = location_params['session_id'] self.set_profile(self.PROFILE) # reset headers but don't clear them # authorization-code generation data = self.get_authcode_data() - response = self.location(self.authorization_codegen_uri, data=data, params=location_params) + headers = self.get_tpp_headers(data) + + try: + response = self.authorization_codegen_uri.go( + data=data, + params={'session_id': self.login_session_id}, + headers=headers + ) + except ClientError as e: + if e.response.status_code == 403: + response = e.response.json() + + if response.get('error_code') == 'SCA_REQUIRED': + label = 'Saisissez le code reçu par SMS' + phone = response['sca_medias'][0].get('numero_masque') + if phone: + label += ' envoyé au %s' % phone + raise BrowserQuestion(Value('code', label=label)) + raise + location_params = dict(parse_qsl(urlparse(response.headers['Location']).fragment)) if location_params.get('error'): @@ -218,73 +242,68 @@ def init_login(self): raise BrowserUnavailable() # authentication token generation - json = self.get_tokengen_data(location_params['code'], verifier) - response = self.location(self.access_token_uri, json=json) + data = self.get_tokengen_data(location_params['code']) + response = self.access_token_uri.go(json=data) self.update_authentication_headers(response.json()) - if location_params.get('scope') == 'consent': - self.check_interactive() - self.send_sms() - self.login.go(json={'espaceApplication': 'PART'}) if not self.page.check_is_logged(): raise AssertionError('The last login request failed') - def send_sms(self): - contact_information = self.location('/securityapi/person/coordonnees', method='POST').json() - - for phone_key in ('portable', 'portablePro',): - if phone_key in contact_information: - break - else: - raise AssertionError('Phone not found in the JSON response') + def handle_sms(self): data = { - 'template': '', - 'typeMedia': 'SMS', # can be SVI for interactive voice server - 'valueMedia': contact_information[phone_key]['numeroCrypte'], + 'access_code': self.username, + 'password': self.code, + 'authenticationMethod': 'SMS_MFA2', } - self.location('/securityapi/otp/generate', json=data) + headers = self.get_tpp_headers(data) + self.authorization_codegen_uri.go( + params={'session_id': self.login_session_id}, + data=data, + headers=headers + ) + location_params = dict(parse_qsl(urlparse(self.response.headers['Location']).fragment)) - raise BrowserQuestion(Value('code', label='Enter the SMS code')) + if location_params.get('error'): + if location_params.get('error_description') == 'authentication-failed': + raise BrowserIncorrectPassword() + # we encounter this case when an error comes from the website + elif location_params['error'] == 'server_error': + raise BrowserUnavailable() - def handle_sms(self): - self.session.headers = self.headers - data = self.get_sms_data() - otp_validation = self.location('/securityapi/otp/authenticate', json=data).json() - self.session.headers['Authorization'] = 'Bearer %s' % otp_validation['access_token'] - self.headers = self.session.headers - - def get_sms_data(self): - return { - 'otpValue': self.code, - 'typeMedia': 'WEB', - 'userAgent': 'Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0', - 'redirectUri': self.redirect_uri, - 'errorUri': self.error_uri, - 'clientId': self.client_uri, - 'redirect': 'true', + data = { + 'code': location_params['code'], + 'grant_type': 'authorization_code', 'client_id': self.arkea_client_id, - 'accessInfos': { - 'efs': self.arkea, - 'si': self.arkea_si, - }, + 'redirect_uri': self.redirect_uri, + 'code_verifier': self.login_verifier, } + access_token = self.access_token_uri.go(json=data).json() + self.session.headers['Authorization'] = 'Bearer %s' % access_token['access_token'] + self.headers = self.session.headers + def get_authcode_data(self): return { 'access_code': self.username, 'password': self.password, + 'space': 'PART', } - def get_tokengen_data(self, code, verifier): + def get_tokengen_data(self, code): return { 'client_id': self.arkea_client_id, 'code': code, 'grant_type': 'authorization_code', - 'code_verifier': verifier, + 'code_verifier': self.login_verifier, 'redirect_uri': self.redirect_uri, } + def get_tpp_headers(self, data=''): + # This method can be overload by a TPP + # to add specific headers and be recognize by the bank + return {} + def update_authentication_headers(self, params): self.session.headers['Authorization'] = "Bearer %s" % params['access_token'] self.session.headers['X-ARKEA-EFS'] = self.arkea diff --git a/modules/cmso/par/pages.py b/modules/cmso/par/pages.py index 8b6c5236b4af2b804c0cde411a3105812dde998f..abe3a397a376c5dec535c3f04d84abbf35851fd3 100644 --- a/modules/cmso/par/pages.py +++ b/modules/cmso/par/pages.py @@ -33,7 +33,7 @@ from weboob.browser.elements import DictElement, ItemElement, TableElement, SkipItem, method from weboob.browser.filters.standard import ( CleanText, Upper, Date, Regexp, Format, CleanDecimal, Filter, Env, Slugify, - Field, Currency, Map, Base, MapIn, Coalesce, + Field, Currency, Map, Base, MapIn, Coalesce, DateTime, ) from weboob.browser.filters.json import Dict from weboob.browser.filters.html import Attr, Link, TableCell, AbsoluteLink @@ -62,6 +62,11 @@ class LogoutPage(RawPage): pass +class ConsentPage(JsonPage): + def get_consent_date_expire(self): + return DateTime(Dict('expirationDate'))(self.doc) + + class SpacesPage(LoggedPage, JsonPage): def get_part_space(self): for abo in Dict('listAbonnement')(self.doc):