Commit 6d3a5fa2 authored by Maxime Gasselin's avatar Maxime Gasselin Committed by Vincent A

[cmso] Don't store session.headers anymore

Erase this bad practice. Moreover with a low STATE_DURATION, keep
session headers has no interest anymore.
parent a532c970
......@@ -83,7 +83,7 @@ def cb():
class CmsoParBrowser(TwoFactorBrowser):
__states__ = ('headers', 'login_session_id', 'login_verifier', )
__states__ = ('login_session_id', 'login_verifier', )
STATE_DURATION = 5 # SMS validity
headers = None
HAS_CREDENTIALS_ONLY = True
......@@ -164,7 +164,6 @@ def __init__(self, website, config, *args, **kwargs):
self.website = website
self.accounts_list = []
self.logged = False
self.login_session_id = None
self.login_verifier = None
self.login_challenge = None
......@@ -186,69 +185,60 @@ def get_pkce_codes(self):
def init_login(self):
self.location(self.original_site)
if self.headers:
self.session.headers = self.headers
else:
self.session.cookies.clear()
self.accounts_list = []
# authorization request
self.login_verifier, self.login_challenge = self.get_pkce_codes()
params = {
'redirect_uri': self.redirect_uri,
'client_id': self.arkea_client_id,
'response_type': 'code',
'error_uri': self.error_uri,
'code_challenge_method': 'S256',
'code_challenge': self.login_challenge,
}
response = self.authorization_uri.go(params=params)
# get session_id in param location url
location_params = dict(parse_qsl(urlparse(self.url).fragment))
self.login_session_id = location_params['session_id']
self.set_profile(self.PROFILE) # reset headers but don't clear them
# authorization-code generation
data = self.get_authcode_data()
headers = self.get_tpp_headers(data)
self.login_verifier, self.login_challenge = self.get_pkce_codes()
params = {
'redirect_uri': self.redirect_uri,
'client_id': self.arkea_client_id,
'response_type': 'code',
'error_uri': self.error_uri,
'code_challenge_method': 'S256',
'code_challenge': self.login_challenge,
}
response = self.authorization_uri.go(params=params)
try:
response = self.authorization_codegen_uri.go(
data=data,
params={'session_id': self.login_session_id},
headers=headers
)
except ClientError as e:
if e.response.status_code == 403:
response = e.response.json()
if response.get('error_code') == 'SCA_REQUIRED':
label = 'Saisissez le code reçu par SMS'
phone = response['sca_medias'][0].get('numero_masque')
if phone:
label += ' envoyé au %s' % phone
raise BrowserQuestion(Value('code', label=label))
raise
location_params = dict(parse_qsl(urlparse(response.headers['Location']).fragment))
if location_params.get('error'):
if location_params.get('error_description') == 'authentication-failed':
raise BrowserIncorrectPassword()
# we encounter this case when an error comes from the website
elif location_params['error'] == 'server_error':
raise BrowserUnavailable()
# authentication token generation
data = self.get_tokengen_data(location_params['code'])
response = self.access_token_uri.go(json=data)
self.update_authentication_headers(response.json())
self.login.go(json={'espaceApplication': 'PART'})
if not self.page.check_is_logged():
raise AssertionError('The last login request failed')
# get session_id in param location url
location_params = dict(parse_qsl(urlparse(self.url).fragment))
self.login_session_id = location_params['session_id']
self.set_profile(self.PROFILE) # reset headers but don't clear them
# authorization-code generation
data = self.get_authcode_data()
headers = self.get_tpp_headers(data)
try:
response = self.authorization_codegen_uri.go(
data=data,
params={'session_id': self.login_session_id},
headers=headers
)
except ClientError as e:
if e.response.status_code == 403:
response = e.response.json()
if response.get('error_code') == 'SCA_REQUIRED':
label = 'Saisissez le code reçu par SMS'
phone = response['sca_medias'][0].get('numero_masque')
if phone:
label += ' envoyé au %s' % phone
raise BrowserQuestion(Value('code', label=label))
raise
location_params = dict(parse_qsl(urlparse(response.headers['Location']).fragment))
if location_params.get('error'):
if location_params.get('error_description') == 'authentication-failed':
raise BrowserIncorrectPassword()
# we encounter this case when an error comes from the website
elif location_params['error'] == 'server_error':
raise BrowserUnavailable()
# authentication token generation
data = self.get_tokengen_data(location_params['code'])
response = self.access_token_uri.go(json=data)
self.update_authentication_headers(response.json())
self.login.go(json={'espaceApplication': 'PART'})
def handle_sms(self):
data = {
......@@ -281,7 +271,8 @@ def handle_sms(self):
access_token = self.access_token_uri.go(json=data).json()
self.session.headers['Authorization'] = 'Bearer %s' % access_token['access_token']
self.headers = self.session.headers
self.update_authentication_headers(access_token)
self.login.go(json={'espaceApplication': 'PART'})
def get_authcode_data(self):
return {
......@@ -309,7 +300,6 @@ def update_authentication_headers(self, params):
self.session.headers['X-ARKEA-EFS'] = self.arkea
self.session.headers['X-Csrf-Token'] = params['access_token']
self.session.headers['X-REFERER-TOKEN'] = 'RWDPART'
self.headers = self.session.headers
def get_account(self, _id):
return find_object(self.iter_accounts(), id=_id, error=AccountNotFound)
......
......@@ -53,7 +53,8 @@
class LoginPage(JsonPage):
def check_is_logged(self):
@property
def logged(self):
# Just to verify we have the expected keys
return 'firstAccess' in self.doc and 'defaultBadContractNumber' in self.doc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment