# -*- coding: utf-8 -*-

# Copyright(C) 2013 Romain Bignon
#
# This file is part of a woob module.
#
# This woob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This woob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this woob module. If not, see <http://www.gnu.org/licenses/>.

import datetime
import uuid
from dateutil.parser import parse as parse_date
from collections import OrderedDict
from urllib.parse import urljoin, urlencode, quote

from woob.browser.selenium import (
    SeleniumBrowser, SubSeleniumMixin, IsHereCondition, webdriver,
)
from woob.exceptions import (
    BrowserIncorrectPassword, BrowserPasswordExpired, BrowserUnavailable,
    AuthMethodNotImplemented, BrowserQuestion, ScrapingBlocked,
)
from woob.browser.browsers import need_login
from woob.browser.mfa import TwoFactorBrowser
from woob.browser.exceptions import HTTPNotFound, ServerError, ClientError
from woob.browser.url import URL
from woob.tools.value import Value

from .pages import (
    AccountsPage, JsonBalances, JsonPeriods, JsonHistory, JsonBalances2,
    CurrencyPage, LoginPage, NoCardPage, NotFoundPage, HomeLoginPage,
    ReadAuthChallengePage, UpdateAuthTokenPage, SHomePage, SLoginPage,
)
from .fingerprint import FingerprintPage


