Skip to content
Commits on Source (83)
......@@ -17,9 +17,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this woob module. If not, see <http://www.gnu.org/licenses/>.
from woob.browser.exceptions import BrowserHTTPNotFound
import re
from woob.browser import PagesBrowser, URL
from .pages import RecipePage, ResultsPage
from .pages import RecipePage, ResultsPage, CommentPage
from woob.tools.compat import quote_plus
__all__ = ['SevenFiftyGramsBrowser']
......@@ -28,29 +30,26 @@
class SevenFiftyGramsBrowser(PagesBrowser):
BASEURL = 'https://www.750g.com'
search = URL('/recettes_(?P<pattern>.*).htm', ResultsPage)
comment = URL('/recipe/(?P<_id>.*)/sort/lastest/comments.json', CommentPage)
search = URL(r'/recherche/\?q=(?P<pattern>.*)&page=(?P<page>\d*)', ResultsPage)
recipe = URL('/(?P<id>.*).htm', RecipePage)
def iter_recipes(self, pattern):
try:
self.search.go(pattern=pattern.replace(' ', '_'))
except BrowserHTTPNotFound:
return []
if isinstance(self.page, ResultsPage):
return self.page.iter_recipes()
return [self.get_recipe_content()]
return self.search.go(pattern=quote_plus(pattern.encode('utf-8')), page=1).iter_recipes()
def get_recipe(self, id, recipe=None):
try:
self.recipe.go(id=id)
return self.get_recipe_content(recipe)
except BrowserHTTPNotFound:
return
self.recipe.go(id=id)
return self.get_recipe_content(recipe)
def get_comments(self, id):
m = re.match(r'.*r(\d*)', id, re.DOTALL)
if m:
_id = m.group(1)
return self.comment.go(_id=_id).get_comments()
def get_recipe_content(self, recipe=None):
recipe = self.page.get_recipe(obj=recipe)
comments = list(self.page.get_comments())
comments = self.get_comments(recipe.id)
if comments:
recipe.comments = comments
recipe.comments = list(comments)
return recipe
......@@ -19,15 +19,9 @@
from woob.capabilities.recipe import CapRecipe, Recipe
from woob.tools.backend import Module
from woob.tools.compat import unicode
from .browser import SevenFiftyGramsBrowser
import unicodedata
def strip_accents(s):
return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')
__all__ = ['SevenFiftyGramsModule']
......@@ -45,7 +39,7 @@ def get_recipe(self, id):
return self.browser.get_recipe(id)
def iter_recipes(self, pattern):
return self.browser.iter_recipes(strip_accents(unicode(pattern)).encode('utf-8'))
return self.browser.iter_recipes(pattern)
def fill_recipe(self, recipe, fields):
if 'nb_person' in fields or 'instructions' in fields:
......
......@@ -21,9 +21,9 @@
from woob.capabilities.recipe import Recipe, Comment
from woob.capabilities.base import NotAvailable
from woob.capabilities.image import BaseImage, Thumbnail
from woob.browser.pages import HTMLPage, pagination
from woob.browser.elements import ItemElement, ListElement, method
from woob.browser.filters.standard import CleanText, Regexp, Env, CleanDecimal, Eval
from woob.browser.pages import HTMLPage, JsonPage, pagination
from woob.browser.elements import DictElement, ItemElement, ListElement, method
from woob.browser.filters.standard import CleanText, Regexp, Env, CleanDecimal, Eval, BrowserURL
from woob.browser.filters.json import Dict, NotFound
from datetime import datetime, date, time
from dateutil.parser import parse as parse_date
......@@ -47,54 +47,58 @@ class ResultsPage(HTMLPage):
@pagination
@method
class iter_recipes(ListElement):
item_xpath = '//section[has-class("c-recipe-row")]'
item_xpath = '//article/div'
def next_page(self):
return CleanText('//li[@class="suivante"]/a/@href')(self)
suivant = CleanText(
'//li[@class="pagination-item"]/span/span[@class="pagination-txt" and text()="Suivant"]',
default="")(self)
if suivant == "Suivant":
page = Env('page')(self)
return BrowserURL('search', pattern=Env('pattern'), page=int(page) + 1)(self)
class item(ItemElement):
klass = Recipe
def condition(self):
return not CleanText('./div[@class="c-recipe-row__media"]/span[@class="c-recipe-row__video"]/@class',
default=None)(self) and CleanText('./div/h2/a/@href')(self)
obj_id = Regexp(CleanText('./div[@class="card-content"]/strong/a/@href'),
'https://www.750g.com/(.*).htm')
obj_id = Regexp(CleanText('./div/h2/a/@href'),
'/(.*).htm')
obj_title = CleanText('./div[@class="card-content"]/strong/a')
obj_title = CleanText('./div/h2/a')
obj_short_description = CleanText('./div[@class="card-content"]/p[@class="card-text"]')
class obj_picture(ItemElement):
klass = BaseImage
obj_thumbnail = Eval(Thumbnail, CleanText('./div/img/@src'))
obj_thumbnail = Eval(Thumbnail,
CleanText('./div[@class="card-media-wrapper"]/div/picture/@data-srcset'))
obj_short_description = CleanText('./div/p')
class RecipePage(HTMLPage):
""" Page which contains a recipe
class CommentPage(JsonPage):
""" Page which contains a comments
"""
@method
class get_comments(ListElement):
item_xpath = '//div[has-class("c-comment__row")]'
class get_comments(DictElement):
item_xpath = "comments"
class item(ItemElement):
klass = Comment
def validate(self, obj):
return obj.id
obj_id = Dict('@id')
obj_author = Dict('author/nickname')
obj_text = Dict('content')
obj_id = CleanText('./@data-id')
obj_author = CleanText('./article/div/header/strong/span[@itemprop="author"]')
obj_text = CleanText('./article/div/div/p')
class RecipePage(HTMLPage):
""" Page which contains a recipe
"""
@method
class get_recipe(ItemElement):
klass = Recipe
def parse(self, el):
json_content = CleanText('//head/script[@type="application/ld+json"]')(el)
json_content = CleanText('(//script[@type="application/ld+json"])[1]')(el)
self.el = json.loads(json_content)
obj_id = Env('id')
......@@ -104,15 +108,12 @@ def parse(self, el):
obj_preparation_time = Time('prepTime')
def obj_nb_person(self):
return [CleanDecimal(Dict('recipeYield'), default=0)(self)]
return [CleanDecimal(Dict('recipeYield', default=0))(self)]
obj_instructions = Dict('recipeInstructions')
obj_author = Dict('author/name', default=NotAvailable)
def obj_picture(self):
img = BaseImage()
try:
img.url = self.el['image']
except KeyError:
return
img.url = self.el['image']['url']
return img
......@@ -241,7 +241,7 @@ def do_login(self):
# Means security was passed, we're logged
return
if self.config['resume'].get():
if self.config['resume'].get() or self.approval_page.is_here():
self.check_app_validation()
# we are logged
return
......@@ -326,7 +326,6 @@ def is_login(self):
self.do_login()
else:
if self.approval_page.is_here():
self.check_interactive()
self.check_app_validation()
return
raise BrowserUnavailable()
......
......@@ -46,7 +46,9 @@ class AmeliModule(Module, CapDocument):
BROWSER = AmeliBrowser
CONFIG = BackendConfig(
ValueBackendPassword('login', label='Mon numero de sécurité sociale', regexp=r'\d{13}', masked=False),
ValueBackendPassword(
'login', label='Mon numéro de sécurité sociale (13 chiffres)', regexp=r'\d{13}', masked=False
),
ValueBackendPassword('password', label='Mon code personnel', regexp=r'\S{8,50}', masked=True),
)
......
......@@ -30,7 +30,9 @@
from woob.browser.filters.html import Attr
from woob.browser.filters.json import Dict
from woob.browser.pages import LoggedPage, JsonPage, HTMLPage
from woob.capabilities.bank import Account, Transaction
from woob.capabilities.bank import (
Account, Transaction, AccountOwnerType,
)
from woob.capabilities.wealth import Investment, Pocket
from woob.capabilities.base import NotAvailable, empty
from woob.exceptions import NoAccountsException
......@@ -82,6 +84,7 @@ class item(ItemElement):
obj_balance = CleanDecimal(Dict('mtBrut'))
obj_currency = 'EUR'
obj_type = Map(Dict('typeDispositif'), ACCOUNT_TYPES, Account.TYPE_LIFE_INSURANCE)
obj_owner_type = AccountOwnerType.PRIVATE
def obj_number(self):
# just the id is a kind of company id so it can be unique on a backend but not unique on multiple backends
......
......@@ -56,7 +56,7 @@
ConfirmTransferPage, RecipientConfirmationPage, ScheduledTransfersPage,
ScheduledTransferDetailsPage,
)
from .pages.document import DocumentsPage, DownloadPage
from .pages.document import DocumentsPage, DownloadPage, DocumentDetailsPage
class AXABrowser(LoginBrowser):
......@@ -92,6 +92,17 @@ def do_login(self):
if self.password.isdigit():
self.account_space_login.go()
error_message = self.page.get_error_message()
if error_message:
is_website_unavailable = re.search(
"Veuillez nous excuser pour la gêne occasionnée",
error_message
)
if is_website_unavailable:
raise BrowserUnavailable(error_message)
if self.page.get_error_link():
# Go on information page to get possible error message
self.location(self.page.get_error_link())
......@@ -778,7 +789,7 @@ class AXAAssurance(AXABrowser):
performance_monaxa = URL(r'https://monaxaweb-gp.axa.fr/MonAxa/ContratPerformance/', PerformanceMonAxaPage)
documents_life_insurance = URL(
r'/content/espace-client/accueil/mes-documents/situations-de-contrats-assurance-vie.content-inner.din_SAVINGS_STATEMENT.html',
r'/content/espace-client/accueil/mes-documents/contrats.content-inner.din_POLICY.html',
DocumentsPage
)
documents_certificates = URL(
......@@ -793,9 +804,13 @@ class AXAAssurance(AXABrowser):
r'/content/espace-client/accueil/mes-documents/avis-d-echeance.content-inner.din_PREMIUM_STATEMENT.html',
DocumentsPage
)
document_details = URL(
r'/content/ecc-popin-cards/technical/detailed/dam-document.content-inner',
DocumentDetailsPage
)
download = URL(
r'/content/ecc-popin-cards/technical/detailed/download-document.downloadPdf.html',
r'/content/dam/axa/ecc/pdf',
DownloadPage
)
profile = URL(r'/content/ecc-popin-cards/transverse/userprofile.content-inner.html\?_=\d+', ProfilePage)
......@@ -932,9 +947,20 @@ def iter_documents(self, subscription):
yield doc
@need_login
def download_document(self, download_id):
self.download.go(data={'documentId': download_id})
return self.page.content
def download_document(self, document):
# "On request" documents are not downloadable, they are sent by physical mail
if 'onrequest-document' in document.url:
return
# These documents have a direct download URL instead of a download ID.
elif 'dam-document' in document.url:
self.location(document.url)
document_url = self.page.get_download_url()
self.location(document_url)
return self.page.content
# These documents are obtained with a generic URL and a download ID as a parameter.
elif document._download_id:
self.download.go(data={'documentId': document._download_id})
return self.page.content
@need_login
def get_profile(self):
......
......@@ -170,7 +170,7 @@ def iter_documents(self, subscription):
def download_document(self, document):
if not isinstance(document, Document):
document = self.get_document(document)
return self.browser.download_document(document._download_id)
return self.browser.download_document(document)
def iter_resources(self, objs, split_path):
if Account in objs:
......
......@@ -20,8 +20,10 @@
from __future__ import unicode_literals
from woob.browser.pages import HTMLPage, LoggedPage
from woob.browser.filters.html import Attr
from woob.browser.filters.standard import CleanText, Env, Regexp, Format, Date
from woob.browser.elements import ListElement, ItemElement, method
from woob.capabilities import NotAvailable
from woob.capabilities.bill import Document
from woob.tools.date import parse_french_date
......@@ -37,7 +39,7 @@ class item(ItemElement):
obj_id = Format(
'%s_%s',
Env('subid'),
Regexp(CleanText('./@data-module-open-link--link'), '#/details/(.*)'),
Regexp(Attr('.', 'data-module-open-link--link'), r'#/details/(.*)'),
)
obj_format = 'pdf'
# eg when formatted (not complete list):
......@@ -50,12 +52,18 @@ class item(ItemElement):
CleanText('.//div[@class="sticker-content"]//strong'),
CleanText('.//p[@class="contract-info"]'),
)
obj_date = Date(CleanText('.//p[@class="card-date"]'), parse_func=parse_french_date)
obj_date = Date(CleanText('.//p[@class="card-date"]'), parse_func=parse_french_date, default=NotAvailable)
obj_type = 'document'
obj__download_id = Regexp(CleanText('./@data-url'), r'.did_(.*?)\.')
obj_url = Attr('.', 'data-url')
obj__download_id = Regexp(Attr('.', 'data-url'), r'.[dp]id_(.*?)\.', default=None)
class DownloadPage(LoggedPage, HTMLPage):
def create_document(self):
form = self.get_form(xpath='//form[has-class("form-download-pdf")]')
form.submit()
class DocumentDetailsPage(LoggedPage, HTMLPage):
def get_download_url(self):
return Attr('//button', 'data-module-open-link--link')(self.doc)
......@@ -128,6 +128,9 @@ class AccountSpaceLogin(JsonPage):
def get_error_link(self):
return self.doc.get('informationUrl')
def get_error_message(self):
return self.doc.get('informationMessage')
class ErrorPage(PartialHTMLPage):
def on_load(self):
......
......@@ -24,7 +24,6 @@
import json
import re
from uuid import uuid4
from datetime import datetime
from collections import OrderedDict
from functools import wraps
......@@ -39,6 +38,7 @@
from woob.capabilities.base import NotAvailable, find_object
from woob.tools.capabilities.bank.investments import create_french_liquidity
from woob.tools.compat import urlparse, parse_qs
from woob.tools.date import now_as_tz
from .pages import (
LoggedOut,
......@@ -51,7 +51,7 @@
NewLoginPage, JsFilePage, AuthorizePage, LoginTokensPage, VkImagePage,
AuthenticationMethodPage, AuthenticationStepPage, CaissedepargneVirtKeyboard,
AccountsNextPage, GenericAccountsPage, InfoTokensPage, NatixisUnavailablePage,
RedirectErrorPage, BPCEPage, AuthorizeErrorPage,
RedirectErrorPage, BPCEPage, AuthorizeErrorPage, UnavailableDocumentsPage,
)
from .document_pages import BasicTokenPage, SubscriberPage, SubscriptionsPage, DocumentsPage
from .linebourse_browser import LinebourseAPIBrowser
......@@ -619,6 +619,7 @@ def iter_accounts(self, get_iban=True):
# we want the iban number or not thanks to stateful website
next_pages = []
accounts = []
owner_type = self.get_owner_type()
profile = self.get_profile()
if profile:
......@@ -639,6 +640,7 @@ def iter_accounts(self, get_iban=True):
self.go_on_accounts_list()
for a in self.page.iter_accounts(next_pages):
a.owner_type = owner_type
if owner_name:
self.set_account_ownership(a, owner_name)
......@@ -682,6 +684,7 @@ def iter_accounts(self, get_iban=True):
next_with_params=next_with_params
)
for a in accounts_iter:
a.owner_type = owner_type
self.set_account_ownership(a, owner_name)
accounts.append(a)
if not get_iban:
......@@ -692,6 +695,7 @@ def iter_accounts(self, get_iban=True):
if get_iban:
for a in accounts:
a.owner_type = owner_type
a.iban = self.get_iban_number(a)
yield a
......@@ -1052,11 +1056,11 @@ def iter_subscriptions(self):
@need_login
def iter_documents(self, subscription):
now = datetime.now()
now = now_as_tz('Europe/Paris')
# website says we can't get documents more than one year range, even if we can get 5 years
# but they tell us this overload their server
first_date = now - relativedelta(years=1)
start_date = first_date.strftime('%Y-%m-%dT00:00:00.000+00:00')
start_date = first_date.strftime('%Y-%m-%dT%H:%M:%S.000+00:00')
end_date = now.strftime('%Y-%m-%dT%H:%M:%S.000+00:00')
body = {
'inTypeRecherche': {'type': 'typeRechercheDocument', 'code': 'DEMAT'},
......@@ -1079,6 +1083,10 @@ def iter_documents(self, subscription):
self.documents_page.go(json=body, headers=self.documents_headers)
except ClientError as e:
if e.response.status_code == 400:
unavailable_page = UnavailableDocumentsPage(self, e.response)
if unavailable_page.is_here():
raise BrowserUnavailable()
body['inListeTypesDocuments'][1] = {
'typeDocument': {
# two spaces at the end of 'RLVCB ' is mandatory
......@@ -1100,10 +1108,16 @@ def download_document(self, document):
def get_current_subbank(self):
match = re.search(r'icgauth.(?P<domaine>[\.a-z]*).fr', self.url)
if match:
self.current_subbank = match['domaine']
self.current_subbank = match.group('domaine')
else:
self.current_subbank = 'banquepopulaire'
@need_login
def get_owner_type(self):
self.first_login_page.go()
if self.home_page.is_here():
return self.page.get_owner_type()
class iter_retry(object):
# when the callback is retried, it will create a new iterator, but we may already yielded
......
......@@ -44,7 +44,7 @@
HTMLPage, LoggedPage, FormNotFound, JsonPage, RawPage, XMLPage,
AbstractPage,
)
from woob.capabilities.bank import Account
from woob.capabilities.bank import Account, AccountOwnerType
from woob.capabilities.wealth import Investment
from woob.capabilities.profile import Person
from woob.capabilities.contact import Advisor
......@@ -339,6 +339,11 @@ def on_load(self):
self.browser.location(a)
class UnavailableDocumentsPage(LoggedPage, JsonPage):
def is_here(self):
return Dict('code')(self.doc) == 'service_indisponible'
class NewLoginPage(AbstractPage):
PARENT = 'caissedepargne'
PARENT_URL = 'new_login'
......@@ -734,6 +739,21 @@ def get_token(self):
args = dict(parse_qsl(v.query))
return args['token']
def get_owner_type(self):
# This link gives us the default client space/universe of the connection.
# We use this information for setting the owner_type.
url = Link('//li[@class="homeButton"]/a', default='')(self.doc)
match = re.search(r'([^/]+).aspx', url)
if match:
name = match.group(1)
if re.search('privee|particulier|fonctionnaire|jeune', name):
return AccountOwnerType.PRIVATE
elif re.search('entreprise|profession|artisan', name):
# profession liberale
# artisans commercants
return AccountOwnerType.ORGANIZATION
return NotAvailable
class GenericAccountsPage(LoggedPage, MyHTMLPage):
ACCOUNT_TYPES = {
......
......@@ -67,7 +67,7 @@ class BforbankBrowser(TwoFactorBrowser):
)
loan_history = URL('/espace-client/livret/consultation.*', LoanHistoryPage)
history = URL('/espace-client/consultation/operations/.*', HistoryPage)
coming = URL(r'/espace-client/consultation/operationsAVenir/(?P<account>\d+)$', HistoryPage)
coming = URL(r'/espace-client/consultation/operationsAVenir/(?P<account>[^/]+)$', HistoryPage)
card_history = URL('espace-client/consultation/encoursCarte/.*', CardHistoryPage)
card_page = URL(r'/espace-client/carte/(?P<account>\d+)$', CardPage)
......@@ -371,7 +371,7 @@ def get_history(self, account):
@need_login
def get_coming(self, account):
if account.type == Account.TYPE_CHECKING:
self.coming.go(account=account.id)
self.coming.go(account=account._url_code)
return self.page.get_operations()
elif account.type == Account.TYPE_CARD:
self.location(account.url.replace('operations', 'encoursCarte') + '/%s' % account._index)
......@@ -432,7 +432,7 @@ def goto_spirica(self, account):
def get_bourse_account(self, account):
owner_name = self.get_profile().name.upper().split(' ', 1)[1]
self.bourse_login.go(id=account.id) # "login" to bourse page
self.location(account.url)
self.bourse.go()
assert self.bourse.is_here()
......
......@@ -170,8 +170,13 @@ class item(ItemElement):
obj_type = Map(Regexp(Field('label'), r'^([^ ]*)'), TYPE, default=Account.TYPE_UNKNOWN)
def obj_url(self):
return urljoin(self.page.url, CleanText('./@data-href')(self))
path = Attr('.', 'data-href')(self)
if path == '/espace-client/titres':
path = Attr('.', 'data-urlcatitre')(self)
return urljoin(self.page.url, path)
# Looks like a variant of base64: ASKHJLHWF272jhk22kjhHJQ1_ufad892hjjj122j348=
obj__url_code = Regexp(Field('url'), r'/espace-client/consultation/operations/(.*)', default=None)
obj__card_balance = CleanDecimal('./td//div[@class="synthese-encours"][last()]/div[2]', default=None)
def obj_balance(self):
......
......@@ -308,8 +308,8 @@ def load_state(self, state):
super(BNPParibasBrowser, self).load_state(state)
def change_pass(self, oldpass, newpass):
res = self.open('/identification-wspl-pres/grille?accessible=false')
url = '/identification-wspl-pres/grille/%s' % res.json()['data']['idGrille']
res = self.open('/mcs-wspl/rpc/grille?accessible=false')
url = '/mcs-wspl/rpc/grille/%s' % res.json()['data']['idGrille']
keyboard = self.open(url)
vk = BNPKeyboard(self, keyboard)
data = {}
......@@ -353,13 +353,20 @@ def iter_accounts(self):
raise ActionNeeded("Veuillez réaliser l'authentification forte depuis votre navigateur.")
ibans = self.page.get_ibans_dict()
is_pro = {}
# This page might be unavailable.
try:
ibans.update(self.transfer_init.go(json={'modeBeneficiaire': '0'}).get_ibans_dict('Crediteur'))
self.transfer_init.go(json={'modeBeneficiaire': '0'})
ibans.update(self.page.get_ibans_dict('Crediteur'))
is_pro = self.page.get_pro_accounts('Crediteur')
except (TransferAssertionError, AttributeError):
pass
accounts = list(self.accounts.go().iter_accounts(ibans=ibans))
self.accounts.go()
accounts = list(self.page.iter_accounts(
ibans=ibans,
is_pro=is_pro,
))
self.market_syn.go(json={})
market_accounts = self.page.get_list() # get the list of 'Comptes Titres'
checked_accounts = set()
......
......@@ -21,6 +21,7 @@
from __future__ import unicode_literals
from collections import Counter
import re
from io import BytesIO
from decimal import Decimal
......@@ -30,7 +31,7 @@
from woob.browser.filters.json import Dict
from woob.browser.filters.standard import (
Format, Eval, Regexp, CleanText, Date, CleanDecimal,
Field, Coalesce, Map, MapIn, Env, Currency, FromTimestamp,
Field, Coalesce, Map, MapIn, Env, Currency, FromTimestamp, Lower,
)
from woob.browser.filters.html import TableCell
from woob.browser.pages import JsonPage, LoggedPage, HTMLPage, PartialHTMLPage, RawPage
......@@ -40,6 +41,7 @@
AddRecipientBankError, AccountOwnership,
Emitter, EmitterNumberType, TransferStatus,
TransferDateType, TransferInvalidAmount,
AccountOwnerType,
)
from woob.capabilities.wealth import (
Investment, MarketOrder, MarketOrderDirection,
......@@ -49,7 +51,7 @@
from woob.capabilities.profile import Person, ProfileMissing
from woob.exceptions import (
BrowserUnavailable, AppValidationCancelled, AppValidationExpired,
AuthMethodNotImplemented,
AuthMethodNotImplemented, BrowserPasswordExpired,
)
from woob.tools.capabilities.bank.iban import rib2iban, rebuild_rib, is_iban_valid
from woob.tools.capabilities.bank.transactions import FrenchTransaction, parse_with_patterns
......@@ -74,8 +76,92 @@ class TransferAssertionError(Exception):
class ConnectionThresholdPage(HTMLPage):
# WIP: Temporarily remove change pass feature
pass
NOT_REUSABLE_PASSWORDS_COUNT = 3
"""BNP disallows to reuse one of the three last used passwords."""
def make_date(self, yy, m, d):
current = datetime.now().year
if yy > current - 2000:
yyyy = 1900 + yy
else:
yyyy = 2000 + yy
return datetime(yyyy, m, d)
def looks_legit(self, password):
# the site says:
# have at least 3 different digits
if len(Counter(password)) < 3:
return False
# not the birthdate (but we don't know it)
first, mm, end = map(int, (password[0:2], password[2:4], password[4:6]))
now = datetime.now()
try:
delta = now - self.make_date(first, mm, end)
except ValueError:
pass
else:
if 10 < delta.days / 365 < 70:
return False
try:
delta = now - self.make_date(end, mm, first)
except ValueError:
pass
else:
if 10 < delta.days / 365 < 70:
return False
# no sequence (more than 4 digits?)
password = list(map(int, password))
up = 0
down = 0
for a, b in zip(password[:-1], password[1:]):
up += int(a + 1 == b)
down += int(a - 1 == b)
if up >= 4 or down >= 4:
return False
return True
def on_load(self):
msg = (
CleanText('//div[@class="confirmation"]//span[span]')(self.doc)
or CleanText('//p[contains(text(), "Vous avez atteint la date de fin de vie de votre code secret")]')(
self.doc
)
)
self.logger.warning('Password expired.')
if not self.browser.rotating_password:
raise BrowserPasswordExpired(msg)
if not self.looks_legit(self.browser.password):
# we may not be able to restore the password, so reject it
self.logger.warning('Unable to restore it, it is not legit.')
raise BrowserPasswordExpired(msg)
new_passwords = []
for i in range(self.NOT_REUSABLE_PASSWORDS_COUNT):
new_pass = ''.join(str((int(char) + i + 1) % 10) for char in self.browser.password)
if not self.looks_legit(new_pass):
self.logger.warning('One of rotating password is not legit')
raise BrowserPasswordExpired(msg)
new_passwords.append(new_pass)
current_password = self.browser.password
for new_pass in new_passwords:
self.logger.warning('Renewing with temp password')
if not self.browser.change_pass(current_password, new_pass):
self.logger.warning('New temp password is rejected, giving up')
raise BrowserPasswordExpired(msg)
current_password = new_pass
if not self.browser.change_pass(current_password, self.browser.password):
self.logger.error('Could not restore old password!')
self.logger.warning('Old password restored.')
# we don't want to try to rotate password two times in a row
self.browser.rotating_password = 0
def cast(x, typ, default=None):
......@@ -289,6 +375,25 @@ def obj_iban(self):
return iban
return None
def obj_owner_type(self):
is_pro = Map(Dict('key'), Env('is_pro')(self), default=NotAvailable)(self)
# For checking account, we can often get
# the true value from the transfer page response
if is_pro:
return AccountOwnerType.ORGANIZATION
elif is_pro != NotAvailable:
return AccountOwnerType.PRIVATE
# Loan and savings accounts "often" include
# this information in their label.
label = Lower(Field('label'))(self)
if 'professionnel' in label:
return AccountOwnerType.ORGANIZATION
elif re.search('particulier|personnel', label):
return AccountOwnerType.PRIVATE
return NotAvailable
def obj_ownership(self):
indic = Dict('titulaire/indicTitulaireCollectif', default=None)(self)
# The boolean is in the form of a string ('true' or 'false')
......@@ -360,6 +465,13 @@ def get_ibans_dict(self, account_type):
return dict([(a['ibanCrypte'], a['iban'])
for a in self.path('data.infoVirement.listeComptes%s.*' % account_type)])
def get_pro_accounts(self, account_type):
comptes = self.path('data.infoVirement.listeComptes%s.*' % account_type)
return {
compte['ibanCrypte']: compte.get('indicComptePro', False)
for compte in comptes
}
def can_transfer_to_recipients(self, origin_account_id):
return next(
a['eligibleVersBenef']
......
......@@ -19,10 +19,12 @@
from __future__ import unicode_literals
import requests
from woob.browser import AbstractBrowser, LoginBrowser, URL, need_login
from woob.capabilities.bank import Account
from woob.capabilities.wealth import Per
from woob.exceptions import BrowserIncorrectPassword, ActionNeeded
from woob.exceptions import BrowserIncorrectPassword, ActionNeeded, BrowserUnavailable
from .pages import (
LoginPage, LoginStep2Page, LoginErrorPage, ProfilePage,
......@@ -73,7 +75,18 @@ def do_login(self):
raise AssertionError('Unknown error on LoginErrorPage: %s.' % message)
assert self.login_second_step.is_here(), 'Should be on the page of the second step of login'
self.page.send_form()
# for some users the website is unavailable
# we are in a redirection loop on '/Account/LogOff'
try:
max_redirects = self.session.max_redirects
self.session.max_redirects = 10
self.page.send_form()
except requests.exceptions.TooManyRedirects:
raise BrowserUnavailable()
finally:
# set the redirection limit back to default
self.session.max_redirects = max_redirects
if self.term_page.is_here():
raise ActionNeeded()
......
......@@ -28,7 +28,9 @@
Field, Env, MapIn,
)
from woob.capabilities.base import NotAvailable, empty
from woob.capabilities.bank import Account, Transaction
from woob.capabilities.bank import (
Account, Transaction, AccountOwnerType,
)
from woob.capabilities.wealth import Investment, PerVersion
from woob.browser.filters.html import Attr
from woob.capabilities.profile import Profile
......@@ -102,6 +104,7 @@ def obj_id(self):
obj_label = CleanText('./td[2]', replace=[(' o ', ' ')])
obj_currency = Currency('./td[6]')
obj_company_name = CleanText('./td[3]')
obj_owner_type = AccountOwnerType.PRIVATE
def obj__sublabel(self):
# Use the second part of the label to determine account index
......
......@@ -141,7 +141,7 @@ def obj_type(self):
@method
class fill_account(ItemElement):
obj_balance = CleanDecimal.French('//table[contains(@class,"compteInventaire")]//tr[td[b[text()="TOTAL"]]]/td[2]')
obj_balance = CleanDecimal.French('//b[text()="TOTAL"]/ancestor::*[position()=1]/following-sibling::td[1]')
class InvestPage(RawPage):
......
......@@ -41,6 +41,7 @@
AddRecipientStep, Rate, TransferBankError, AccountOwnership, RecipientNotFound,
AddRecipientTimeout, TransferDateType, Emitter, TransactionType,
AddRecipientBankError, TransferStep, TransferTimeout,
AccountOwnerType,
)
from woob.capabilities.base import NotLoaded, empty, find_object, strict_find_object
from woob.capabilities.contact import Advisor
......@@ -48,6 +49,7 @@
from woob.tools.compat import urlsplit
from woob.tools.capabilities.bank.transactions import sorted_transactions
from woob.tools.capabilities.bank.bank_transfer import sorted_transfers
from woob.tools.capabilities.bill.documents import merge_iterators
from .pages import (
VirtKeyboardPage, AccountsPage, AsvPage, HistoryPage, AuthenticationPage,
......@@ -317,7 +319,9 @@ def init_login(self):
is_website_unavailable = re.search(
"vous pouvez actuellement rencontrer des difficultés pour accéder à votre Espace Client"
+ "|Une erreur est survenue. Veuillez réessayer ultérieurement"
+ "|Oups, Il semble qu'une erreur soit survenue de notre côté",
+ "|Maintenance en cours, merci de réessayer ultérieurement."
+ "|Oups, Il semble qu'une erreur soit survenue de notre côté"
+ "|Service momentanément indisponible",
error
)
......@@ -394,7 +398,7 @@ def get_accounts_list(self):
has_account = False
self.pro_accounts.go()
if self.pro_accounts.is_here():
accounts_list.extend(self.get_filled_accounts())
accounts_list.extend(self.get_filled_accounts(pro=True))
has_account = True
else:
# We dont want to let has_account=False if we landed on an unknown page
......@@ -468,21 +472,29 @@ def get_accounts_list(self):
self.ownership_guesser(accounts_list)
return accounts_list
def get_filled_accounts(self):
def get_filled_accounts(self, pro=False):
accounts_list = []
if pro:
owner_type = AccountOwnerType.ORGANIZATION
else:
owner_type = AccountOwnerType.PRIVATE
for account in self.page.iter_accounts():
account.owner_type = owner_type
try:
self.location(account.url)
except requests.exceptions.HTTPError as e:
# We do not yield life insurance accounts with a 404 error. Since we have verified, that
# We do not yield life insurance accounts with a 404 or 503 error. Since we have verified, that
# it is a website scoped problem and not a bad request from our part.
# 404 is the original behavior. We could remove it in the future if it does not happen again.
status_code = e.response.status_code
if (
e.response.status_code == 404
status_code in (404, 503)
and account.type == Account.TYPE_LIFE_INSURANCE
):
self.logger.warning(
'404 ! Broken link for life insurance account (%s). Account will be skipped',
account.label
'%s ! Broken link for life insurance account (%s). Account will be skipped',
status_code,
account.label,
)
continue
raise
......@@ -1153,20 +1165,37 @@ def iter_subscriptions(self):
page = self.page.submit_form(**page_info)
yield page.get_subscription()
def iter_statements(self, subscription, acctype, card_id=None):
params = {
'account': subscription._account_key,
'type': acctype,
}
if card_id:
params['creditCard'] = card_id
page = self.page.submit_form(**params)
return page.iter_documents(subid=subscription.id, statement_type=acctype)
@need_login
def iter_documents(self, subscription):
self.statements_page.go()
document_generator_list = []
r = self.open(
'/documents/comptes-doc-type',
params={'accountKey': subscription._account_key}
)
for acctype in r.json().keys():
page = self.page.submit_form(
account=subscription._account_key,
type=acctype,
)
for doc in page.iter_documents(subid=subscription.id, statement_type=acctype):
yield doc
if acctype == 'cb':
cards_id = self.open(
'/documents/comptes/cbs',
params={'accountKey': subscription._account_key}
)
for card_id in cards_id.json().keys():
document_generator_list.append(self.iter_statements(subscription, acctype, card_id))
else:
document_generator_list.append(self.iter_statements(subscription, acctype))
for doc in merge_iterators(*document_generator_list):
yield doc
self.rib_page.go()
for doc in self.page.get_document(subid=subscription.id):
......