Newer
Older
# -*- coding: utf-8 -*-
# Copyright(C) 2013 Romain Bignon
#
# This woob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This woob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this woob module. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from dateutil.parser import parse as parse_date
from collections import OrderedDict
from woob.exceptions import (
BrowserIncorrectPassword, ActionNeeded, BrowserUnavailable,
AuthMethodNotImplemented, BrowserQuestion,
)
from woob.browser.browsers import TwoFactorBrowser, need_login
from woob.browser.exceptions import HTTPNotFound, ServerError, ClientError
from woob.tools.compat import urljoin, urlencode, quote
from woob.tools.value import Value
from .pages import (
AccountsPage, JsonBalances, JsonPeriods, JsonHistory,
JsonBalances2, CurrencyPage, LoginPage, NoCardPage,
NotFoundPage, HomeLoginPage,
ReadAuthChallengePage, UpdateAuthTokenPage,
from .fingerprint import FingerprintPage
__all__ = ['AmericanExpressBrowser']
class AmericanExpressBrowser(TwoFactorBrowser):
BASEURL = 'https://global.americanexpress.com'
TWOFA_BASEURL = r'https://functions.americanexpress.com'
home_login = URL(r'/login\?inav=fr_utility_logout', HomeLoginPage)
login = URL(r'/myca/logon/emea/action/login', LoginPage)
fingerprint = URL(r'https://www.cdn-path.com/cc.js\?=&sid=ee490b8fb9a4d570&tid=(?P<transaction_id>.*)&namespace=inauth', FingerprintPage)
read_auth_challenges = URL(TWOFA_BASEURL + r'/ReadAuthenticationChallenges.v1', ReadAuthChallengePage)
create_otp_uri = URL(TWOFA_BASEURL + r'/CreateOneTimePasscodeDelivery.v1')
update_auth_token = URL(TWOFA_BASEURL + r'/UpdateAuthenticationTokenWithChallenge.v1', UpdateAuthTokenPage)
create_2fa_uri = URL(TWOFA_BASEURL + r'/CreateTwoFactorAuthenticationForUser.v1')
accounts = URL(r'/api/servicing/v1/member', AccountsPage)
json_balances = URL(r'/api/servicing/v1/financials/balances', JsonBalances)
json_balances2 = URL(r'/api/servicing/v1/financials/transaction_summary\?type=split_by_cardmember&statement_end_date=(?P<date>[\d-]+)', JsonBalances2)
json_pending = URL(
r'/api/servicing/v1/financials/transactions\?limit=1000&offset=(?P<offset>\d+)&status=pending',
JsonHistory
)
json_posted = URL(
r'/api/servicing/v1/financials/transactions\?limit=1000&offset=(?P<offset>\d+)&statement_end_date=(?P<end>[0-9-]+)&status=posted',
JsonHistory
)
json_periods = URL(r'/api/servicing/v1/financials/statement_periods', JsonPeriods)
currency_page = URL(r'https://www.aexp-static.com/cdaas/axp-app/modules/axp-balance-summary/4.7.0/(?P<locale>\w\w-\w\w)/axp-balance-summary.json', CurrencyPage)
no_card = URL(r'https://www.americanexpress.com/us/content/no-card/',
r'https://www.americanexpress.com/us/no-card/', NoCardPage)
not_found = URL(r'/accounts/error', NotFoundPage)
SUMMARY_CARD_LABEL = [
'PAYMENT RECEIVED - THANK YOU',
'PRELEVEMENT AUTOMATIQUE ENREGISTRE-MERCI',
HAS_CREDENTIALS_ONLY = True
def __init__(self, *args, **kwargs):
super(AmericanExpressBrowser, self).__init__(*args, **kwargs)
# State to keep during OTP
self.authentication_action_id = None
self.application_id = None
self.account_token = None
self.mfa_id = None
self.auth_trusted = None
self.__states__ += (
'authentication_action_id',
'application_id',
'account_token',
'mfa_id',
'auth_trusted',
)
self.AUTHENTICATION_METHODS = {
'otp': self.handle_otp,
}
def init_login(self):
now = datetime.datetime.utcnow()
transaction_id = 'LOGIN-%s' % uuid.uuid4() # Randomly generated in js
self.register_transaction_id(transaction_id, now)
data = {
'request_type': 'login',
'Face': 'fr_FR',
'Logon': 'Logon',
'version': 4,
'inauth_profile_transaction_id': transaction_id,
'DestPage': urljoin(self.BASEURL,'dashboard'),
'UserID': self.username,
'Password': self.password,
'REMEMBERME': 'on',
'b_hour': now.hour,
'b_minute': now.minute,
'b_second': now.second,
'b_dayNumber': now.day,
'b_month': now.month,
'b_year': now.year,
'b_timeZone': '0',
'devicePrint': self.make_device_print(),
self.send_login_request(data)
def send_login_request(self, data):
# Match the headers on website to prevent LGON011 error
headers_for_login = {
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.5',
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
'Origin': 'https://www.americanexpress.com',
'Host': 'global.americanexpress.com',
# Setting headers to None to remove them from the request
'Referer': None,
'Upgrade-Insecure-Requests': None,
}
self.login.go(data=data, headers=headers_for_login)
if self.page.get_status_code() != 0:
error_code = self.page.get_error_code()
message = self.page.get_error_message()
if any(code in error_code for code in ('LGON001', 'LGON003')):
raise BrowserIncorrectPassword(message)
elif error_code == 'LGON004':
# This error happens when the website needs the user to
# enter his card information and reset his password.
# There is no message returned when this error happens.
raise ActionNeeded()
elif error_code == 'LGON008':
# Don't know what this error means, but if we follow the redirect
# url it allows us to be correctly logged.
self.location(self.page.get_redirect_url())
elif error_code == 'LGON010':
raise BrowserUnavailable(message)
elif error_code == 'LGON011':
# this kind of error is for mystical reasons,
# but until now it was headers related, it could be :
# - headers not in the right order
# - headers with value that doesn't match the one from website
# - headers missing
# - IP blacklisted
# What's next ?
raise AssertionError('Error code "LGON011" (msg:"%s")' % message)
elif error_code == 'LGON013':
self.raise_otp()
raise AssertionError('Error code "%s" (msg:"%s") not handled' % (error_code, message))
def prepare_request(self, req):
# Get all headers in alphabetical order to prevent LGON011 error
prep = super(AmericanExpressBrowser, self).prepare_request(req)
prep.headers = OrderedDict(sorted(prep.headers.items(), key=lambda i: i[0].lower()))
return prep
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def locate_browser(self, url):
# For some reason, it looks like we cannot reconnect from storage.
pass
def clear_init_cookies(self):
# Keep the device-id to prevent an SCA
for cookie in self.session.cookies:
if cookie.name == "device-id":
device = cookie
break
else:
device = None
self.session.cookies.clear()
if device:
self.session.cookies.set_cookie(device)
def register_transaction_id(self, transaction_id, now):
self.fingerprint.go(transaction_id=transaction_id)
payload = self.page.make_payload_for_s2(transaction_id, now)
self.open('https://www.cdn-path.com/s2', method="POST",
params={
't': self.page.get_t(),
'x': 1, # Not seen change yet
'sid': 'ee490b8fb9a4d570', # Not seen change yet
'tid': transaction_id,
},
files = {
'_f': payload,
},
headers = {
'Accept-Encoding': 'gzip, deflate, br',
'Host': 'www.cdn-path.com',
'Origin': 'https://www.americanexpress.com',
'Referer': 'https://www.americanexpress.com/',
'Pragma': 'no-cache',
'TE': 'Trailers',
},
)
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def make_device_print(self):
d = OrderedDict()
d['version'] = "3.4.0.0_1"
d['pm_fpua'] = self.session.headers['User-Agent'] + '|5.0 (X11)|Linux x86_64'
d['pm_fpsc'] = '24|1650|498|498'
d['pm_fptw'] = ''
d['pm_fptz'] = 0
d['pm_fpln'] = 'lang=en-US|syslang=|userlang='
d['pm_fpjv'] = 0
d['pm_fpco'] = 1
d['pm_fpasw'] = ''
d['pm_fpan'] = "Netscape"
d['pm_fpacn'] = "Mozilla"
d['pm_fpol'] = 'true'
d['pm_fposp'] = ''
d['pm_fpup'] = ''
d['pm_fpsaw'] = '1920'
d['pm_fpspd'] = '24'
d['pm_fpsbd'] = ''
d['pm_fpsdx'] = ''
d['pm_fpsdy'] = ''
d['pm_fpslx'] = ''
d['pm_fpsly'] = ''
d['pm_fpsfse'] = ''
d['pm_fpsui'] = ''
d['pm_os'] = 'Linux'
d['pm_brmjv'] = 78
d['pm_br'] = 'Firefox'
d['pm_inpt'] = ''
d['pm_expt'] = ''
return (
urlencode(d,quote_via=quote) # using quote to prevent encoding space as +
# The next four character are not quoted by quote
.replace('~', "%7E")
.replace('-', "%2D")
.replace('_', "%5F")
.replace('.', "%2E")
# These replace are to remove the & and = included by urlencode
.replace('=', "%3D")
.replace('&', "%26")
)
def raise_otp(self):
self.check_interactive()
reauth = self.page.get_reauth()
self.authentication_action_id = reauth["actionId"]
self.application_id = reauth["applicationId"]
self.mfa_id = reauth["mfaId"]
self.auth_trusted = reauth["trust"]
if not self.auth_trusted:
self.logger.warning(
"We are not trusted. There could be a problem with the fingerprinting of cc.js"
)
read_auth_challenges_payload = [{
"authenticationActionId": self.authentication_action_id,
"applicationId": self.application_id,
"locale": self.locale,
}]
self.read_auth_challenges.go(json=read_auth_challenges_payload)
challenge = self.page.get_challenge()
assert challenge == "OTP", "We don't know how to handle '%s' challenge." % challenge
self.account_token = self.page.get_account_token()
methods = self.page.get_otp_methods()
delivery_payload, message = self.make_otp_delivery_payload(methods)
self.create_otp_uri.go(json=delivery_payload)
raise BrowserQuestion(
Value('otp', label=message)
)
def make_otp_delivery_payload(self, methods):
known_methods = ["SMS", "EMAIL"] # This is also our preference order.
methods = {m["deliveryMethod"]: m for m in methods}
chosen_method = None
# Select the 2FA method for this authentification.
# Search for them in the order of known_methods.
for known_method in known_methods:
chosen_method = methods.get(known_method)
if chosen_method:
break
if chosen_method is None:
assert methods != {}, "Received no challenge option"
raise AuthMethodNotImplemented(', '.join(methods.keys()))
delivery_method = chosen_method["deliveryMethod"]
delivery_payload = [{
"authenticationActionId": self.authentication_action_id,
"applicationId": self.application_id,
"accountToken": self.account_token,
"locale": self.locale,
"deliveryMethod": delivery_method,
"channelType": chosen_method["channelType"],
"channelEncryptedValue": chosen_method["channelEncryptedValue"],
}]
display_value = chosen_method["channelDisplayValue"]
if delivery_method == "EMAIL":
message = "Veuillez entrer le code d’authentification qui vous a été envoyé à l'adresse courriel %s." % display_value
else:
message = "Veuillez entrer le code d’authentification qui vous a été envoyé au %s." % display_value
return delivery_payload, message
def handle_otp(self):
update_auth_token_payload = [{
"authenticationActionId": self.authentication_action_id,
"applicationId": self.application_id,
"accountToken": self.account_token,
"locale": self.locale,
"fieldName": "OTP",
"fieldValue": self.otp,
}]
try:
self.update_auth_token.go(json=update_auth_token_payload)
pending_challenge = self.page.get_pending_challenges()
except ClientError as e:
self.drop_2fa_state()
if e.response.status_code == 400 and "UEVE008" in e.response.text:
# {"description":"Invalid Claim: Data does not match SOR","errorCode":"UEVE008"}
raise BrowserIncorrectPassword("Mauvais code lors de l'authentification forte.")
raise
if pending_challenge != "":
self.drop_2fa_state()
raise AssertionError("Multiple challenge not handled by the module yet.")
self.enrol_device()
self.tfa_login()
self.drop_2fa_state()
def drop_2fa_state(self):
self.account_token = None
self.application_id = None
self.authentication_action_id = None
self.mfa_id = None
self.auth_trusted = None
def enrol_device(self):
if self.auth_trusted:
enrol_payload = [{
"locale": self.locale,
"trust": self.auth_trusted,
"deviceName":"Accès Budget Insight pour agrégation",
}]
self.create_2fa_uri.go(json=enrol_payload)
else:
self.logger.info("Cannot enrol when we are not trusted.")
def tfa_login(self):
data = {
'request_type': "login",
'Face': 'fr_FR',
'Logon': 'Logon',
'version': 4,
'mfaId': self.mfa_id,
}
self.send_login_request(data)
@property
def locale(self):
return self.session.cookies.get_dict(domain=".americanexpress.com")['axplocale']
def iter_accounts(self):
self.currency_page.go(locale=self.locale.lower())
currency = self.page.get_currency()
self.accounts.go()
account_list = list(self.page.iter_accounts(currency=currency))
for account in account_list:
self.json_balances.go(headers={'account_tokens': account.id})
except HTTPNotFound:
self.json_periods.go(headers={'account_token': account._history_token})
periods = self.page.get_periods()
period_index = 1
if len(periods) == 1: # Recently created accounts have only one period
period_index = 0
self.json_balances2.go(date=periods[period_index], headers={'account_tokens': account.id})
self.page.fill_balances(obj=account)
def iter_history(self, account):
self.json_periods.go(headers={'account_token': account._history_token})
periods = self.page.get_periods()
today = datetime.date.today()
# TODO handle pagination
self.json_posted.go(offset=0, end=p, headers={'account_token': account._history_token})
for tr in self.page.iter_history(periods=periods):
# As the website is very handy, passing account_token is not enough:
# it will return every transactions of each account, so we
# have to match them manually
if tr._owner == account._idforJSON and tr.date <= today:
yield tr
def iter_coming(self, account):
# Coming transactions can be found in a 'pending' JSON if it exists
# ('En attente' tab on the website), as well as in a 'posted' JSON
# ('Enregistrées' tab on the website)
# "pending" have no vdate and debit date is in future
self.json_periods.go(headers={'account_token': account._history_token})
periods = self.page.get_periods()
date = parse_date(periods[0]).date()
today = datetime.date.today()
# when the latest period ends today we can't know the coming debit date
if date != today:
try:
self.json_pending.go(offset=0, headers={'account_token': account._history_token})
except ServerError as exc:
# At certain times of the month a connection might not have pendings;
# in that case, `json_pending.go` would throw a 502 error Bad Gateway
error_code = exc.response.json().get('code')
error_message = exc.response.json().get('message')
self.logger.warning('No pendings page to access to, got error %s and message "%s" instead.', error_code, error_message)
else:
for tr in self.page.iter_history(periods=periods):
if tr._owner == account._idforJSON:
tr.date = date
yield tr
# "posted" have a vdate but debit date can be future or past
self.json_posted.go(offset=0, end=p, headers={'account_token': account._history_token})
for tr in self.page.iter_history(periods=periods):
if tr.date > today or not tr.date:
if tr._owner == account._idforJSON:
yield tr