class AmericanExpressBrowser(TwoFactorBrowser):
    """Browser for the American Express website using plain HTTP requests.

    Handles the login form, the OTP-based strong authentication and the
    JSON endpoints used to list accounts and transactions.
    """

    BASEURL = 'https://global.americanexpress.com'
    TWOFA_BASEURL = r'https://functions.americanexpress.com'

    home_login = URL(r'/login\?inav=fr_utility_logout', HomeLoginPage)
    login = URL(r'/myca/logon/emea/action/login', LoginPage)
    fingerprint = URL(
        r'https://www.cdn-path.com/cc.js\?=&sid=ee490b8fb9a4d570&tid=(?P<transaction_id>.*)&namespace=inauth',
        FingerprintPage
    )

    read_auth_challenges = URL(TWOFA_BASEURL + r'/ReadAuthenticationChallenges.v1', ReadAuthChallengePage)
    create_otp_uri = URL(TWOFA_BASEURL + r'/CreateOneTimePasscodeDelivery.v1')
    update_auth_token = URL(TWOFA_BASEURL + r'/UpdateAuthenticationTokenWithChallenge.v1', UpdateAuthTokenPage)
    create_2fa_uri = URL(TWOFA_BASEURL + r'/CreateTwoFactorAuthenticationForUser.v1')

    accounts = URL(r'/api/servicing/v1/member', AccountsPage)
    json_balances = URL(r'/api/servicing/v1/financials/balances', JsonBalances)
    json_balances2 = URL(
        r'/api/servicing/v1/financials/transaction_summary\?type=split_by_cardmember&statement_end_date=(?P<date>[\d-]+)',
        JsonBalances2
    )
    json_pending = URL(
        r'/api/servicing/v1/financials/transactions\?limit=1000&offset=(?P<offset>\d+)&status=pending',
        JsonHistory
    )
    json_posted = URL(
        r'/api/servicing/v1/financials/transactions\?limit=1000&offset=(?P<offset>\d+)&statement_end_date=(?P<end>[0-9-]+)&status=posted',
        JsonHistory
    )
    json_periods = URL(r'/api/servicing/v1/financials/statement_periods', JsonPeriods)
    currency_page = URL(
        r'https://www.aexp-static.com/cdaas/axp-app/modules/axp-balance-summary/4.7.0/(?P<locale>\w\w-\w\w)/axp-balance-summary.json',
        CurrencyPage
    )

    no_card = URL(
        r'https://www.americanexpress.com/us/content/no-card/',
        r'https://www.americanexpress.com/us/no-card/',
        NoCardPage
    )

    not_found = URL(r'/accounts/error', NotFoundPage)

    SUMMARY_CARD_LABEL = [
        'PAYMENT RECEIVED - THANK YOU',
        'PRELEVEMENT AUTOMATIQUE ENREGISTRE-MERCI',
    ]

    HAS_CREDENTIALS_ONLY = True

    def __init__(self, *args, **kwargs):
        """Initialise the browser and register the OTP state to persist."""
        super().__init__(*args, **kwargs)

        # State to keep during OTP
        self.authentication_action_id = None
        self.application_id = None
        self.account_token = None
        self.mfa_id = None
        self.auth_trusted = None
        self.__states__ += (
            'authentication_action_id',
            'application_id',
            'account_token',
            'mfa_id',
            'auth_trusted',
        )

        self.AUTHENTICATION_METHODS = {
            'otp': self.handle_otp,
        }

    def init_login(self):
        """Build the login form payload and submit it."""
        self.setup_browser_for_login_request()
        transaction_id = self.make_transaction_id()

        # 'b_timeZone' is sent as '0', so the clock fields are taken in UTC.
        now = datetime.datetime.now(datetime.timezone.utc)
        data = {
            'request_type': 'login',
            'Face': 'fr_FR',
            'Logon': 'Logon',
            'version': 4,
            'inauth_profile_transaction_id': transaction_id,
            'DestPage': urljoin(self.BASEURL, 'dashboard'),
            'UserID': self.username,
            'Password': self.password,
            'channel': 'Web',
            'REMEMBERME': 'on',
            'b_hour': now.hour,
            'b_minute': now.minute,
            'b_second': now.second,
            'b_dayNumber': now.day,
            'b_month': now.month,
            'b_year': now.year,
            'b_timeZone': '0',
            'devicePrint': self.make_device_print(),
        }

        self.send_login_request(data)

    def send_login_request(self, data):
        """Post *data* to the login endpoint and interpret the error code.

        Raises the woob exception matching the returned "LGON" error code,
        triggers the OTP flow on LGON013, and raises AssertionError for
        codes that are not handled.
        """
        # Match the headers on website to prevent LGON011 error
        headers_for_login = {
            'Accept': '*/*',
            'Accept-Language': 'en-US,en;q=0.5',
            'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
            'Origin': 'https://www.americanexpress.com',
            'Host': 'global.americanexpress.com',
            # Setting headers to None to remove them from the request
            'Referer': None,
            'Upgrade-Insecure-Requests': None,
        }
        self.login.go(data=data, headers=headers_for_login)

        if self.page.get_status_code() != 0:
            error_code = self.page.get_error_code()
            message = self.page.get_error_message()
            if any(code in error_code for code in ('LGON001', 'LGON003')):
                raise BrowserIncorrectPassword(message)
            elif error_code == 'LGON004':
                # This error happens when the website needs the user to
                # enter his card information and reset his password.
                # There is no message returned when this error happens.
                raise BrowserPasswordExpired()
            elif error_code == 'LGON008':
                # Don't know what this error means, but if we follow the redirect
                # url it allows us to be correctly logged.
                self.location(self.page.get_redirect_url())
            elif error_code == 'LGON010':
                raise BrowserUnavailable(message)
            elif error_code == 'LGON011':
                # this kind of error is for mystical reasons,
                # but until now it was headers related, it could be :
                # - headers not in the right order
                # - headers with value that doesn't match the one from website
                # - headers missing
                # What's next ?
                if "CBIS_Challenge_Or_Deny" in message:
                    # IP blacklisted
                    raise ScrapingBlocked()
                raise AssertionError('Error code "LGON011" (msg:"%s")' % message)
            elif error_code == 'LGON013':
                self.raise_otp()
            else:
                raise AssertionError('Error code "%s" (msg:"%s") not handled' % (error_code, message))

    def prepare_request(self, req):
        """Prepare *req* with its headers sorted case-insensitively by name."""
        # Get all headers in alphabetical order to prevent LGON011 error
        prep = super().prepare_request(req)
        prep.headers = OrderedDict(sorted(prep.headers.items(), key=lambda i: i[0].lower()))
        return prep

    def clear_init_cookies(self):
        """Clear every session cookie except "device-id"."""
        # Keep the device-id to prevent an SCA
        for cookie in self.session.cookies:
            if cookie.name == "device-id":
                device = cookie
                break
        else:
            device = None
        self.session.cookies.clear()
        if device:
            self.session.cookies.set_cookie(device)

    def setup_browser_for_login_request(self):
        """Load the home login page before sending the login request."""
        self.home_login.go()

    def make_transaction_id(self):
        """Generate a login transaction id and register it with the fingerprint service."""
        transaction_id = 'LOGIN-%s' % uuid.uuid4()  # Randomly generated in js
        self.register_transaction_id(transaction_id)
        return transaction_id

    def register_transaction_id(self, transaction_id):
        """Fetch the fingerprint script for *transaction_id* and post its payload."""
        self.fingerprint.go(transaction_id=transaction_id)
        payload = self.page.make_payload_for_s2(transaction_id)
        self.open(
            'https://www.cdn-path.com/s2',
            method="POST",
            params={
                't': self.page.get_t(),
                'x': 1,  # Not seen change yet
                'sid': 'ee490b8fb9a4d570',  # Not seen change yet
                'tid': transaction_id,
            },
            files={
                '_f': payload,
            },
            headers={
                'Accept-Encoding': 'gzip, deflate, br',
                'Host': 'www.cdn-path.com',
                'Origin': 'https://www.americanexpress.com',
                'Referer': 'https://www.americanexpress.com/',
                'Pragma': 'no-cache',
                'TE': 'Trailers',
            },
        )

    def make_device_print(self):
        """Return the encoded "devicePrint" value sent with the login form."""
        d = OrderedDict()
        d['version'] = "3.4.0.0_1"
        d['pm_fpua'] = self.session.headers['User-Agent'] + '|5.0 (X11)|Linux x86_64'
        d['pm_fpsc'] = '24|1650|498|498'
        d['pm_fptw'] = ''
        d['pm_fptz'] = 0
        d['pm_fpln'] = 'lang=en-US|syslang=|userlang='
        d['pm_fpjv'] = 0
        d['pm_fpco'] = 1
        d['pm_fpasw'] = ''
        d['pm_fpan'] = "Netscape"
        d['pm_fpacn'] = "Mozilla"
        d['pm_fpol'] = 'true'
        d['pm_fposp'] = ''
        d['pm_fpup'] = ''
        d['pm_fpsaw'] = '1920'
        d['pm_fpspd'] = '24'
        d['pm_fpsbd'] = ''
        d['pm_fpsdx'] = ''
        d['pm_fpsdy'] = ''
        d['pm_fpslx'] = ''
        d['pm_fpsly'] = ''
        d['pm_fpsfse'] = ''
        d['pm_fpsui'] = ''
        d['pm_os'] = 'Linux'
        d['pm_brmjv'] = 78
        d['pm_br'] = 'Firefox'
        d['pm_inpt'] = ''
        d['pm_expt'] = ''
        return (
            urlencode(d, quote_via=quote)  # using quote to prevent encoding space as +
            # The next four character are not quoted by quote
            .replace('~', "%7E")
            .replace('-', "%2D")
            .replace('_', "%5F")
            .replace('.', "%2E")
            # These replace are to remove the & and = included by urlencode
            .replace('=', "%3D")
            .replace('&', "%26")
        )

    def raise_otp(self):
        """Start the OTP challenge and ask the user for the received code.

        Stores the re-authentication identifiers from the current page,
        requests delivery of a one-time passcode, then raises
        BrowserQuestion so the code can be supplied.
        """
        self.check_interactive()

        reauth = self.page.get_reauth()
        self.authentication_action_id = reauth["actionId"]
        self.application_id = reauth["applicationId"]
        self.mfa_id = reauth["mfaId"]
        self.auth_trusted = reauth["trust"]

        if not self.auth_trusted:
            self.logger.warning(
                "We are not trusted. There could be a problem with the fingerprinting of cc.js"
            )

        read_auth_challenges_payload = [{
            "authenticationActionId": self.authentication_action_id,
            "applicationId": self.application_id,
            "locale": self.locale,
        }]
        self.read_auth_challenges.go(json=read_auth_challenges_payload)

        challenge = self.page.get_challenge()
        assert challenge == "OTP", "We don't know how to handle '%s' challenge." % challenge

        self.account_token = self.page.get_account_token()
        methods = self.page.get_otp_methods()
        delivery_payload, message = self.make_otp_delivery_payload(methods)
        self.create_otp_uri.go(json=delivery_payload)

        raise BrowserQuestion(
            Value('otp', label=message)
        )

    def make_otp_delivery_payload(self, methods):
        """Pick an OTP delivery method and build the delivery request.

        :param methods: iterable of dicts, each carrying at least the keys
            "deliveryMethod", "channelType", "channelEncryptedValue" and
            "channelDisplayValue".
        :return: tuple ``(delivery_payload, message)`` where *message* is the
            prompt to show to the user.
        :raises AuthMethodNotImplemented: when none of the offered methods
            is SMS or EMAIL.
        """
        known_methods = ["SMS", "EMAIL"]  # This is also our preference order.
        methods = {m["deliveryMethod"]: m for m in methods}
        chosen_method = None

        # Select the 2FA method for this authentication.
        # Search for them in the order of known_methods.
        for known_method in known_methods:
            chosen_method = methods.get(known_method)
            if chosen_method:
                break

        if chosen_method is None:
            assert methods != {}, "Received no challenge option"
            raise AuthMethodNotImplemented(', '.join(methods))

        delivery_method = chosen_method["deliveryMethod"]
        delivery_payload = [{
            "authenticationActionId": self.authentication_action_id,
            "applicationId": self.application_id,
            "accountToken": self.account_token,
            "locale": self.locale,
            "deliveryMethod": delivery_method,
            "channelType": chosen_method["channelType"],
            "channelEncryptedValue": chosen_method["channelEncryptedValue"],
        }]

        display_value = chosen_method["channelDisplayValue"]
        if delivery_method == "EMAIL":
            message = "Veuillez entrer le code d’authentification qui vous a été envoyé à l'adresse courriel %s." % display_value
        else:
            message = "Veuillez entrer le code d’authentification qui vous a été envoyé au %s." % display_value

        return delivery_payload, message

    def handle_otp(self):
        """Submit the OTP given by the user and finish the login.

        :raises BrowserIncorrectPassword: when the service answers HTTP 400
            with error code UEVE008.
        """
        update_auth_token_payload = [{
            "authenticationActionId": self.authentication_action_id,
            "applicationId": self.application_id,
            "accountToken": self.account_token,
            "locale": self.locale,
            "fieldName": "OTP",
            "fieldValue": self.otp,
        }]

        try:
            self.update_auth_token.go(json=update_auth_token_payload)
            pending_challenge = self.page.get_pending_challenges()
        except ClientError as e:
            self.drop_2fa_state()
            if e.response.status_code == 400 and "UEVE008" in e.response.text:
                # {"description":"Invalid Claim: Data does not match SOR","errorCode":"UEVE008"}
                raise BrowserIncorrectPassword("Mauvais code lors de l'authentification forte.")
            raise

        if pending_challenge != "":
            self.drop_2fa_state()
            raise AssertionError("Multiple challenge not handled by the module yet.")

        self.enrol_device()
        self.tfa_login()
        self.drop_2fa_state()

    def drop_2fa_state(self):
        """Reset every attribute kept for the OTP exchange."""
        self.account_token = None
        self.application_id = None
        self.authentication_action_id = None
        self.mfa_id = None
        self.auth_trusted = None

    def enrol_device(self):
        """Register this device for two-factor authentication when trusted."""
        if self.auth_trusted:
            enrol_payload = [{
                "locale": self.locale,
                "trust": self.auth_trusted,
                "deviceName": "Accès Budget Insight pour agrégation",
            }]
            self.create_2fa_uri.go(json=enrol_payload)
        else:
            self.logger.info("Cannot enrol when we are not trusted.")

    def tfa_login(self):
        """Send the login request carrying the MFA id."""
        data = {
            'request_type': "login",
            'Face': 'fr_FR',
            'Logon': 'Logon',
            'version': 4,
            'mfaId': self.mfa_id,
        }
        self.send_login_request(data)

    @property
    def locale(self):
        """Value of the "axplocale" cookie set for ".americanexpress.com"."""
        return self.session.cookies.get_dict(domain=".americanexpress.com")['axplocale']

    @need_login
    def iter_accounts(self):
        """Yield every account with its balances filled in."""
        self.currency_page.go(locale=self.locale.lower())
        currency = self.page.get_currency()

        self.accounts.go()
        account_list = list(self.page.iter_accounts(currency=currency))
        for account in account_list:
            try:
                # for the main account
                self.json_balances.go(headers={'account_tokens': account.id})
            except HTTPNotFound:
                # for secondary accounts
                self.json_periods.go(headers={'account_token': account._history_token})
                periods = self.page.get_periods()
                period_index = 1
                if len(periods) == 1:
                    # Recently created accounts have only one period
                    period_index = 0
                self.json_balances2.go(date=periods[period_index], headers={'account_tokens': account.id})
            self.page.fill_balances(obj=account)
            yield account

    @need_login
    def iter_history(self, account):
        """Yield posted transactions of *account* dated today or earlier."""
        self.json_periods.go(headers={'account_token': account._history_token})
        periods = self.page.get_periods()
        today = datetime.date.today()
        # TODO handle pagination
        for p in periods:
            self.json_posted.go(offset=0, end=p, headers={'account_token': account._history_token})
            for tr in self.page.iter_history(periods=periods):
                # As the website is very handy, passing account_token is not enough:
                # it will return every transactions of each account, so we
                # have to match them manually
                if tr._owner == account._idforJSON and tr.date <= today:
                    yield tr

    @need_login
    def iter_coming(self, account):
        """Yield the coming (not yet debited) transactions of *account*."""
        # Coming transactions can be found in a 'pending' JSON if it exists
        # ('En attente' tab on the website), as well as in a 'posted' JSON
        # ('Enregistrées' tab on the website)

        # "pending" have no vdate and debit date is in future
        self.json_periods.go(headers={'account_token': account._history_token})
        periods = self.page.get_periods()
        date = parse_date(periods[0]).date()
        today = datetime.date.today()
        # when the latest period ends today we can't know the coming debit date
        if date != today:
            try:
                self.json_pending.go(offset=0, headers={'account_token': account._history_token})
            except ServerError as exc:
                # At certain times of the month a connection might not have pendings;
                # in that case, `json_pending.go` would throw a 502 error Bad Gateway
                error = exc.response.json()
                error_code = error.get('code')
                error_message = error.get('message')
                self.logger.warning(
                    'No pendings page to access to, got error %s and message "%s" instead.',
                    error_code, error_message
                )
            else:
                for tr in self.page.iter_history(periods=periods):
                    if tr._owner == account._idforJSON:
                        tr.date = date
                        yield tr

        # "posted" have a vdate but debit date can be future or past
        for p in periods:
            self.json_posted.go(offset=0, end=p, headers={'account_token': account._history_token})
            for tr in self.page.iter_history(periods=periods):
                # Test for a missing date first so that it is never
                # compared with `today`.
                if not tr.date or tr.date > today:
                    if tr._owner == account._idforJSON:
                        yield tr
                else:
                    return


