From 2e8e10e12a64b7ce97e34fab4d46fa9c10f903d4 Mon Sep 17 00:00:00 2001 From: Martin Lavoie Date: Fri, 12 Feb 2021 13:56:36 +0100 Subject: [PATCH] [americanexpress] Handle SCA without enrolment A SCA will be asked on every connection. --- modules/americanexpress/browser.py | 295 ++++++++++++++++++++++++----- modules/americanexpress/module.py | 7 +- modules/americanexpress/pages.py | 32 +++- 3 files changed, 280 insertions(+), 54 deletions(-) diff --git a/modules/americanexpress/browser.py b/modules/americanexpress/browser.py index c3144f9064..faa336077a 100644 --- a/modules/americanexpress/browser.py +++ b/modules/americanexpress/browser.py @@ -20,32 +20,43 @@ from __future__ import unicode_literals import datetime -from uuid import uuid4 +import uuid from dateutil.parser import parse as parse_date from collections import OrderedDict -from woob.exceptions import BrowserIncorrectPassword, ActionNeeded, BrowserUnavailable -from woob.browser.browsers import LoginBrowser, need_login -from woob.browser.exceptions import HTTPNotFound, ServerError +from woob.exceptions import ( + BrowserIncorrectPassword, ActionNeeded, BrowserUnavailable, + AuthMethodNotImplemented, BrowserQuestion, +) +from woob.browser.browsers import TwoFactorBrowser, need_login +from woob.browser.exceptions import HTTPNotFound, ServerError, ClientError from woob.browser.url import URL -from woob.tools.compat import urlencode +from woob.tools.compat import urljoin, urlencode, quote +from woob.tools.value import Value from .pages import ( AccountsPage, JsonBalances, JsonPeriods, JsonHistory, JsonBalances2, CurrencyPage, LoginPage, NoCardPage, - NotFoundPage, JsDataPage, HomeLoginPage, + NotFoundPage, HomeLoginPage, + ReadAuthChallengePage, UpdateAuthTokenPage, ) __all__ = ['AmericanExpressBrowser'] -class AmericanExpressBrowser(LoginBrowser): +class AmericanExpressBrowser(TwoFactorBrowser): BASEURL = 'https://global.americanexpress.com' + TWOFA_BASEURL = r'https://functions.americanexpress.com' home_login = URL(r'/login\?inav=fr_utility_logout', HomeLoginPage) login = URL(r'/myca/logon/emea/action/login', LoginPage) + read_auth_challenges = URL(TWOFA_BASEURL + r'/ReadAuthenticationChallenges.v1', ReadAuthChallengePage) + create_otp_uri = URL(TWOFA_BASEURL + r'/CreateOneTimePasscodeDelivery.v1') + update_auth_token = URL(TWOFA_BASEURL + r'/UpdateAuthenticationTokenWithChallenge.v1', UpdateAuthTokenPage) + create_2fa_uri = URL(TWOFA_BASEURL + r'/CreateTwoFactorAuthenticationForUser.v1') + accounts = URL(r'/api/servicing/v1/member', AccountsPage) json_balances = URL(r'/api/servicing/v1/financials/balances', JsonBalances) json_balances2 = URL(r'/api/servicing/v1/financials/transaction_summary\?type=split_by_cardmember&statement_end_date=(?P[\d-]+)', JsonBalances2) @@ -60,8 +71,6 @@ class AmericanExpressBrowser(LoginBrowser): json_periods = URL(r'/api/servicing/v1/financials/statement_periods', JsonPeriods) currency_page = URL(r'https://www.aexp-static.com/cdaas/axp-app/modules/axp-balance-summary/4.7.0/(?P\w\w-\w\w)/axp-balance-summary.json', CurrencyPage) - js_data = URL(r'/myca/logon/us/docs/javascript/gatekeeper/gtkp_aa.js', JsDataPage) - no_card = URL(r'https://www.americanexpress.com/us/content/no-card/', r'https://www.americanexpress.com/us/no-card/', NoCardPage) @@ -72,52 +81,73 @@ class AmericanExpressBrowser(LoginBrowser): 'PRELEVEMENT AUTOMATIQUE ENREGISTRE-MERCI', ] + HAS_CREDENTIALS_ONLY = False + def __init__(self, *args, **kwargs): super(AmericanExpressBrowser, self).__init__(*args, **kwargs) - def get_version(self): - self.js_data.go() - return self.page.get_version() + # State to keep during OTP + self.authentication_action_id = None + self.application_id = None + self.account_token = None + self.mfa_id = None + self.auth_trusted = None + + self.__states__ += ( + 'authentication_action_id', + 'application_id', + 'account_token', + 'mfa_id', + 'auth_trusted', + ) - def do_login(self): + self.AUTHENTICATION_METHODS = { + 'otp': self.handle_otp, + } + + def init_login(self): self.home_login.go() + now = datetime.datetime.utcnow() + transaction_id = 'LOGIN-%s' % str(uuid.uuid4()) # Randomly generated in js data = { 'request_type': 'login', + 'Face': 'fr_FR', + 'Logon': 'Logon', + 'version': 4, + 'inauth_profile_transaction_id': transaction_id, + 'DestPage': urljoin(self.BASEURL,'dashboard'), 'UserID': self.username, 'Password': self.password, - 'Logon': 'Logon', + 'channel': 'Web', 'REMEMBERME': 'on', - 'Face': 'fr_FR', - 'DestPage': self.BASEURL + '/dashboard', - 'inauth_profile_transaction_id': 'USLOGON-%s' % str(uuid4()), + 'b_hour': now.hour, + 'b_minute': now.minute, + 'b_second': now.second, + 'b_dayNumber': now.day, + 'b_month': now.month, + 'b_year': now.year, + 'b_timeZone': '0', + 'devicePrint': self.make_device_print(), } - # we have to overwrite `Content-Length` and `Cookie` to get all - # headers in alphabetical order or they will be added at the end - # when doing request, also we add every headers needed on website - # to try to exactly match what's done or we could get a LGON011 error - self.session.headers.update({ + self.send_login_request(data) + + def send_login_request(self, data): + # Match the headers on website to prevent LGON011 error + headers_for_login = { 'Accept': '*/*', - 'Accept-Encoding': 'gzip: deflate: br', - 'Accept-Language': 'en-US,en;q=0.9,fr;q=0.8', + 'Accept-Language': 'en-US,en;q=0.5', 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8', - 'Content-Length': str(len(urlencode(data))), - 'Cookie': '; '.join('%s=%s' % (k, v) for k, v in self.session.cookies.get_dict().items()), - 'Origin': self.BASEURL, - 'Referer': self.BASEURL + '/login?inav=fr_utility_logout', - 'Sec-Fetch-Dest': 'empty', - 'Sec-Fetch-Mode': 'cors', - 'Sec-Fetch-Site': 'same-origin', - }) - - self.session.headers = OrderedDict(sorted(self.session.headers.items())) - del self.session.headers['Upgrade-Insecure-Requests'] + 'Origin': 'https://www.americanexpress.com', + 'Host': 'global.americanexpress.com', - self.login.go(data=data) + # Setting headers to None to remove them from the request + 'Referer': None, + 'Upgrade-Insecure-Requests': None, + } - # set back headers - self.set_profile(self.PROFILE) + self.login.go(data=data, headers=headers_for_login) if self.page.get_status_code() != 0: error_code = self.page.get_error_code() @@ -141,15 +171,194 @@ def do_login(self): # - headers not in the right order # - headers with value that doesn't match the one from website # - headers missing - # what's next ? - assert False, 'Error code "LGON011" (msg:"%s")' % message + # - IP blacklisted + # What's next ? + raise AssertionError('Error code "LGON011" (msg:"%s")' % message) + elif error_code == 'LGON013': + self.raise_otp() else: - assert False, 'Error code "%s" (msg:"%s") not handled' % (error_code, message) + raise AssertionError('Error code "%s" (msg:"%s") not handled' % (error_code, message)) + + def prepare_request(self, req): + # Get all headers in alphabetical order to prevent LGON011 error + prep = super(AmericanExpressBrowser, self).prepare_request(req) + prep.headers = OrderedDict(sorted(prep.headers.items(), key=lambda i: i[0].lower())) + return prep + + def make_device_print(self): + d = OrderedDict() + d['version'] = "3.4.0.0_1" + d['pm_fpua'] = self.session.headers['User-Agent'] + '|5.0 (X11)|Linux x86_64' + d['pm_fpsc'] = '24|1650|498|498' + d['pm_fptw'] = '' + d['pm_fptz'] = 0 + d['pm_fpln'] = 'lang=en-US|syslang=|userlang=' + d['pm_fpjv'] = 0 + d['pm_fpco'] = 1 + d['pm_fpasw'] = '' + d['pm_fpan'] = "Netscape" + d['pm_fpacn'] = "Mozilla" + d['pm_fpol'] = 'true' + d['pm_fposp'] = '' + d['pm_fpup'] = '' + d['pm_fpsaw'] = '1920' + d['pm_fpspd'] = '24' + d['pm_fpsbd'] = '' + d['pm_fpsdx'] = '' + d['pm_fpsdy'] = '' + d['pm_fpslx'] = '' + d['pm_fpsly'] = '' + d['pm_fpsfse'] = '' + d['pm_fpsui'] = '' + d['pm_os'] = 'Linux' + d['pm_brmjv'] = 78 + d['pm_br'] = 'Firefox' + d['pm_inpt'] = '' + d['pm_expt'] = '' + return ( + urlencode(d,quote_via=quote) # using quote to prevent encoding space as + + # The next four character are not quoted by quote + .replace('~', "%7E") + .replace('-', "%2D") + .replace('_', "%5F") + .replace('.', "%2E") + + # These replace are to remove the & and = included by urlencode + .replace('=', "%3D") + .replace('&', "%26") + ) + + def raise_otp(self): + self.check_interactive() + + reauth = self.page.get_reauth() + self.authentication_action_id = reauth["actionId"] + self.application_id = reauth["applicationId"] + self.mfa_id = reauth["mfaId"] + self.auth_trusted = reauth["trust"] + + if not self.auth_trusted: + self.logger.warning( + "We are not trusted. There could be a problem with the fingerprinting of cc.js" + ) + + read_auth_challenges_payload = [{ + "authenticationActionId": self.authentication_action_id, + "applicationId": self.application_id, + "locale": self.locale, + }] + self.read_auth_challenges.go(json=read_auth_challenges_payload) + + challenge = self.page.get_challenge() + assert challenge == "OTP", "We don't know how to handle '%s' challenge." % challenge + + self.account_token = self.page.get_account_token() + methods = self.page.get_otp_methods() + delivery_payload, message = self.make_otp_delivery_payload(methods) + + self.create_otp_uri.go(json=delivery_payload) + raise BrowserQuestion( + Value('otp', label=message) + ) + + def make_otp_delivery_payload(self, methods): + known_methods = ["SMS", "EMAIL"] # This is also our preference order. + methods = {m["deliveryMethod"]: m for m in methods} + + chosen_method = None + + # Select the 2FA method for this authentification. + # Search for them in the order of known_methods. + for known_method in known_methods: + chosen_method = methods.get(known_method) + if chosen_method: + break + + if chosen_method is None: + assert methods != {}, "Received no challenge option" + raise AuthMethodNotImplemented(', '.join(methods.keys())) + + delivery_method = chosen_method["deliveryMethod"] + delivery_payload = [{ + "authenticationActionId": self.authentication_action_id, + "applicationId": self.application_id, + "accountToken": self.account_token, + "locale": self.locale, + "deliveryMethod": delivery_method, + "channelType": chosen_method["channelType"], + "channelEncryptedValue": chosen_method["channelEncryptedValue"], + }] + + display_value = chosen_method["channelDisplayValue"] + if delivery_method == "EMAIL": + message = "Veuillez entrer le code d’authentification qui vous a été envoyé à l'adresse courriel %s." % display_value + else: + message = "Veuillez entrer le code d’authentification qui vous a été envoyé au %s." % display_value + + return delivery_payload, message + + def handle_otp(self): + update_auth_token_payload = [{ + "authenticationActionId": self.authentication_action_id, + "applicationId": self.application_id, + "accountToken": self.account_token, + "locale": self.locale, + "fieldName": "OTP", + "fieldValue": self.otp, + }] + try: + self.update_auth_token.go(json=update_auth_token_payload) + pending_challenge = self.page.get_pending_challenges() + except ClientError as e: + self.drop_2fa_state() + if e.response.status_code == 400 and "UEVE008" in e.response.text: + # {"description":"Invalid Claim: Data does not match SOR","errorCode":"UEVE008"} + raise BrowserIncorrectPassword("Mauvais code lors de l'authentification forte.") + raise + + if pending_challenge != "": + self.drop_2fa_state() + raise AssertionError("Multiple challenge not handled by the module yet.") + + self.enrol_device() + self.tfa_login() + self.drop_2fa_state() + + def drop_2fa_state(self): + self.account_token = None + self.application_id = None + self.authentication_action_id = None + self.mfa_id = None + self.auth_trusted = None + + def enrol_device(self): + if self.auth_trusted: + enrol_payload = [{ + "locale": self.locale, + "trust": self.auth_trusted, + "deviceName":"Accès Budget Insight pour agrégation", + }] + self.create_2fa_uri.go(json=enrol_payload) + else: + self.logger.info("Cannot enrol when we are not trusted.") + + def tfa_login(self): + data = { + 'request_type': "login", + 'Face': 'fr_FR', + 'Logon': 'Logon', + 'version': 4, + 'mfaId': self.mfa_id, + } + self.send_login_request(data) + + @property + def locale(self): + return self.session.cookies.get_dict(domain=".americanexpress.com")['axplocale'] @need_login def iter_accounts(self): - loc = self.session.cookies.get_dict(domain=".americanexpress.com")['axplocale'].lower() - self.currency_page.go(locale=loc) + self.currency_page.go(locale=self.locale.lower()) currency = self.page.get_currency() self.accounts.go() diff --git a/modules/americanexpress/module.py b/modules/americanexpress/module.py index 8fcb2a24d0..2a80c1cd6b 100644 --- a/modules/americanexpress/module.py +++ b/modules/americanexpress/module.py @@ -20,7 +20,7 @@ from woob.capabilities.bank import CapBank from woob.tools.backend import Module, BackendConfig -from woob.tools.value import ValueBackendPassword +from woob.tools.value import ValueBackendPassword, ValueTransient from .browser import AmericanExpressBrowser @@ -37,12 +37,15 @@ class AmericanExpressModule(Module, CapBank): LICENSE = 'LGPLv3+' CONFIG = BackendConfig( ValueBackendPassword('login', label='Code utilisateur', masked=False), - ValueBackendPassword('password', label='Mot de passe') + ValueBackendPassword('password', label='Mot de passe'), + ValueTransient('request_information'), + ValueTransient('otp', regexp=r'^\d{6}$'), ) BROWSER = AmericanExpressBrowser def create_default_browser(self): return self.create_browser( + self.config, self.config['login'].get(), self.config['password'].get() ) diff --git a/modules/americanexpress/pages.py b/modules/americanexpress/pages.py index 7b5ffdaae0..0b202c8012 100644 --- a/modules/americanexpress/pages.py +++ b/modules/americanexpress/pages.py @@ -20,9 +20,8 @@ from __future__ import unicode_literals from decimal import Decimal -import re -from woob.browser.pages import LoggedPage, JsonPage, HTMLPage, RawPage +from woob.browser.pages import LoggedPage, JsonPage, HTMLPage from woob.browser.elements import ItemElement, DictElement, method from woob.browser.filters.standard import ( Date, Eval, Env, CleanText, Field, CleanDecimal, Format, @@ -77,6 +76,7 @@ def get_error_code(self): # - LGON005 = Account blocked # - LGON008 = ? # - LGON010 = Browser unavailable + # - LGON013 = SCA return CleanText(Dict('errorCode'))(self.doc) def get_error_message(self): @@ -88,6 +88,27 @@ def get_error_message(self): def get_redirect_url(self): return CleanText(Dict('redirectUrl'))(self.doc) + def get_reauth(self): + return Dict('reauth')(self.doc) + + +class ReadAuthChallengePage(JsonPage): + def get_challenge(self): + return Dict("challenge")(self.doc) + + def get_account_token(self): + identity_data = Dict("identityData")(self.doc) + assert len(identity_data) == 1, "How can we have multiple identity_data?" + return identity_data[0]["identityValue"] + + def get_otp_methods(self): + return Dict("tenuredChannels")(self.doc) + + +class UpdateAuthTokenPage(JsonPage): + def get_pending_challenges(self): + return Dict('pendingChallenges')(self.doc) + class AccountsPage(LoggedPage, JsonPage): @method @@ -213,10 +234,3 @@ def obj_original_amount(self): return original_amount obj__ref = Dict('identifier') - - -class JsDataPage(RawPage): - def get_version(self): - version = re.search(r'"(\d\.[\d\._]+)"', self.text) - assert version, 'Could not match version number in javascript' - return version.group(1) -- GitLab