# -*- coding: utf-8 -*-
# Copyright(C) 2017 Théo Dorée
#
# This file is part of a weboob module.
#
# This weboob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This weboob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this weboob module. If not, see .
from __future__ import unicode_literals
from datetime import date
from weboob.browser.browsers import LoginBrowser, URL, need_login, StatesMixin
from .compat.weboob_exceptions import (
BrowserIncorrectPassword, BrowserUnavailable, ImageCaptchaQuestion, BrowserQuestion, ActionNeeded,
WrongCaptchaResponse
)
from weboob.tools.value import Value
from weboob.browser.browsers import ClientError
from .pages import (
LoginPage, SubscriptionsPage, DocumentsPage, DownloadDocumentPage, HomePage, PanelPage, SecurityPage,
LanguagePage, HistoryPage
)
class AmazonBrowser(LoginBrowser, StatesMixin):
BASEURL = 'https://www.amazon.fr'
CURRENCY = 'EUR'
LANGUAGE = 'fr-FR'
L_SIGNIN = 'Identifiez-vous'
L_LOGIN = 'Connexion'
L_SUBSCRIBER = 'Nom : (.*) Modifier E-mail'
WRONGPASS_MESSAGES = [
"Votre mot de passe est incorrect",
"Saisissez une adresse e-mail ou un numéro de téléphone portable valable",
"Impossible de trouver un compte correspondant à cette adresse e-mail"
]
WRONG_CAPTCHA_RESPONSE = "Saisissez les caractères tels qu'ils apparaissent sur l'image."
login = URL(r'/ap/signin(.*)', LoginPage)
home = URL(r'/$', r'/\?language=\w+$', HomePage)
panel = URL('/gp/css/homepage.html/ref=nav_youraccount_ya', PanelPage)
subscriptions = URL(r'/ap/cnep(.*)', SubscriptionsPage)
documents = URL(r'/gp/your-account/order-history\?opt=ab&digitalOrders=1(.*)&orderFilter=year-(?P.*)',
r'https://www.amazon.fr/gp/your-account/order-history',
DocumentsPage)
download_doc = URL(r'/gp/shared-cs/ajax/invoice/invoice.html', DownloadDocumentPage)
security = URL('/ap/dcq',
'/ap/cvf/',
'/ap/mfa',
SecurityPage)
language = URL(r'/gp/customer-preferences/save-settings/ref=icp_lop_(?P.*)_tn', LanguagePage)
history = URL(r'/gp/your-account/order-history\?ref_=ya_d_c_yo', HistoryPage)
__states__ = ('otp_form', 'otp_url', 'otp_style', 'otp_headers')
STATE_DURATION = 10
otp_form = None
otp_url = None
otp_style = None
otp_headers = None
def __init__(self, config, *args, **kwargs):
self.config = config
kwargs['username'] = self.config['email'].get()
kwargs['password'] = self.config['password'].get()
super(AmazonBrowser, self).__init__(*args, **kwargs)
def locate_browser(self, state):
if '/ap/cvf/verify' not in state['url']:
# don't perform a GET to this url, it's the otp url, which will be reached by otp_form
self.location(state['url'])
def push_security_otp(self, pin_code):
res_form = self.otp_form
res_form['rememberDevice'] = ""
if self.otp_style == 'amazonDFA':
res_form['code'] = pin_code
self.location(self.otp_url, data=res_form, headers=self.otp_headers)
else:
res_form['otpCode'] = pin_code
self.location('/ap/signin', data=res_form, headers=self.otp_headers)
def handle_security(self):
otp_type = self.page.get_otp_type()
if otp_type == '/ap/signin':
# this otp will be always present until user deactivate it
raise ActionNeeded('You have enabled otp in your options, please deactivate it before synchronize')
if self.page.doc.xpath('//span[@class="a-button-text"]'):
self.page.send_code()
form = self.page.get_response_form()
self.otp_form = form['form']
self.otp_url = self.url
self.otp_style = form['style']
self.otp_headers = dict(self.session.headers)
raise BrowserQuestion(Value('pin_code', label=self.page.get_otp_message() if self.page.get_otp_message() else 'Please type the OTP you received'))
def handle_captcha(self, captcha):
self.otp_form = self.page.get_response_form()
self.otp_url = self.url
image = self.open(captcha[0]).content
raise ImageCaptchaQuestion(image)
def do_login(self):
if self.config['pin_code'].get():
# Resolve pin_code
self.push_security_otp(self.config['pin_code'].get())
if self.security.is_here() or self.login.is_here():
# Something went wrong, probably a wrong OTP code
raise BrowserIncorrectPassword('OTP incorrect')
else:
# Means security was passed, we're logged
return
if self.config['captcha_response'].get():
# Resolve captcha code
self.page.login(self.username, self.password, self.config['captcha_response'].get())
if self.security.is_here():
# Raise security management
self.handle_security()
if self.login.is_here():
msg = self.page.get_error_message()
if any(wrongpass_message in msg for wrongpass_message in self.WRONGPASS_MESSAGES):
raise BrowserIncorrectPassword(msg)
elif self.WRONG_CAPTCHA_RESPONSE in msg:
raise WrongCaptchaResponse(msg)
else:
assert False, msg
else:
return
# Change language so everything is handled the same way
self.to_english(self.LANGUAGE)
# To see if we're connected. If not, we land on LoginPage
try:
self.history.go()
except ClientError:
pass
if not self.login.is_here():
return
self.page.login(self.username, self.password)
if self.security.is_here():
# Raise security management
self.handle_security()
if self.login.is_here():
captcha = self.page.has_captcha()
if captcha and not self.config['captcha_response'].get():
self.handle_captcha(captcha)
else:
msg = self.page.get_error_message()
assert any(wrongpass_message in msg for wrongpass_message in self.WRONGPASS_MESSAGES), msg
raise BrowserIncorrectPassword(msg)
def is_login(self):
if self.login.is_here():
self.do_login()
else:
raise BrowserUnavailable()
def to_english(self, language):
# We put language in english
datas = {
'_url': '/?language=' + language.replace('-', '_'),
'LOP': language.replace('-', '_'),
}
self.language.go(method='POST', data=datas, language=language)
@need_login
def iter_subscription(self):
self.location(self.panel.go().get_sub_link())
if self.home.is_here():
if self.page.get_login_link():
self.is_login()
self.location(self.page.get_panel_link())
elif not self.subscriptions.is_here():
self.is_login()
yield self.page.get_item()
@need_login
def iter_documents(self, subscription):
year = date.today().year
old_year = year - 2
while year >= old_year:
self.documents.go(year=year)
request_id = self.page.response.headers['x-amz-rid']
for doc in self.page.iter_documents(subid=subscription.id, currency=self.CURRENCY, request_id=request_id):
yield doc
year -= 1