class AmericanExpressSeleniumFingerprintBrowser(SeleniumBrowser):
    """Selenium browser that only loads the login page to run its javascript."""

    BASEURL = 'https://global.americanexpress.com'

    home_login = URL(r'/login\?inav=fr_utility_logout', SHomePage)
    login = URL(r'https://www.americanexpress.com/en-us/account/login', SLoginPage)

    HEADLESS = True  # Always change to True for prod

    WINDOW_SIZE = (1800, 1000)
    DRIVER = webdriver.Chrome

    def __init__(self, config, *args, **kwargs):
        """Initialise the selenium browser; *config* is accepted but not used here."""
        super().__init__(*args, **kwargs)

    def _build_options(self, preferences):
        """Build the driver options, adding '--no-sandbox' for Chrome."""
        ops = super()._build_options(preferences)
        if self.DRIVER is webdriver.Chrome:
            ops.add_argument('--no-sandbox')
        return ops

    def do_login(self):
        """
        We don't really support login via selenium.
        We only load the login to execute some javascript
        and then extract cookies + some other values generated in javascript.
        """
        self.home_login.go()
        self.wait_until(IsHereCondition(self.login))


class AmericanExpressWithSeleniumBrowser(SubSeleniumMixin, AmericanExpressBrowser):
    """
    Use a selenium browser to pass the fingerprinting instead of trying to solve it manually.

    Selenium is executed at the start of init_login in setup_browser_for_login_request.
    From inside SubSeleniumMixin.do_login, the load_selenium_session method will be called
    after the 'login' process of selenium has finished. That allows retrieving the information
    that will be needed in the rest of the login process.

    After that, the login proceeds as normal except for the overridden make_device_print and
    make_transaction_id, where we use values taken directly from selenium.
    """

    SELENIUM_BROWSER = AmericanExpressSeleniumFingerprintBrowser

    def __init__(self, *args, **kwargs):
        """Initialise the browser and persist the selenium user agent in the state."""
        super().__init__(*args, **kwargs)

        self.selenium_login_transaction_id = None
        self.selenium_device_print = None
        self.selenium_user_agent = None

        self.__states__ += ('selenium_user_agent', )

    def _build_options(self, preferences):
        """Build the driver options, adding '--no-sandbox' for Chrome."""
        ops = super()._build_options(preferences)
        if self.DRIVER is webdriver.Chrome:
            ops.add_argument('--no-sandbox')
        return ops

    def do_login(self, *args, **kwargs):
        """Run the login of AmericanExpressBrowser rather than the mixin's."""
        AmericanExpressBrowser.do_login(self, *args, **kwargs)

    def load_state(self, *args, **kwargs):
        """Load the saved state and restore the stored selenium user agent."""
        super().load_state(*args, **kwargs)
        if self.selenium_user_agent:
            self.session.headers['User-Agent'] = self.selenium_user_agent

    def load_selenium_session(self, selenium):
        """Import the selenium session and the values generated by its javascript.

        :raises BrowserUnavailable: when ``window.RSA`` is not available in
            the selenium page.
        """
        self.clear_init_cookies()

        super().load_selenium_session(selenium)

        # We need to send this value in the login request.
        self.selenium_login_transaction_id = selenium.driver.execute_script("return window.inauth._cc[0][1].tid;")

        selenium_has_device_print = selenium.driver.execute_script('return window.RSA;')
        if not selenium_has_device_print:
            # New login method is not supported.
            raise BrowserUnavailable()

        # Save the device print and the user-agent from selenium to replicate the website as much as possible
        self.selenium_device_print = selenium.driver.execute_script('return RSA.encode_deviceprint();')
        self.selenium_user_agent = selenium.driver.execute_script("return navigator.userAgent;")
        self.session.headers['User-Agent'] = self.selenium_user_agent

    def setup_browser_for_login_request(self):
        """Run the selenium login in place of loading the home login page."""
        SubSeleniumMixin.do_login(self)

    def make_device_print(self):
        """Return the device print extracted from selenium."""
        assert self.selenium_device_print
        return self.selenium_device_print

    def make_transaction_id(self):
        """Return the login transaction id extracted from selenium."""
        assert self.selenium_login_transaction_id
        return self.selenium_login_transaction_id