Commit a532c970 authored by Maxime Gasselin's avatar Maxime Gasselin Committed by Vincent A

[cmso] Handle new sms authentification

2FA is now automatically sent when it is needed. Only TPP can have
access to consent url to check if 2FA is required.
parent 7a8df0e0
......@@ -41,7 +41,7 @@
from .pages import (
LogoutPage, AccountsPage, HistoryPage, LifeinsurancePage, MarketPage,
AdvisorPage, LoginPage, ProfilePage, RedirectInsurancePage, SpacesPage,
ChangeSpacePage,
ChangeSpacePage, ConsentPage,
)
from .transfer_pages import TransferInfoPage, RecipientsListPage, TransferPage, AllowedRecipientsPage
......@@ -83,8 +83,8 @@ def cb():
class CmsoParBrowser(TwoFactorBrowser):
__states__ = ('headers',)
STATE_DURATION = 1
__states__ = ('headers', 'login_session_id', 'login_verifier', )
STATE_DURATION = 5 # SMS validity
headers = None
HAS_CREDENTIALS_ONLY = True
NEW_PROFILE = True
......@@ -104,6 +104,7 @@ class CmsoParBrowser(TwoFactorBrowser):
spaces = URL(r'/domiapi/oauth/json/accesAbonnement', SpacesPage)
change_space = URL(r'/securityapi/changeSpace', ChangeSpacePage)
consent = URL(r'/consentapi/tpp/consents', ConsentPage)
accounts = URL(r'/domiapi/oauth/json/accounts/synthese(?P<type>.*)', AccountsPage)
history = URL(r'/domiapi/oauth/json/accounts/(?P<page>.*)', HistoryPage)
......@@ -140,9 +141,9 @@ class CmsoParBrowser(TwoFactorBrowser):
json_headers = {'Content-Type': 'application/json'}
authorization_uri = 'https://api.cmso.com/oauth/authorize'
access_token_uri = 'https://api.cmso.com/oauth/token'
authorization_codegen_uri = 'https://api.cmso.com/oauth/authorization-code'
authorization_uri = URL(r'/oauth/authorize')
access_token_uri = URL(r'/oauth/token')
authorization_codegen_uri = URL(r'/oauth/authorization-code')
redirect_uri = 'https://mon.cmso.com/auth/checkuser'
error_uri = 'https://mon.cmso.com/auth/errorauthn'
client_uri = 'com.arkea.cmso.siteaccessible'
......@@ -164,6 +165,9 @@ def __init__(self, website, config, *args, **kwargs):
self.website = website
self.accounts_list = []
self.logged = False
self.login_session_id = None
self.login_verifier = None
self.login_challenge = None
self.AUTHENTICATION_METHODS = {
'code': self.handle_sms,
......@@ -189,25 +193,45 @@ def init_login(self):
self.accounts_list = []
# authorization request
verifier, challenge = self.get_pkce_codes()
self.login_verifier, self.login_challenge = self.get_pkce_codes()
params = {
'redirect_uri': self.redirect_uri,
'client_id': self.arkea_client_id,
'response_type': 'code',
'error_uri': self.error_uri,
'code_challenge_method': 'S256',
'code_challenge': challenge,
'code_challenge': self.login_challenge,
}
response = self.location(self.authorization_uri, params=params)
response = self.authorization_uri.go(params=params)
# get session_id in param location url
location_params = dict(parse_qsl(urlparse(self.url).fragment))
self.login_session_id = location_params['session_id']
self.set_profile(self.PROFILE) # reset headers but don't clear them
# authorization-code generation
data = self.get_authcode_data()
response = self.location(self.authorization_codegen_uri, data=data, params=location_params)
headers = self.get_tpp_headers(data)
try:
response = self.authorization_codegen_uri.go(
data=data,
params={'session_id': self.login_session_id},
headers=headers
)
except ClientError as e:
if e.response.status_code == 403:
response = e.response.json()
if response.get('error_code') == 'SCA_REQUIRED':
label = 'Saisissez le code reçu par SMS'
phone = response['sca_medias'][0].get('numero_masque')
if phone:
label += ' envoyé au %s' % phone
raise BrowserQuestion(Value('code', label=label))
raise
location_params = dict(parse_qsl(urlparse(response.headers['Location']).fragment))
if location_params.get('error'):
......@@ -218,73 +242,68 @@ def init_login(self):
raise BrowserUnavailable()
# authentication token generation
json = self.get_tokengen_data(location_params['code'], verifier)
response = self.location(self.access_token_uri, json=json)
data = self.get_tokengen_data(location_params['code'])
response = self.access_token_uri.go(json=data)
self.update_authentication_headers(response.json())
if location_params.get('scope') == 'consent':
self.check_interactive()
self.send_sms()
self.login.go(json={'espaceApplication': 'PART'})
if not self.page.check_is_logged():
raise AssertionError('The last login request failed')
def send_sms(self):
contact_information = self.location('/securityapi/person/coordonnees', method='POST').json()
for phone_key in ('portable', 'portablePro',):
if phone_key in contact_information:
break
else:
raise AssertionError('Phone not found in the JSON response')
def handle_sms(self):
data = {
'template': '',
'typeMedia': 'SMS', # can be SVI for interactive voice server
'valueMedia': contact_information[phone_key]['numeroCrypte'],
'access_code': self.username,
'password': self.code,
'authenticationMethod': 'SMS_MFA2',
}
self.location('/securityapi/otp/generate', json=data)
headers = self.get_tpp_headers(data)
self.authorization_codegen_uri.go(
params={'session_id': self.login_session_id},
data=data,
headers=headers
)
location_params = dict(parse_qsl(urlparse(self.response.headers['Location']).fragment))
raise BrowserQuestion(Value('code', label='Enter the SMS code'))
if location_params.get('error'):
if location_params.get('error_description') == 'authentication-failed':
raise BrowserIncorrectPassword()
# we encounter this case when an error comes from the website
elif location_params['error'] == 'server_error':
raise BrowserUnavailable()
def handle_sms(self):
self.session.headers = self.headers
data = self.get_sms_data()
otp_validation = self.location('/securityapi/otp/authenticate', json=data).json()
self.session.headers['Authorization'] = 'Bearer %s' % otp_validation['access_token']
self.headers = self.session.headers
def get_sms_data(self):
return {
'otpValue': self.code,
'typeMedia': 'WEB',
'userAgent': 'Mozilla/5.0 (X11; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0',
'redirectUri': self.redirect_uri,
'errorUri': self.error_uri,
'clientId': self.client_uri,
'redirect': 'true',
data = {
'code': location_params['code'],
'grant_type': 'authorization_code',
'client_id': self.arkea_client_id,
'accessInfos': {
'efs': self.arkea,
'si': self.arkea_si,
},
'redirect_uri': self.redirect_uri,
'code_verifier': self.login_verifier,
}
access_token = self.access_token_uri.go(json=data).json()
self.session.headers['Authorization'] = 'Bearer %s' % access_token['access_token']
self.headers = self.session.headers
def get_authcode_data(self):
return {
'access_code': self.username,
'password': self.password,
'space': 'PART',
}
def get_tokengen_data(self, code, verifier):
def get_tokengen_data(self, code):
return {
'client_id': self.arkea_client_id,
'code': code,
'grant_type': 'authorization_code',
'code_verifier': verifier,
'code_verifier': self.login_verifier,
'redirect_uri': self.redirect_uri,
}
def get_tpp_headers(self, data=''):
# This method can be overload by a TPP
# to add specific headers and be recognize by the bank
return {}
def update_authentication_headers(self, params):
self.session.headers['Authorization'] = "Bearer %s" % params['access_token']
self.session.headers['X-ARKEA-EFS'] = self.arkea
......
......@@ -33,7 +33,7 @@
from weboob.browser.elements import DictElement, ItemElement, TableElement, SkipItem, method
from weboob.browser.filters.standard import (
CleanText, Upper, Date, Regexp, Format, CleanDecimal, Filter, Env, Slugify,
Field, Currency, Map, Base, MapIn, Coalesce,
Field, Currency, Map, Base, MapIn, Coalesce, DateTime,
)
from weboob.browser.filters.json import Dict
from weboob.browser.filters.html import Attr, Link, TableCell, AbsoluteLink
......@@ -62,6 +62,11 @@ class LogoutPage(RawPage):
pass
class ConsentPage(JsonPage):
def get_consent_date_expire(self):
return DateTime(Dict('expirationDate'))(self.doc)
class SpacesPage(LoggedPage, JsonPage):
def get_part_space(self):
for abo in Dict('listAbonnement')(self.doc):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment