From 3b20a59306277318dc0e4e41ba90ea451017ce77 Mon Sep 17 00:00:00 2001 From: Quentin Defenouillere Date: Tue, 10 Mar 2020 15:06:11 +0100 Subject: [PATCH] [cragr] Remove /regions folder (old website is obsolete) --- modules/cragr/regions/__init__.py | 0 modules/cragr/regions/browser.py | 900 -------------------- modules/cragr/regions/netfinca_browser.py | 12 - modules/cragr/regions/pages.py | 984 ---------------------- modules/cragr/regions/transfer_pages.py | 402 --------- 5 files changed, 2298 deletions(-) delete mode 100644 modules/cragr/regions/__init__.py delete mode 100644 modules/cragr/regions/browser.py delete mode 100644 modules/cragr/regions/netfinca_browser.py delete mode 100644 modules/cragr/regions/pages.py delete mode 100644 modules/cragr/regions/transfer_pages.py diff --git a/modules/cragr/regions/__init__.py b/modules/cragr/regions/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/modules/cragr/regions/browser.py b/modules/cragr/regions/browser.py deleted file mode 100644 index a0c3409db4..0000000000 --- a/modules/cragr/regions/browser.py +++ /dev/null @@ -1,900 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright(C) 2012-2019 Budget Insight -# -# This file is part of a weboob module. -# -# This weboob module is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This weboob module is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this weboob module. If not, see . - -# yapf-compatible - -from __future__ import unicode_literals - -import re -from datetime import timedelta, datetime - -from weboob.browser import LoginBrowser, URL, need_login -from weboob.browser.url import BrowserParamURL -from weboob.browser.exceptions import ServerError, BrowserHTTPNotFound -from weboob.exceptions import BrowserIncorrectPassword, ActionNeeded -from weboob.tools.compat import urlparse -from weboob.tools.capabilities.bank.transactions import sorted_transactions -from weboob.tools.capabilities.bank.investments import create_french_liquidity -from weboob.tools.capabilities.bank.iban import is_iban_valid -from weboob.tools.date import LinearDateGuesser -from weboob.tools.value import Value - -from weboob.capabilities.base import empty, find_object -from weboob.capabilities.bank import ( - Account, AccountNotFound, RecipientInvalidLabel, AddRecipientStep, Recipient, - AddRecipientBankError, -) - -from .pages import ( - HomePage, LoginPage, LoggedOutPage, PasswordExpiredPage, PerimeterDetailsPage, PerimeterPage, RibPage, - AccountsPage, WealthPage, LoansPage, CardsPage, MultipleCardsPage, CheckingHistoryPage, - SavingsHistoryPage, OtherSavingsHistoryPage, FailedHistoryPage, PredicaRedirectionPage, - PredicaInvestmentsPage, NetfincaRedirectionPage, NetfincaLanding, NetfincaDetailsPage, NetfincaReturnPage, - NetfincaToCragr, BGPIRedirectionPage, BGPISpace, BGPIInvestmentPage, ProfilePage, -) -from .transfer_pages import ( - TransferInit, TransferPage, RecipientPage, RecipientListPage, SendSMSPage, RecipientMiscPage, -) - -from .netfinca_browser import NetfincaBrowser - -__all__ = ['CragrRegion'] - - -class CragrRegion(LoginBrowser): - # Login - home = URL(r'/$', r'/particuliers.html', HomePage) - logged_out = URL(r'.*', LoggedOutPage) # must be first to catch the page - login = URL(r'/stb/entreeBam$', LoginPage) - password_expired = URL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&act=Interstitielle', - PasswordExpiredPage - ) - - # Perimeters - perimeter_details_page = URL(r'/stb/.*act=Perimetre.*', PerimeterDetailsPage) - perimeter_page = URL(r'/stb/.*act=ChgPerim.*', PerimeterPage) - - # Credit cards - cards_page = URL(r'/stb/.*fwkaid=.*fwkpid=.*', r'/stb.*(fwkaction|sessionAPP)=Cartes.*', CardsPage) - multiple_cards_page = URL(r'/stb/.*fwkaid=.*fwkpid=.*', r'/stb.*(fwkaction|sessionAPP)=Cartes.*', MultipleCardsPage) - - # History & account details - rib_page = URL(r'.*action=Rib.*', RibPage) - checking_history = URL(r'/stb/.*fwkaid=.*fwkpid=.*', CheckingHistoryPage) - savings_history = URL(r'/stb/.*fwkaid=.*fwkpid=.*', SavingsHistoryPage) - other_savings_history = URL(r'/stb/.*fwkaid=.*fwkpid=.*', OtherSavingsHistoryPage) - failed_history = URL(r'/stb/.*fwkaid=.*fwkpid=.*', FailedHistoryPage) - - # Various investment spaces (Predica, Netfinca, BGPI) - predica_redirection = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&site=(?P[^&]+)&typeaction=reroutage_aller&sdt=(?P[^&]+)¶mpartenaire=(?P[^&]+)', - PredicaRedirectionPage - ) - predica_investments = URL( - r'https://npcprediweb.predica.credit-agricole.fr/rest/detailEpargne/contrat/', PredicaInvestmentsPage - ) - - netfinca_redirection = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&site=CATITRES&typeaction=reroutage_aller', - NetfincaRedirectionPage - ) - netfinca_details = URL( - r'https://www.cabourse.credit-agricole.fr/netfinca-titres/servlet/com.netfinca.frontcr.account.WalletVal\?nump=(?P[^&]+):(?P[^&]+)', - NetfincaDetailsPage - ) - netfinca_return = URL( - r'https://www.cabourse.credit-agricole.fr/netfinca-titres/servlet/com.netfinca.frontcr.login.ContextTransferDisconnect', - NetfincaReturnPage - ) - netfinca_landing = URL( - r'https://www.cabourse.credit-agricole.fr/netfinca-titres/servlet/com.netfinca.frontcr.navigation.AccueilBridge.*', - NetfincaLanding - ) - netfinca_to_cragr = URL(r'/stb/entreeBam\?identifiantBAM=.*', NetfincaToCragr) - - bgpi_redirection = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&site=BGPI&typeaction=reroutage_aller&sdt=BGPI¶mpartenaire=', - BGPIRedirectionPage - ) - bgpi_space = URL(r'https://bgpi-gestionprivee.credit-agricole.fr/bgpi/Logon.do.*', BGPISpace) - bgpi_investments = URL(r'https://bgpi-gestionprivee.credit-agricole.fr/bgpi/CompteDetail.do.*', BGPIInvestmentPage) - - # Transfer & Recipient - transfer_init_page = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&act=Virementssepa&stbzn=bnt&actCrt=Virementssepa', - TransferInit - ) - transfer_page = URL(r'/stb/collecteNI\?fwkaid=.*&fwkpid=.*$', TransferPage) - - recipient_list = URL(r'/stb/collecteNI\?.*&act=Vilistedestinataires.*', RecipientListPage) - recipient_page = URL( - r'/stb/collecteNI\?.*fwkaction=Ajouter.*', - r'/stb/collecteNI.*&IDENT=LI_VIR_RIB1&VIR_VIR1_FR3_LE=0&T3SEF_MTT_EURO=&T3SEF_MTT_CENT=&VICrt_REFERENCE=$', - RecipientPage - ) - recipient_misc = URL(r'/stb/collecteNI\?fwkaid=.*&fwkpid=.*$', RecipientMiscPage) - send_sms_page = URL(r'/stb/collecteNI\?fwkaid=.*&fwkpid=.*', SendSMSPage) - - # Accounts - wealth = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&act=Synthepargnes', - r'/stb/(collecteNI|entreeBam)\?fwkaid=.*&fwkpid=.*Synthepargnes.*', - WealthPage - ) - - loans = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&act=Synthcredits', - r'/stb/(collecteNI|entreeBam)\?fwkaid=.*&fwkpid=.*Synthcredits.*', - LoansPage - ) - - accounts = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&act=Synthcomptes.*', - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&act=Releves.*', - r'/stb/(collecteNI|entreeBam)\?fwkaid=.*&fwkpid=.*Synthcomptes.*', - r'/stb/.*fwkaid=.*fwkpid=.*', - AccountsPage - ) - - # Profile - profile = BrowserParamURL( - r'/stb/entreeBam\?sessionSAG=(?P[^&]+)&stbpg=pagePU&act=Coordonnees', ProfilePage - ) - - def __init__(self, website, *args, **kwargs): - super(CragrRegion, self).__init__(*args, **kwargs) - self.BASEURL = 'https://%s' % website - self.ORIGIN_URL = self.BASEURL - self.website = website - self.session_value = None - self.cragr_code = None - self.perimeters = [] - - # Netfinca browser: - self.weboob = kwargs.pop('weboob') - dirname = self.responses_dirname - if dirname: - dirname += '/netfinca' - self.netfinca = NetfincaBrowser( - '', '', logger=self.logger, weboob=self.weboob, responses_dirname=dirname, proxy=self.PROXIES - ) - - def deinit(self): - super(CragrRegion, self).deinit() - self.netfinca.deinit() - - def do_login(self): - if not self.username or not self.password: - raise BrowserIncorrectPassword() - - # Re-set the BASEURL to the origin URL in case of logout - self.BASEURL = self.ORIGIN_URL - - # From the home page, fetch the login url to go to login page - login_url = self.home.go().get_login_url() - assert login_url, "L'adresse URL %s n'est pas gérée actuellement." % self.ORIGIN_URL - - parsed_url = urlparse(login_url) - self.BASEURL = '%s://%s' % (parsed_url.scheme, parsed_url.netloc) - - # Go to login page and POST the username - login_data = { - 'CCPTE': self.username, - 'urlOrigine': self.ORIGIN_URL, - 'typeAuthentification': 'CLIC_ALLER', - 'situationTravail': 'BANCAIRE', - 'origine': 'vitrine', - 'matrice': 'true', - 'canal': 'WEB', - } - self.login.go(data=login_data) - assert self.login.is_here() - - # POST the password and fetch the URL after login - self.page.submit_password(self.username, self.password) - url_after_login = self.page.get_accounts_url() - - # For some connections, the first session_value is contained in the URL - # after login, so we must set it before going to the accounts page. - m = re.search(r'sessionSAG=([^&]+)', url_after_login) - if m: - self.session_value = m.group(1) - - # In case of wrongpass, instead of a URL, the node will contain a message such as - # 'Votre identification est incorrecte, veuillez ressaisir votre numéro de compte et votre code d'accès' - if not url_after_login.startswith('https'): - raise BrowserIncorrectPassword(url_after_login) - - # The session value is necessary for correct navigation. - self.location(url_after_login) - - self.accounts.go() - assert self.accounts.is_here() - - # No need to get perimeters in case of re-login - if not self.perimeters: - self.get_all_perimeters() - - def access_perimeter_details(self): - params = { - 'sessionSAG': self.session_value, - 'stbpg': 'pagePU', - 'act': 'Perimetre', - } - self.login.go(params=params) - - def switch_perimeter(self): - params = { - 'sessionSAG': self.session_value, - 'stbpg': 'pagePU', - 'act': 'ChgPerim', - 'typeaction': 'ChgPerim', - 'stbzn': 'global', - } - self.login.go(params=params) - - def get_all_perimeters(self): - # Multi-perimeters connections have an 'Espace Autres Comptes' button - if self.page.no_other_perimeter(): - self.logger.warning('This connection has only 1 perimeter.') - self.perimeters = ['main'] - return - # If the button exists, go to the perimeters details: - self.access_perimeter_details() - if self.page.has_two_perimeters(): - self.logger.warning('This connection has 2 perimeters.') - self.perimeters.append(self.page.get_perimeter_name()) - self.accounts.stay_or_go() - self.access_perimeter_details() - self.switch_perimeter() - self.perimeters.append(self.page.get_perimeter_name()) - else: - self.logger.warning('This connection has multiple perimeters.') - self.perimeters.append(self.page.get_perimeter_name()) - for perimeter in self.page.get_multiple_perimeters(): - self.accounts.go() - self.access_perimeter_details() - perimeter_url = self.page.get_perimeter_url(perimeter) - if perimeter_url: - self.location(perimeter_url) - self.switch_perimeter() - if self.page.broken_perimeter(): - # Broken perimeters cause logouts, there is no way - # to predict that a perimeter will be broken before - # accessing it so we raise ActionNeeded to warn the user. - raise ActionNeeded( - '''Le périmètre "%s" n'est pas accessible et provoque l'expiration de la session. - Merci de contacter votre agence Crédit Agricole pour résoudre ce problème.''' % perimeter - ) - self.perimeters.append(self.page.get_perimeter_name()) - else: - self.logger.warning('Perimeter %s has no URL, this perimeter will be skipped.', perimeter) - - @need_login - def iter_accounts(self): - ''' - Each perimeter has 3 accounts pages: Regular, Wealth/Savings and Loans. - We must handle two different perimeter cases: - - Unique perimeter: we already are on the accounts page, simply - return the unique perimeter accounts. - - Multiple perimeters: visit all perimeters one by one and return all accounts. - ''' - accounts_list = [] - - # Sometimes the URL of the page after login has a session_value=None, - # so we must set it correctly otherwise the next requests will crash. - if not self.session_value: - m = re.search(r'sessionSAG=([^&]+)', self.url) - if m: - self.session_value = m.group(1) - - if len(self.perimeters) == 1: - self.accounts.stay_or_go() - for account in self.iter_perimeter_accounts(iban=True, all_accounts=True): - account._perimeter = 'main' - accounts_list.append(account) - else: - for perimeter in self.perimeters: - # Ignore perimeters with empty labels, they are unaccessible even on the website - if perimeter: - self.go_to_perimeter(perimeter) - for account in self.iter_perimeter_accounts(iban=True, all_accounts=True): - account._perimeter = perimeter - accounts_list.append(account) - - # Do not return accounts with empty balances or invalid IDs - valid_accounts = [] - for account in accounts_list: - if empty(account.balance): - self.logger.warning( - 'Account %s %s will be skipped because it has no balance.', account.label, account.id - ) - else: - valid_accounts.append(account) - - return valid_accounts - - @need_login - def iter_perimeter_regular_accounts(self, iban): - unique_ids = set() - self.accounts.stay_or_go() - self.page.set_cragr_code() - for account in self.page.iter_accounts(): - self.accounts.stay_or_go() - if iban and account._form: - # Refresh account form in case it expired - refreshed_account = find_object(self.page.iter_accounts(), id=account.id) - account.iban = self.get_account_iban(refreshed_account._form) - - if account.id not in unique_ids: - # Do not yield accounts with duplicate IDs - unique_ids.add(account.id) - yield account - - @need_login - def iter_perimeter_accounts(self, iban, all_accounts): - ''' - In order to use this method, we must pass the 3 accounts URLs: Regular, Wealth and Loans. - Accounts may appear on several URLs: we must check for duplicates before adding to cragr_accounts. - Once we fetched all cragr accounts, we go to the Netfinca space to get Netfinca accounts. - If there are account duplicates, we preferably yield the Netfinca version because it is more - complete ; in addition, Netfinca may contain accounts that do not appear on the cragr website. - ''' - # Regular accounts (Checking & Savings) - cragr_accounts = list(self.iter_perimeter_regular_accounts(iban)) - - # Wealth accounts (PEA, Market, Life Insurances, PERP...) - self.wealth.go() - wealth_accounts = [] - if not self.wealth.is_here(): - # Sometimes we land on an error page so we try again: - self.logger.warning('Failed to access wealth page, trying a second time') - self.wealth.go() - assert self.wealth.is_here(), 'We failed to go to the wealth accounts page twice.' - - # We first store the wealth accounts in a list because we - # must avoid requests to BGPI during account pagination - for account in self.page.iter_wealth_accounts(): - if account.id not in [a.id for a in cragr_accounts] and account.id != '0': - wealth_accounts.append(account) - - for account in wealth_accounts: - if all_accounts and account.url == 'BGPI': - # Accounts from the BGPI space require going - # to the BGPI space to get account details - self.bgpi_redirection.go() - bgpi_url = self.page.get_bgpi_url() - if bgpi_url: - self.location(bgpi_url) - account.balance, account.currency, account.label, account.url = self.page.get_account_details(account.id) - if account.type == Account.TYPE_UNKNOWN: - BGPI_TYPES = { - 'VENDOME OPTIMUM EURO': Account.TYPE_LIFE_INSURANCE, - } - account.type = BGPI_TYPES.get(account.label, Account.TYPE_UNKNOWN) - - if account.type == Account.TYPE_UNKNOWN: - # BGPI accounts must be typed in order to fetch their investments - self.logger.warning( - 'Account %s is untyped: please add "%s" to the BGPI_TYPES dictionary.', - account.id, - account.label - ) - # Go back to the main Cragr website afterwards - self.wealth.go() - - # Sometimes the balance is not displayed here, so when possible, - # we go to the account details to fetch it - if all_accounts and empty(account.balance) and account.url and 'fwkaid' in account.url: - self.location(account.url) - account.balance = self.page.get_account_balance() - - cragr_accounts.append(account) - - # Loans & revolving credits - self.loans.go() - if not self.loans.is_here(): - # Sometimes we land on an error page so we try again: - self.logger.warning('Failed to access loans page, trying a second time') - self.loans.go() - assert self.loans.is_here(), 'We failed to go to the loans accounts page twice.' - - for loan in self.page.iter_loans(): - if loan.id not in [a.id for a in cragr_accounts]: - cragr_accounts.append(loan) - - # Deferred cards - self.accounts.go() - for card in self.iter_deferred_cards(cragr_accounts): - if card.id not in [a.id for a in cragr_accounts]: - cragr_accounts.append(card) - - # This method is also used to update the account forms - # but there is no need to go to Netfinca in this case - if all_accounts: - perimeter_accounts = [] - for netfinca_account in self.get_netfinca_accounts(): - netfinca_account.number = netfinca_account.id - netfinca_account.url = 'CATITRES' - netfinca_account._form = None - - # For PEA accounts, we must go to the PEA detail and fetch the balance - # without liquidities because they are already on the DAV PEA: - if netfinca_account.type == Account.TYPE_PEA and netfinca_account.label != 'DAV PEA': - self.netfinca_details.go(account_id=netfinca_account.id, code=self.cragr_code) - netfinca_account.balance = self.page.get_balance() - - perimeter_accounts.append(netfinca_account) - - for cragr_account in cragr_accounts: - if cragr_account.id not in [a.id for a in perimeter_accounts]: - perimeter_accounts.append(cragr_account) - else: - perimeter_accounts = cragr_accounts - - return perimeter_accounts - - @need_login - def get_account_iban(self, form): - form.submit() - rib_url = self.page.get_rib_url() - if rib_url: - self.location(rib_url) - assert self.rib_page.is_here(), 'RIB URL led to an unhandled page.' - return self.page.get_iban() - - @need_login - def get_netfinca_accounts(self): - try: - self.netfinca_redirection.go() - except BrowserHTTPNotFound: - pass - else: - if self.page.no_netfinca_access(): - # This perimeter has no available Netfinca space - return - url = self.page.get_url() - if 'netfinca' in url: - self.location(url) - self.netfinca.session.cookies.update(self.session.cookies) - self.netfinca.accounts.go() - for account in self.netfinca.iter_accounts(): - yield account - self.return_from_netfinca() - - @need_login - def return_from_netfinca(self): - # If we do not POST the return form correctly, we will be logged out. - self.netfinca_return.go().return_from_netfinca() - - @need_login - def iter_deferred_cards(self, perimeter_accounts): - cards_list = [] - for card_link, parent_account in self.page.get_cards_parameters(): - self.page.go_to_card(card_link) - if self.accounts.is_here(): - self.logger.warning( - 'Could not access card details for parent account %s, it will be skipped.', parent_account - ) - continue - if self.multiple_cards_page.is_here(): - # There are multiple credit cards on this account - card_parent = find_object(perimeter_accounts, id=parent_account) - for card in self.page.iter_multiple_cards(): - card.parent = card_parent - card._card_link = card_link - cards_list.append(card) - elif self.cards_page.is_here(): - # There is only one credit card for this account - card = self.page.get_unique_card() - card.parent = find_object(perimeter_accounts, id=parent_account) - card._card_link = card_link - cards_list.append(card) - self.accounts.stay_or_go() - - return cards_list - - def go_to_perimeter(self, perimeter): - ''' - This method enables correct navigation between the perimeters. - The behavior is really sensitive: for example, if you call - switch_perimeter() whereas you are already on the correct perimeter, - all the account forms will systematically fail. - ''' - if len(self.perimeters) == 1: - # There is only one perimeter, no need to switch. - return - elif len(self.perimeters) == 2: - self.accounts.stay_or_go() - if perimeter == self.page.get_perimeter_name(): - # We are already on the correct perimeter. - return - else: - # Going to the other perimeter. - self.access_perimeter_details() - self.switch_perimeter() - else: - # This connection has multiple perimeters. - self.accounts.stay_or_go() - if perimeter == self.page.get_perimeter_name(): - # We are already on the correct perimeter. - return - self.access_perimeter_details() - perimeter_name = perimeter.split(':')[1].strip() - perimeter_url = self.page.get_perimeter_url(perimeter_name) - if perimeter_url: - self.location(perimeter_url) - self.switch_perimeter() - else: - self.logger.warning('No available link for perimeter %s: this perimeter will be skipped.', perimeter) - - @need_login - def iter_history(self, account, coming=False): - handled_history_types = ( - Account.TYPE_CHECKING, - Account.TYPE_CARD, - Account.TYPE_SAVINGS, - Account.TYPE_PEA, - ) - if account.type not in handled_history_types: - self.unhandled_method(account.id) - return - - if account.type == Account.TYPE_CARD: - self.go_to_perimeter(account._perimeter) - self.accounts.go() - self.page.go_to_card(account._card_link) - - assert (self.cards_page.is_here() or self.multiple_cards_page.is_here()), \ - 'Failed to reach card details for card %s.' % account.id - - if self.multiple_cards_page.is_here(): - # We need to go to the correct card transactions with its number. - card_url = self.page.get_transactions_link(account._raw_number) - self.location(card_url) - - # When there are several future coming summaries, - # we must skip the ongoing one but fetch the other ones - # even if they are in the future. - ongoing_coming = self.page.get_ongoing_coming() - if not ongoing_coming: - # This card has no available history or coming. - return - - card_transactions = [] - latest_date = None - for tr in self.page.get_card_transactions(latest_date, ongoing_coming): - card_transactions.append(tr) - - if not card_transactions: - return - - # Pagination: we must fetch the date of the last transaction - # because the summary of next transactions may not - # be available on the next page - latest_date = card_transactions[-1].date - next_page_url = self.page.get_next_page() - while next_page_url: - self.location(next_page_url) - for tr in self.page.get_card_transactions(latest_date, ongoing_coming): - card_transactions.append(tr) - next_page_url = self.page.get_next_page() - - for tr in sorted_transactions(card_transactions): - yield tr - return - - # Transactions of accounts without form/url or with 'CATITRES' and 'bgpi' in url cannot be handled. - if not account._form and (not account.url or 'CATITRES' in account.url or 'bgpi' in account.url): - self.unhandled_method(account.id) - return - - # Access acount details: - if account.url: - # Refresh the session_value before going to the account URL - new_session_value = 'sessionSAG=' + self.session_value - updated_url = re.sub(r'sessionSAG=([^&]+)', new_session_value, account.url) - self.location(updated_url) - - elif account._form: - # We cannot use forms if we are not on the account's perimeter: - # we need to go to the correct perimeter and refresh forms. - # The form submission sometimes fails so we try several - # times until we get to the account history page. - for form in range(3): - self.accounts.stay_or_go() - self.go_to_perimeter(account._perimeter) - - # Only fetch the perimeter's regular accounts (Checking & Savings) - # No need to go to Wealth, Loans or Netfinca for transactions - refreshed_account = find_object( - self.iter_perimeter_regular_accounts(iban=False), AccountNotFound, id=account.id - ) - refreshed_account._form.submit() - if self.failed_history.is_here(): - self.logger.warning('Form submission failed to reach the account history, we try again.') - continue - break - - # 3 types of history pages were identified so far - if not ( - self.checking_history.is_here() or self.savings_history.is_here() or self.other_savings_history.is_here() - ): - self.unhandled_method(account.id) - - date_guesser = LinearDateGuesser(date_max_bump=timedelta(30)) - for tr in self.page.iter_history(date_guesser=date_guesser): - yield tr - - @need_login - def iter_investment(self, account): - if account.balance == 0: - return - - handled_invest_accounts = ( - Account.TYPE_MARKET, - Account.TYPE_PEA, - Account.TYPE_LIFE_INSURANCE, - Account.TYPE_CAPITALISATION, - Account.TYPE_PERP, - Account.TYPE_PERCO, - ) - if account.type not in handled_invest_accounts: - self.unhandled_method(account.id) - return - - if account.label == 'DAV PEA': - # 'PEA Espèces' - yield create_french_liquidity(account.balance) - return - - if account.url: - if 'PREDICA' in account.url: - # Fetch investments on Predica space - for inv in self.get_predica_investments(account): - yield inv - - elif 'CATITRES' in account.url: - # Fetch investments on Netfinca space - for inv in self.get_netfinca_investments(account): - yield inv - - elif 'bgpi' in account.url: - # Fetch investments on BGPI space - self.location(account.url) - if self.bgpi_investments.is_here(): - for inv in self.page.iter_investments(): - yield inv - - # Go back to the main Cragr website afterwards - self.accounts.stay_or_go() - - def get_predica_investments(self, account): - # We need to extract the account values from a string that has the format - # "javascript:lancerPuPartenaireParam('PREDICA2','CONTRAT','96732184641');" - m = re.search(r'\((.*)\)', account.url) - if m: - self.go_to_perimeter(account._perimeter) - values = m.group(1).replace("'", "").split(',') - try: - self.predica_redirection.go(website=values[0], sdt=values[1], partenaire=values[2]) - except ServerError: - self.logger.warning('Server returned error when fetching investments for account id %s', account.id) - else: - self.predica_investments.go() - return self.page.iter_investments() - self.logger.warning('Could not reach the investments for account %s', account.id) - return [] - - def get_netfinca_investments(self, account): - self.go_to_perimeter(account._perimeter) - try: - self.netfinca_redirection.go() - except BrowserHTTPNotFound: - pass - else: - url = self.page.get_url() - if 'netfinca' in url: - self.location(url) - self.netfinca.session.cookies.update(self.session.cookies) - self.netfinca.accounts.go() - investments = [] - for inv in self.netfinca.iter_investments(account): - if inv.code == 'XX-liquidity' and account.type == Account.TYPE_PEA: - # Liquidities are already fetched on the "PEA espèces" - continue - investments.append(inv) - self.return_from_netfinca() - return investments - - self.logger.warning('Could not reach the investments for account %s', account.id) - return [] - - def unhandled_method(self, account_id): - # This method avoids code duplication for all accounts - # that have no available history or investments. - self.logger.warning('This method is not handled for account %s.', account_id) - raise NotImplementedError() - - @need_login - def iter_transfer_recipients(self, account): - # perimeters have their own recipients - self.go_to_perimeter(account._perimeter) - self.transfer_init_page.go() - - if self.page.get_error() == 'Fonctionnalité Indisponible': - self.accounts.go() - return - - for emitter_acc in self.page.iter_emitters(): - if emitter_acc.id == account.id: - break - else: - # couldn't find the account as emitter - return - - # set of recipient id to not return or already returned - seen = set([account.id]) - for rcpt in self.page.iter_recipients(): - if (rcpt.id in seen) or (rcpt.iban and not is_iban_valid(rcpt.iban)): - # skip seen recipients and recipients with invalid iban - continue - seen.add(rcpt.id) - yield rcpt - - @need_login - def init_transfer(self, transfer, **params): - accounts = list(self.iter_accounts()) - - assert transfer.recipient_id - assert transfer.account_id - - account = find_object(accounts, id=transfer.account_id, error=AccountNotFound) - - self.go_to_perimeter(account._perimeter) - self.transfer_init_page.go() - assert self.transfer_init_page.is_here() - - currency = transfer.currency or 'EUR' - self.page.submit_accounts(transfer.account_id, transfer.recipient_id, transfer.amount, currency) - - assert self.page.is_reason() - - if transfer.label: - label = transfer.label[:33].encode('ascii', errors='ignore').decode('ascii') - transfer.label = re.sub(r'[+!]', '', label) - - self.page.submit_more(transfer.label, transfer.exec_date) - - assert self.page.is_confirm() - res = self.page.get_transfer() - - if not res.account_iban: - for acc in accounts: - self.logger.warning('%r %r', res.account_id, acc.id) - if res.account_id == acc.id: - res.account_iban = acc.iban - break - - if not res.recipient_iban: - for acc in accounts: - if res.recipient_id == acc.id: - res.recipient_iban = acc.iban - break - return res - - @need_login - def execute_transfer(self, transfer, **params): - assert self.transfer_page.is_here() - assert self.page.is_confirm() - - self.page.submit_confirm() - self.page.check_error() - - assert self.page.is_sent() - return self.page.get_transfer() - - def build_recipient(self, recipient): - r = Recipient() - r.iban = recipient.iban - r.id = recipient.iban - r.label = recipient.label - r.category = recipient.category - r.enabled_at = datetime.now().replace(microsecond=0) - r.currency = u'EUR' - r.bank_name = recipient.bank_name - return r - - @need_login - def new_recipient(self, recipient, **params): - if not re.match(u"^[-+.,:/?() éèêëïîñàâäãöôòõùûüÿ0-9a-z']+$", recipient.label, re.I): - raise RecipientInvalidLabel('Recipient label contains invalid characters') - - if 'sms_code' in params and not re.match(r'^[a-z0-9]{6}$', params['sms_code'], re.I): - # check before send sms code because it can crash website if code is invalid - raise AddRecipientBankError("SMS code %s is invalid" % params['sms_code']) - - # avoid `iter_accounts` if there is only one perimeter - if len(self.perimeters) > 1: - accounts = list(self.iter_accounts()) - assert recipient.origin_account_id, 'Origin account id is mandatory for multispace' - account = find_object(accounts, id=recipient.origin_account_id, error=AccountNotFound) - self.go_to_perimeter(account._perimeter) - - self.transfer_init_page.go() - assert self.transfer_init_page.is_here() - - if not self.page.add_recipient_is_allowed(): - if not [rec for rec in self.page.iter_recipients() if rec.category == 'Externe']: - raise AddRecipientBankError( - 'Vous ne pouvez pas ajouter de bénéficiaires, veuillez contacter votre banque.' - ) - assert False, 'Xpath for a recipient add is not catched' - - self.location(self.page.url_list_recipients()) - # there are 2 pages from where we can add a new recipient: - # - RecipientListPage, but the link is sometimes missing - # - TransferPage, start making a transfer with a new recipient but don't complete the transfer - # but it seems dangerous since we have to set an amount, etc. - # so we implement it in 2 ways with a preference for RecipientListPage - if self.page.url_add_recipient(): - self.logger.debug('good, we can add a recipient from the recipient list') - else: - # in this case, the link was missing - self.logger.warning( - 'cannot add a recipient from the recipient list page, pretending to make a transfer in order to add it' - ) - self.transfer_init_page.go() - assert self.transfer_init_page.is_here() - - self.location(self.page.url_add_recipient()) - - if not ('sms_code' in params and self.page.can_send_code()): - self.page.send_sms() - # go to a GET page, so StatesMixin can reload it - self.accounts.go() - raise AddRecipientStep( - self.build_recipient(recipient), Value('sms_code', label='Veuillez saisir le code SMS') - ) - else: - self.page.submit_code(params['sms_code']) - - err = hasattr(self.page, 'get_sms_error') and self.page.get_sms_error() - if err: - raise AddRecipientBankError(message=err) - - self.page.submit_recipient(recipient.label, recipient.iban) - self.page.confirm_recipient() - self.page.check_recipient_error() - if self.transfer_page.is_here(): - # in this case, we were pretending to make a transfer, just to add the recipient - # go back to transfer page to abort the transfer and see the new recipient - self.transfer_init_page.go() - assert self.transfer_init_page.is_here() - - res = self.page.find_recipient(recipient.iban) - assert res, 'Recipient with iban %s could not be found' % recipient.iban - return res - - @need_login - def get_profile(self): - self.profile.go() - if self.profile.is_here(): - return self.page.get_profile() diff --git a/modules/cragr/regions/netfinca_browser.py b/modules/cragr/regions/netfinca_browser.py deleted file mode 100644 index 9f36e2c697..0000000000 --- a/modules/cragr/regions/netfinca_browser.py +++ /dev/null @@ -1,12 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright(C) 2012-2019 Budget Insight - -# yapf-compatible - -from weboob.browser import AbstractBrowser - - -class NetfincaBrowser(AbstractBrowser): - PARENT = 'netfinca' - BASEURL = 'https://www.cabourse.credit-agricole.fr' diff --git a/modules/cragr/regions/pages.py b/modules/cragr/regions/pages.py deleted file mode 100644 index 5a189cb6db..0000000000 --- a/modules/cragr/regions/pages.py +++ /dev/null @@ -1,984 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright(C) 2012-2019 Budget Insight -# -# This file is part of a weboob module. -# -# This weboob module is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This weboob module is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this weboob module. If not, see . - -# yapf-compatible - -from __future__ import unicode_literals - -from decimal import Decimal -import re - -from weboob.exceptions import BrowserIncorrectPassword, ActionNeeded, BrowserPasswordExpired -from weboob.browser.pages import HTMLPage, LoggedPage, JsonPage, FormNotFound, pagination -from weboob.browser.elements import ListElement, TableElement, DictElement, ItemElement, method - -from weboob.capabilities import NotAvailable -from weboob.capabilities.profile import Person -from weboob.capabilities.bank import Account, Loan, Investment -from weboob.tools.capabilities.bank.transactions import FrenchTransaction -from weboob.tools.capabilities.bank.investments import is_isin_valid -from weboob.tools.date import LinearDateGuesser, parse_french_date - -from weboob.browser.filters.standard import ( - CleanText, CleanDecimal, Format, Regexp, Map, Field, Currency, - Date, DateGuesser, Eval, Env, Lower, Coalesce, -) -from weboob.browser.filters.json import Dict -from weboob.browser.filters.html import ( - TableCell, Link, Attr, -) - - -def float_to_decimal(f): - return Decimal(str(f)) - - -class Transaction(FrenchTransaction): - PATTERNS = [ - (re.compile(r'^Retrait Au Distributeur.*'), FrenchTransaction.TYPE_WITHDRAWAL), - (re.compile(r'^Virement.*((?P
\d{2})/(?P\d{2})/(?P\d+))?$'), FrenchTransaction.TYPE_TRANSFER), - (re.compile(r'^Cheque.*'), FrenchTransaction.TYPE_CHECK), - (re.compile(r'^Remise De Cheque.*'), FrenchTransaction.TYPE_DEPOSIT), - (re.compile(r'^Frais.*'), FrenchTransaction.TYPE_BANK), - (re.compile(r'^Interets Crediteurs.*'), FrenchTransaction.TYPE_BANK), - (re.compile(r'^Cotisation.*'), FrenchTransaction.TYPE_BANK), - (re.compile(r'^Prelevt.*'), FrenchTransaction.TYPE_ORDER), - (re.compile(r'^Prelevmnt.*'), FrenchTransaction.TYPE_ORDER), - (re.compile(r'^Prelevement.*'), FrenchTransaction.TYPE_ORDER), - ( - re.compile(r'^Prelevement Carte.*(?P
\d{2})/(?P\d{2})$', re.IGNORECASE), - FrenchTransaction.TYPE_CARD_SUMMARY - ), - (re.compile(r'^Remise Carte.*'), FrenchTransaction.TYPE_CARD), - (re.compile(r'^Paiement Par Carte.*(?P
\d{2})/(?P\d{2})$'), FrenchTransaction.TYPE_CARD), - (re.compile(r'^Remboursement De Pret.*'), FrenchTransaction.TYPE_LOAN_PAYMENT), - (re.compile(r'^Versement.*'), FrenchTransaction.TYPE_DEPOSIT), - ] - - -class CragrPage(HTMLPage): - ENCODING = 'iso8859-15' - ''' - The on_load() automatically updates the session_value for all Cragr pages - to avoid being logged out by doing requests with an expired session_value. - This is essential for example when coming back from the Netfinca space. - ''' - def on_load(self): - new_session_value = Regexp( - CleanText('//script[@language="JavaScript"][contains(text(), "idSessionSag")]'), - r'idSessionSag = "([^"]+)', - default=None - )(self.doc) - if new_session_value: - self.browser.session_value = new_session_value - - def get_perimeter_name(self): - return Lower(CleanText('//div[@id="libPerimetre_2"]//span[@class="textePerimetre_2"]', default=''))(self.doc) - - def get_rib_url(self): - rib_nodes = self.doc.xpath('//a[text()="Edition de RIB"]/@href') - if rib_nodes: - m = re.search(r"javascript:ouvrePOPUP\('(.*)',", rib_nodes[0]) - if m: - return m.group(1) - return None - - def get_account_balance(self): - return CleanDecimal.French('//tr[td[contains(text(), "Solde")]]//td[2]', default=NotAvailable)(self.doc) - - -class HomePage(CragrPage): - ''' - This page depends on the selected region. It is the first - visited page, from which we fetch the login URL from the - JavaScript in order to access LoginPage. - ''' - def get_login_url(self): - login_script = CleanText('//script[contains(text(), "acces_aux_comptes")]')(self.doc) - url_search = re.search(r'([^"]+)" \|\|', login_script) - if url_search: - return url_search.group(1) - return None - - -class LoginPage(CragrPage): - def submit_password(self, username, password): - # If there is no login_form on the page, it means the submitted login is incorrect - try: - login_form = self.get_form(name='formulaire') - except FormNotFound: - raise BrowserIncorrectPassword() - - # The 'CCCRYC2' value should always be '000000' or shorter - login_form['CCCRYC2'] = '0' * len(password) - login_form['CCCRYC'] = self.get_positions(password) - login_form.submit() - - def get_positions(self, password): - positions = {} - for position in self.doc.xpath('//table[@id="pave-saisie-code"]//td'): - value = CleanText('.')(position) - if value: - tab_index = CleanDecimal('./a/@tabindex')(position) - 1 - # Add '0' in front of single digits ('7' becomes '07', but '17' remains '17') - tab_index = str(tab_index).zfill(2) - positions[value] = tab_index - - password_positions = [positions[digit] for digit in password] - - # Submitted string has the format '17,01,15,06,10,03' - return ','.join(password_positions) - - def get_accounts_url(self): - return CleanText('//body')(self.doc) - - -class LoggedOutPage(CragrPage): - def is_here(self): - return CleanText('//form[@class="ca-forms"]//h1[text()="Fin de connexion"]')(self.doc) - - -class PasswordExpiredPage(CragrPage): - def on_load(self): - error_msg = CleanText('//fieldset//font[1]/text()', default='')(self.doc) - if 'Le code personnel que vous allez choisir' in error_msg: - raise BrowserPasswordExpired() - assert False, 'Unhandled error on PasswordExpiredPage: %s' % error_msg - - -class PerimeterDetailsPage(LoggedPage, CragrPage): - def has_two_perimeters(self): - # This message appears when there are only two perimeters. - return CleanText('//div[@id="e-doc" and contains(text(), "Périmètre en cours de chargement")]')(self.doc) - - def get_multiple_perimeters(self): - perimeters = [] - for perimeter in self.doc.xpath('//tr[@class="ca-forms"]//label[@class="gauche"]'): - perimeters.append(CleanText(perimeter)(self)) - return perimeters - - def get_perimeter_url(self, perimeter): - # We need to search for the perimeter name in the list of perimeters, - # However we must put the strings to lowercase and remove multiple spaces. - return Link( - '//p[label[contains(normalize-space(lower-case(text())), "%s")]]//a' % perimeter.lower(), default=None - )(self.doc) - - -class PerimeterPage(LoggedPage, CragrPage): - def on_load(self): - if self.doc.xpath('//div[@class="validation"]'): - # There is no complete message to fetch on the website but this node appears - # when we land on a perimeter that has never been visited before. - raise ActionNeeded( - "Certains de vos périmètres n'ont encore jamais été visités. " + - "Merci de parcourir tous les périmètres disponibles sur le site Crédit Agricole et de réaliser les réglages requis pour chaque périmètre." - ) - - def broken_perimeter(self): - error_msg = CleanText('//h1[@class="h1-erreur"]')(self.doc) - if error_msg: - return 'Connexion Indisponible' in error_msg - - -class RibPage(LoggedPage, CragrPage): - def is_here(self): - return CleanText('//b[contains(text(), "IDENTITÉ BANCAIRE")]')(self.doc) - - def get_iban(self): - return CleanText( - '//div[@id="trPagePu"]//table[2]//td[font[b[contains(text(), "IBAN")]]]//tr//b/text()', - replace=[(' ', '')], - default=NotAvailable - )(self.doc) - - -ACCOUNT_TYPES = { - 'CCHQ': Account.TYPE_CHECKING, - 'CCOU': Account.TYPE_CHECKING, - 'AUTO ENTRP': Account.TYPE_CHECKING, - 'AUTO ENTRS': Account.TYPE_CHECKING, - 'DEVISE USD': Account.TYPE_CHECKING, - 'EKO': Account.TYPE_CHECKING, - 'DEVISE CHF': Account.TYPE_CHECKING, - 'GEST IMMO': Account.TYPE_CHECKING, - 'LFDJ': Account.TYPE_CHECKING, - 'PMU': Account.TYPE_CHECKING, - 'DAV NANTI': Account.TYPE_SAVINGS, - 'LIV A': Account.TYPE_SAVINGS, - 'LIV A ASS': Account.TYPE_SAVINGS, - 'LIVCR': Account.TYPE_SAVINGS, - 'LDD': Account.TYPE_SAVINGS, - 'PEL': Account.TYPE_SAVINGS, - 'CEL': Account.TYPE_SAVINGS, - 'CEL2': Account.TYPE_SAVINGS, - 'CODEBIS': Account.TYPE_SAVINGS, - 'LJMO': Account.TYPE_SAVINGS, - 'CSL': Account.TYPE_SAVINGS, - 'CSLB5': Account.TYPE_SAVINGS, - 'LEP': Account.TYPE_SAVINGS, - 'LEF': Account.TYPE_SAVINGS, - 'TIWI': Account.TYPE_SAVINGS, - 'CSL LSO': Account.TYPE_SAVINGS, - 'CSL CSP': Account.TYPE_SAVINGS, - 'DAV TIGERE': Account.TYPE_SAVINGS, - 'CPTEXCPRO': Account.TYPE_SAVINGS, - 'CPTEXCPRO2': Account.TYPE_SAVINGS, - 'CPTEXCENT': Account.TYPE_SAVINGS, - 'DAT': Account.TYPE_DEPOSIT, - 'DATG': Account.TYPE_DEPOSIT, - 'LIS': Account.TYPE_SAVINGS, - 'PRET PERSO': Account.TYPE_LOAN, - 'P. ENTREPR': Account.TYPE_LOAN, - 'P. HABITAT': Account.TYPE_LOAN, - 'P. CONV.': Account.TYPE_LOAN, - 'PRET 0%': Account.TYPE_LOAN, - 'INV PRO': Account.TYPE_LOAN, - 'TRES. PRO': Account.TYPE_LOAN, - 'CT ATT HAB': Account.TYPE_LOAN, - 'PRET CEL': Account.TYPE_LOAN, - 'PRET PEL': Account.TYPE_LOAN, - 'PEA': Account.TYPE_PEA, - 'PEAP': Account.TYPE_PEA, - 'DAV PEA': Account.TYPE_PEA, - 'CPS': Account.TYPE_MARKET, - 'TITR': Account.TYPE_MARKET, - 'TITR CTD': Account.TYPE_MARKET, - 'PVERT VITA': Account.TYPE_PERP, - 'réserves de crédit': Account.TYPE_CHECKING, - 'prêts personnels': Account.TYPE_LOAN, - 'crédits immobiliers': Account.TYPE_LOAN, - 'ESC COM.': Account.TYPE_LOAN, - 'LIM TRESO': Account.TYPE_LOAN, - 'P.ETUDIANT': Account.TYPE_LOAN, - 'épargne disponible': Account.TYPE_SAVINGS, - 'épargne à terme': Account.TYPE_DEPOSIT, - 'épargne boursière': Account.TYPE_MARKET, - 'assurance vie et capitalisation': Account.TYPE_LIFE_INSURANCE, - 'PRED': Account.TYPE_LIFE_INSURANCE, - 'PREDI9 S2': Account.TYPE_LIFE_INSURANCE, - 'V.AVENIR': Account.TYPE_LIFE_INSURANCE, - 'FLORIA': Account.TYPE_LIFE_INSURANCE, - 'FLORIANE 2': Account.TYPE_LIFE_INSURANCE, - 'CAP DECOUV': Account.TYPE_LIFE_INSURANCE, - 'ESPACE LIB': Account.TYPE_LIFE_INSURANCE, - 'ESPACELIB3': Account.TYPE_LIFE_INSURANCE, - 'ESP LIB 2': Account.TYPE_LIFE_INSURANCE, - 'AST SELEC': Account.TYPE_LIFE_INSURANCE, - 'PRGE': Account.TYPE_LIFE_INSURANCE, - 'CONF': Account.TYPE_LIFE_INSURANCE, - 'ESPGESTCAP': Account.TYPE_CAPITALISATION, - 'ATOUT LIB': Account.TYPE_REVOLVING_CREDIT, - 'SUPPLETIS': Account.TYPE_REVOLVING_CREDIT, - 'PAGR': Account.TYPE_MADELIN, - 'ACCOR MULT': Account.TYPE_MADELIN, - 'PERASSUR': Account.TYPE_PER, -} - - -class AccountsPage(LoggedPage, CragrPage): - def no_other_perimeter(self): - return not CleanText('//a[@title="Espace Autres Comptes"]')(self.doc) - - def set_cragr_code(self): - # This security code enables access to Netfinca account details - raw_text = self.doc.xpath('//script[contains(text(), "var codeCaisse =")]')[0].text - m = re.search(r'var +codeCaisse *= *"(\d+)"', raw_text) - if m: - self.browser.cragr_code = m.group(1) - - @pagination - @method - class iter_accounts(TableElement): - head_xpath = '//table[@class="ca-table"]//tr[@class="tr-thead"]/th' - item_xpath = '''//table[@class="ca-table"]//tr[contains(@class, "autre-devise") - or contains(@class, "colcelligne") - or contains(@class, "ligne-connexe")]''' - next_page = Link('//a[@class="btnsuiteliste"]', default=None) - - col_id = 'N° de compte' - col_label = 'Type de compte' - col_value_balance = 'En valeur' - col_operation_balance = 'En opération' - col_currency = 'Devise' - - class item(ItemElement): - klass = Account - - def condition(self): - # Skip card coming lines - return 'Encours carte' not in CleanText(TableCell('label', colspan=True))(self) - - obj_id = CleanText(TableCell('id', colspan=True)) - obj_number = Field('id') - obj_label = CleanText(TableCell('label', colspan=True)) - obj_type = Map(Field('label'), ACCOUNT_TYPES, Account.TYPE_UNKNOWN) - obj_currency = Currency(TableCell('currency', colspan=True)) - obj_url = None - - # Accounts may have an 'Operations' balance or a 'Value' balance - def obj_balance(self): - value_balance = CleanText(TableCell('value_balance', default='', colspan=True))(self) - # Skip invalid balance values in the 'Value' column (for example for Revolving credits) - if value_balance not in ('', 'Montant disponible'): - return CleanDecimal.French().filter(value_balance) - return CleanDecimal.French(CleanText(TableCell('operation_balance', default='', colspan=True)))(self) - - def obj__form(self): - # Account forms look like 'javascript:fwkPUAvancerForm('Releves','frm1')' - # From this we extract the name (frm1) and fetch the form name on the page. - script = Link('.//a', default='')(TableCell('id', colspan=True)(self)[0]) - if 'javascript' in script: - form_search = re.search(r'frm\d+', script) - if form_search: - account_form = self.page.get_form(name=form_search.group(0)) - return self.page.fill_form(account_form, card=False) - return None - - def fill_form(self, form, card): - form['fwkaction'] = 'Cartes' if card else 'Releves' - form['fwkcodeaction'] = 'Executer' - return form - - def get_cards_parameters(self): - ''' - The only way to get all deferred cards is to check for - the presence of 'coming' lines within the accounts table. - However, there might several 'coming' lines for the same card, - for example if there are summaries for next month and the month after. - The 'cards_parameters' set contains pairs of (card_link, card_parent) values. - ''' - cards_parameters = set() - for coming in self.doc.xpath('//table[@class="ca-table"]//tr[contains(@class, "ligne-connexe")]'): - if coming.xpath('./preceding-sibling::tr/@class')[-1] == 'ligne-connexe': - # The preceding line was already a 'coming' so we skip this one. - continue - raw_link = Link(coming.xpath('.//a'), default=None)(self) - if not raw_link: - # Ignore coming lines without a link - continue - - assert 'javascript' in raw_link, 'No form associated' - # We extract the form name (e.g. 'frmc6') from a pattern - # such as "javascript:fwkPUAvancerForm('Cartes','frmc6')" - form_search = re.search(r"\('Cartes','(.*)'\)", raw_link) - if form_search: - card_link = form_search.group(1) - else: - # This link does not correspond to a card - continue - - # The id of the card parent account is the closest - # upper node containing an account id: - coming_info = coming.xpath('./preceding-sibling::tr') - assert coming_info, "Couldn't find card info" - parent_id = None - for regex in (r'> (\d+) ', r'\s(\d+)\s'): - m = re.search(regex, CleanText('.')(coming_info[-1])) - if m: - parent_id = m.group(1) - break - assert parent_id is not None, "Couldn't find the id of current card's parent account" - cards_parameters.add((card_link, parent_id)) - return cards_parameters - - def go_to_card(self, card_link): - try: - card_form = self.get_form(name=card_link) - self.fill_form(card_form, card=True).submit() - except FormNotFound: - assert False, 'This card has no form, please check if there is an available link.' - - -class CardsPage(LoggedPage, CragrPage): - def is_here(self): - return CleanText('//div[@class="boutons-act"]//h1[contains(text(), "Cartes - détail")]')(self.doc) - - def has_unique_card(self): - return not CleanText( - '//table[@summary]//caption[@class="ca-table caption"or @class="caption tdb-cartes-caption"]' - )(self.doc) - - @method - class get_unique_card(ItemElement): - item_xpath = '//table[@class="ca-table"][@summary]' - - klass = Account - - # Transform 'n° 4999 78xx xxxx xx72' into '499978xxxxxxxx72' - obj_number = CleanText( - '//table[@class="ca-table"][@summary]//tr[@class="ligne-impaire"]/td[@class="cel-texte"][1]', - replace=[(' ', ''), ('n°', '')] - ) - - # Card ID is formatted as '499978xxxxxxxx72MrFirstnameLastname-' - obj_id = Format( - '%s%s', - Field('number'), - CleanText('//table[@class="ca-table"][@summary]//caption[@class="caption"]//b', replace=[(' ', '')]) - ) - - # Card label is formatted as 'Carte VISA Premier - Mr M Lastname' - obj_label = Format( - '%s - %s', - CleanText('//table[@class="ca-table"][@summary]//tr[@class="ligne-impaire ligne-bleu"]/th[@id="compte-1"]'), - CleanText('//table[@class="ca-table"][@summary]//caption[@class="caption"]//b') - ) - - obj_balance = CleanDecimal(0) - obj_coming = CleanDecimal.French( - '//table[@class="ca-table"][@summary]//tr[@class="ligne-paire"]//td[@class="cel-num"]', default=0 - ) - obj_currency = Currency(Regexp(CleanText('//th[contains(text(), "Montant en")]'), r'^Montant en (.*)')) - obj_type = Account.TYPE_CARD - obj__form = None - - def get_next_page(self): - return Link('//a[@class="liennavigationcorpspage" and text()="[>]"]', default=None)(self.doc) - - def get_ongoing_coming(self): - # The title of the coming is usually 'Opérations débitées' but if - # the coming is positive, it will become 'Opérations créditées' - raw_date = Regexp( - CleanText( - '//table[@class="ca-table"]//tr[1]//b[contains(text(), "Opérations débitées") or contains(text(), "Opérations créditées")]' - ), - r'le (.*) :', - default=None - )(self.doc) - if not raw_date: - return None - return parse_french_date(raw_date).date() - - def get_card_transactions(self, latest_date, ongoing_coming): - for item in self.doc.xpath('//table[@class="ca-table"][2]//tr[td]'): - if CleanText('./td[2]/b')(item): - # This node is a summary containing the 'date' for all following transactions. - raw_date = Regexp(CleanText('./td[2]/b/text()'), r'le (.*) :')(item) - if latest_date and parse_french_date(raw_date).date() > latest_date: - # This summary has already been fetched - continue - latest_date = parse_french_date(raw_date).date() - if latest_date < ongoing_coming: - # This summary is anterior to the ongoing_coming so we create a transaction from it - tr = FrenchTransaction() - tr.date = tr.rdate = latest_date - tr.raw = tr.label = CleanText('./td[2]/b/text()')(item) - tr.amount = -CleanDecimal.French('./td[position()=last()]')(item) - tr.type = FrenchTransaction.TYPE_CARD_SUMMARY - yield tr - else: - # This node is a real transaction. - # Its 'date' is the date of the most recently encountered summary node. - tr = FrenchTransaction() - tr.date = latest_date - date_guesser = LinearDateGuesser(latest_date) - tr.rdate = tr.bdate = DateGuesser(CleanText('./td[1]//text()'), date_guesser=date_guesser)(item) - tr.label = tr.raw = CleanText('./td[2]')(item) - tr.amount = CleanDecimal.French('./td[last()]')(item) - tr.type = FrenchTransaction.TYPE_DEFERRED_CARD - yield tr - - -class MultipleCardsPage(CardsPage): - def is_here(self): - return CleanText('//div[@class="boutons-act"]//h1[contains(text(), "Cartes")]')(self.doc) - - @method - class iter_multiple_cards(ListElement): - item_xpath = '//table[@summary][caption[@class="ca-table caption"or @class="caption tdb-cartes-caption"]]' - - class item(ItemElement): - klass = Account - - def condition(self): - # Ignore cards that do not have a coming - return CleanText('.//tr[1]/td[@class="cel-num"]')(self) - - # Transform 'n° 4999 78xx xxxx xx72' into '499978xxxxxxxx72' - obj_number = CleanText('.//caption/span[@class="tdb-cartes-num"]', replace=[(' ', ''), ('n°', '')]) - # The raw number is used to access multiple cards details - obj__raw_number = CleanText('.//caption/span[@class="tdb-cartes-num"]') - - # Multiple card IDs are formatted as '499978xxxxxxxx72MrFirstnameLastname' - obj_id = Format( - '%s%s', Field('number'), CleanText('.//caption/span[@class="tdb-cartes-prop"]', replace=[(' ', '')]) - ) - - # Card label is formatted as 'Carte VISA Premier - Mr M Lastname' - obj_label = Format( - '%s - %s', - CleanText('.//caption/span[has-class("tdb-cartes-carte")]'), - CleanText('.//caption/span[has-class("tdb-cartes-prop")]') - ) - - obj_type = Account.TYPE_CARD - obj_balance = CleanDecimal(0) - obj_coming = CleanDecimal.French('.//tr[1]/td[position() = last()]', default=0) - obj_currency = Currency(Regexp(CleanText('//span[contains(text(), "Montants en")]'), r'^Montants en (.*)')) - obj__form = None - - def get_transactions_link(self, raw_number): - # We cannot use Link() because the @href attribute contains line breaks and spaces. - if len(self.doc.xpath('//table[@class="ca-table"][caption[span[text()="%s"]]]//tr' % raw_number)) == 1: - # There is only one coming line (no card information link) - return CleanText( - '//table[@class="ca-table"][caption[span[text()="%s"]]]//tr[position()=last()]/th/a/@href' % raw_number, - replace=[(' ', '')] - )(self.doc) - elif self.doc.xpath( - '//table[@class="ca-table"][caption[span[text()="%s"]]]//tr//a[contains(text(), "Infos carte")]' - % raw_number - ): - # There is a card information line, select the before the last - return CleanText( - '//table[@class="ca-table"][caption[span[text()="%s"]]]//tr[position()=last()-1]/th/a/@href' - % raw_number, - replace=[(' ', '')] - )(self.doc) - else: - # There is no information line, return the last - return CleanText( - '//table[@class="ca-table"][caption[span[text()="%s"]]]//tr[position()=last()]/th/a/@href' % raw_number, - replace=[(' ', '')] - )(self.doc) - - -class WealthPage(LoggedPage, CragrPage): - @pagination - @method - class iter_wealth_accounts(ListElement): - # The is divided in many sub-heads and sub-tables so - # it is easier to point directly to accounts and use ListElement - - item_xpath = '//tr[contains(@class, "colcelligne")][td]' - next_page = Link('//a[@class="btnsuiteliste"]', default=None) - - class item(ItemElement): - klass = Account - - obj_id = CleanText('./td[2]') - obj_number = Field('id') - obj_label = CleanText('./td/span[@class="gras"]') - obj_type = Map(Field('label'), ACCOUNT_TYPES, Account.TYPE_UNKNOWN) - # Accounts without balance will be skipped later on - obj_balance = CleanDecimal.French('./td//*[@class="montant3"]', default=NotAvailable) - obj_currency = Currency('./td[@class="cel-devise"]') - obj_iban = None - obj__form = None - - def obj_url(self): - url = Link('./td[2]/a', default=None)(self) - if url and 'BGPI' in url: - # This URL is just the BGPI home page, not the account itself. - # The real account URL will be set by get_account_details() in BGPISpace. - return 'BGPI' - return url - - def validate(self, obj): - # Skip 'ESPE INTEG' accounts, these liquidities are already available - # on the associated Market account on the Netfinca website - return obj.label != 'ESPE INTEG' - - -class LoansPage(LoggedPage, CragrPage): - @pagination - @method - class iter_loans(ListElement): - # The
is divided in many sub-heads and sub-tables so - # it is easier to point directly to accounts and use ListElement - item_xpath = '//tr[contains(@class, "colcelligne")][td]' - next_page = Link('//a[@class="btnsuiteliste"]', default=None) - - class item(ItemElement): - klass = Loan - - def condition(self): - return 'Billet financier' not in CleanText('./td[1]')(self) - - obj_id = CleanText('./td[2]') - obj_number = Field('id') - obj_label = CleanText('./td[1]') - obj_type = Map(Field('label'), ACCOUNT_TYPES, Account.TYPE_LOAN) - obj_next_payment_amount = Env('next_payment_amount') - obj_total_amount = Env('total_amount') - obj_currency = Currency('./td[@class="cel-devise"]') - obj_url = Link('./td[2]/a', default=None) - obj_iban = None - obj__form = None - - def obj_balance(self): - balance = Env('balance')(self) - return -abs(balance) - - def parse(self, obj): - # We must handle Loan tables with 5 or 6 columns - if CleanText('self::node()[count(td)=5]')(self): - # History table with 4 columns (no loan details) - self.env['next_payment_amount'] = NotAvailable - self.env['total_amount'] = NotAvailable - self.env['balance'] = CleanDecimal.French( - './td[4]//*[@class="montant3" or @class="montant4"]', default=NotAvailable - )(self) - elif CleanText('self::node()[count(td)=6]')(self): - # History table with 5 columns (contains next_payment_amount & total_amount) - self.env['next_payment_amount'] = CleanDecimal.French( - './td[3]//*[@class="montant3"]', default=NotAvailable - )(self) - self.env['total_amount'] = CleanDecimal.French( - './td[4]//*[@class="montant3"]', default=NotAvailable - )(self) - self.env['balance'] = CleanDecimal.French('./td[5]//*[@class="montant3"]', - default=NotAvailable)(self) - - -class CheckingHistoryPage(LoggedPage, CragrPage): - def is_here(self): - return CleanText('//table[@class="ca-table"][caption[span[b[text()="Historique des opérations"]]]]')(self.doc) - - @pagination - @method - class iter_history(ListElement): - item_xpath = '//table[@class="ca-table"][caption[span[b[text()="Historique des opérations"]]]]//tr[contains(@class, "ligne-")]' - next_page = Link('//a[@class="liennavigationcorpspage"][img[@alt="Page suivante"]]', default=None) - - class item(ItemElement): - klass = Transaction - - obj_date = Env('date') - obj_vdate = Env('vdate') - obj_raw = Transaction.Raw(Env('raw')) - obj_amount = Env('amount') - - def parse(self, obj): - self.env['date'] = DateGuesser(CleanText('./td[1]'), Env('date_guesser'))(self) - self.env['vdate'] = NotAvailable - if CleanText( - '//table[@class="ca-table"][caption[span[b[text()="Historique des opérations"]]]]//tr[count(td) = 4]' - )(self): - # History table with 4 columns - self.env['raw'] = CleanText('./td[2]')(self) - self.env['amount'] = CleanDecimal.French('./td[last()]')(self) - - elif CleanText( - '//table[@class="ca-table"][caption[span[b[text()="Historique des opérations"]]]]//tr[count(td) = 5]' - )(self): - # History table with 5 columns - self.env['raw'] = CleanText('./td[3]')(self) - self.env['amount'] = CleanDecimal.French('./td[last()]')(self) - - elif CleanText( - '//table[@class="ca-table"][caption[span[b[text()="Historique des opérations"]]]]//tr[count(td) = 6]' - )(self): - # History table with 6 columns (contains vdate) - self.env['raw'] = CleanText('./td[4]')(self) - self.env['vdate'] = DateGuesser(CleanText('./td[2]'), Env('date_guesser'))(self) - self.env['amount'] = CleanDecimal.French('./td[last()]')(self) - - elif CleanText( - '//table[@class="ca-table"][caption[span[b[text()="Historique des opérations"]]]]//tr[count(td) = 7]' - )(self): - # History table with 7 columns - self.env['amount'] = Coalesce( - CleanDecimal.French('./td[6]', sign=lambda x: -1, default=None), - CleanDecimal.French('./td[7]', default=None) - )(self) - if CleanText( - '//table[@class="ca-table"][caption[span[b[text()="Historique des opérations"]]]]//th[a[contains(text(), "Valeur")]]' - )(self): - # With vdate column ('Valeur') - self.env['raw'] = CleanText('./td[4]')(self) - self.env['vdate'] = DateGuesser(CleanText('./td[2]'), Env('date_guesser'))(self) - else: - # Without any vdate column - self.env['raw'] = CleanText('./td[3]')(self) - else: - assert False, 'This type of history table is not handled yet!' - - -class SavingsHistoryPage(LoggedPage, CragrPage): - def is_here(self): - return CleanText('//span[@class="tdb-cartes-prop"]/b[contains(text(), "HISTORIQUE DES VERSEMENTS")]')(self.doc) - - @pagination - @method - class iter_history(ListElement): - item_xpath = '''//table[@class="ca-table"][caption[span[b[contains(text(), "HISTORIQUE DES VERSEMENTS")]]]] - //tr[contains(@class, "ligne-")]''' - next_page = Link('//a[@class="liennavigationcorpspage"][img[@alt="Page suivante"]]', default=None) - - class item(ItemElement): - klass = Transaction - - def obj_date(self): - date = CleanText('./td[1]/font//text()')(self) - if len(date) == 10: - return Date(CleanText('./td[1]/font//text()'), dayfirst=True)(self) - elif len(date) == 5: - # Date has no indicated year. - return DateGuesser(CleanText('./td[1]//text()'), Env('date_guesser'))(self) - - obj_raw = Transaction.Raw(CleanText('./td[2]/font//text()')) - obj_amount = CleanDecimal.French('./td[3]/font//text()') - obj_rdate = Field('date') - - -class OtherSavingsHistoryPage(LoggedPage, CragrPage): - def is_here(self): - return CleanText( - '//span[@class="tdb-cartes-prop"]/b[contains(text(), "HISTORIQUE DES OPERATIONS") or text()="OPERATIONS"]' - )(self.doc) - - @pagination - @method - class iter_history(ListElement): - item_xpath = '''//table[@class="ca-table"][caption[span[b[contains(text(), "HISTORIQUE DES OPERATIONS")]]]] - //tr[contains(@class, "ligne-")]''' - next_page = Link('//a[@class="liennavigationcorpspage"][img[@alt="Page suivante"]]', default=None) - - class item(ItemElement): - klass = Transaction - - def fill_env(self, page, parent=None): - # This *Element's parent has only the dateguesser in its env and we want to - # use the same object, not copy it. - self.env = parent.env - - def obj_date(self): - # Dates in the first column may appear as '12/01/2019' or '12/01' - date = CleanText('./td[1]/font//text()')(self) - if len(date) == 10: - return Date(CleanText('./td[1]/font//text()'), dayfirst=True)(self) - elif len(date) == 5: - # Date has no indicated year. - return DateGuesser(CleanText('./td[1]//text()'), Env('date_guesser'))(self) - - obj_raw = Transaction.Raw(CleanText('./td[2]/font//text()')) - obj_amount = CleanDecimal.French('./td[3]/font//text()') - obj_rdate = Field('date') - - -class FailedHistoryPage(LoggedPage, CragrPage): - def is_here(self): - return CleanText('//form[@class="ca-forms"]//h1[contains(text(), "Service indisponible")]')(self.doc) - - -class PredicaRedirectionPage(LoggedPage, CragrPage): - def on_load(self): - form = self.get_form() - form.submit() - - -class PredicaInvestmentsPage(LoggedPage, JsonPage): - @method - class iter_investments(DictElement): - item_xpath = 'listeSupports/support' - - class item(ItemElement): - klass = Investment - - obj_label = CleanText(Dict('lcspt')) - obj_valuation = Eval(float_to_decimal, Dict('mtvalspt')) - - def obj_portfolio_share(self): - portfolio_share = Dict('txrpaspt', default=None)(self) - if portfolio_share: - return Eval(lambda x: float_to_decimal(x / 100), portfolio_share)(self) - return NotAvailable - - def obj_unitvalue(self): - unit_value = Dict('mtliqpaaspt', default=None)(self) - if unit_value: - return Eval(float_to_decimal, unit_value)(self) - return NotAvailable - - def obj_quantity(self): - quantity = Dict('qtpaaspt', default=None)(self) - if quantity: - return Eval(float_to_decimal, quantity)(self) - return NotAvailable - - def obj_code(self): - code = Dict('cdsptisn')(self) - if is_isin_valid(code): - return code - return NotAvailable - - def obj_code_type(self): - if Field('code')(self) == NotAvailable: - return NotAvailable - return Investment.CODE_TYPE_ISIN - - -class NetfincaRedirectionPage(LoggedPage, HTMLPage): - def no_netfinca_access(self): - return CleanText('//p[@class="gras" and contains(text(), "service CA-Titres est actuellement indisponible")]')( - self.doc - ) - - def get_url(self): - return Regexp(Attr('//body', 'onload', default=None), r'document.location="([^"]+)"')(self.doc) - - -class NetfincaLanding(LoggedPage, HTMLPage): - pass - - -class NetfincaDetailsPage(LoggedPage, HTMLPage): - def get_balance(self): - # This method returns the PEA balance without the liquidities - return CleanDecimal.French('//tr[td[contains(text(), "Valorisation titres")]]/td[2]/span')(self.doc) - - -class NetfincaReturnPage(LoggedPage, HTMLPage): - def return_from_netfinca(self): - return_form = self.get_form(name='formulaire') - return_form.submit() - - -class NetfincaToCragr(LoggedPage, CragrPage): - def on_load(self): - new_session_value = Regexp( - CleanText('//script[@language="JavaScript"][contains(text(), "idSessionSag")]'), - r'idSessionSag = "([^"]+)', - default=None - )(self.doc) - if new_session_value: - self.browser.session_value = new_session_value - # Automatically go back to the accounts page - self.browser.accounts.go() - - -class BGPIRedirectionPage(LoggedPage, HTMLPage): - def get_bgpi_url(self): - # The HTML is broken so we cannot use a regular Attr('xpath') - m = re.search(r'document.location="([^"]+)"', self.text) - if m: - return m.group(1) - - -class BGPISpace(LoggedPage, HTMLPage): - def get_account_details(self, account_id): - balance = CleanDecimal.French( - '//a[div[div[span[span[contains(text(), "%s")]]]]]/div[1]/div[2]/span/span' % account_id, - default=NotAvailable - )(self.doc) - - currency = Currency( - '//a[div[div[span[span[contains(text(), "%s")]]]]]/div[1]/div[2]/span/span' % account_id, - default=NotAvailable - )(self.doc) - - label = CleanText( - '//a[div[div[span[span[contains(text(), "%s")]]]]]/div[1]/div[1]/span/span' % account_id, - default=NotAvailable - )(self.doc) - - url = Link('//a[div[div[span[span[contains(text(), "%s")]]]]]' % account_id, default=None)(self.doc) - if url: - account_url = 'https://bgpi-gestionprivee.credit-agricole.fr' + url - else: - account_url = None - - return balance, currency, label, account_url - - -class BGPIInvestmentPage(LoggedPage, HTMLPage): - @method - class iter_investments(ListElement): - item_xpath = '//div[div[ul[count(li) > 5]]]' - - class item(ItemElement): - - klass = Investment - - obj_label = CleanText('.//span[@class="uppercase"]') - obj_valuation = CleanDecimal.French( - './/span[@class="box"][span[span[text()="Montant estimé"]]]/span[2]/span' - ) - obj_quantity = CleanDecimal.French( - './/span[@class="box"][span[span[text()="Nombre de part"]]]/span[2]/span' - ) - obj_unitvalue = CleanDecimal.French( - './/span[@class="box"][span[span[text()="Valeur liquidative"]]]/span[2]/span' - ) - obj_unitprice = CleanDecimal.French( - './/span[@class="box"][span[span[text()="Prix de revient"]]]/span[2]/span', default=NotAvailable - ) - obj_portfolio_share = Eval( - lambda x: x / 100, - CleanDecimal.French('.//span[@class="box"][span[span[text()="Répartition"]]]/span[2]/span') - ) - - def obj_diff_ratio(self): - # Euro funds have '-' instead of a diff_ratio value - if ( - CleanText('.//span[@class="box"][span[span[text()="+/- value latente (%)"]]]/span[2]/span')(self) == - '-' - ): - return NotAvailable - return Eval( - lambda x: x / 100, - CleanDecimal.French( - './/span[@class="box"][span[span[text()="+/- value latente (%)"]]]/span[2]/span', - ) - )(self) - - def obj_diff(self): - if Field('diff_ratio')(self) == NotAvailable: - return NotAvailable - return CleanDecimal.French( - './/span[@class="box"][span[span[text()="+/- value latente"]]]/span[2]/span' - )(self) - - def obj_code(self): - code = CleanText('.//span[@class="cl-secondary"]')(self) - if is_isin_valid(code): - return code - return NotAvailable - - def obj_code_type(self): - if Field('code')(self) == NotAvailable: - return NotAvailable - return Investment.CODE_TYPE_ISIN - - -class ProfilePage(LoggedPage, CragrPage): - @method - class get_profile(ItemElement): - klass = Person - - obj_email = Regexp(CleanText('//font/b/script', default=""), r'formatMail\(\'(.*)\'\)', default=NotAvailable) - obj_job = CleanText('//td[contains(text(), "Type de profession")]/following::td[1]', default=NotAvailable) - obj_name = Format( - '%s %s', - CleanText('//td[contains(text(), "Prénom")]/following::td[1]', default=NotAvailable), - CleanText('//td[contains(text(), "Nom")]/following::td[1]', default=NotAvailable) - ) - - def obj_address(self): - # The address is spread accross several / - # So we must fetch them all and reconstitute it - address_items = [] - for item in self.page.doc.xpath( - '//table[tr[td[contains(text(), "Adresse")]]]/tr[position()>3 and position()<8]/td[3]' - ): - if CleanText(item)(self): - address_items.append(CleanText(item)(self)) - return ' '.join(address_items) or NotAvailable diff --git a/modules/cragr/regions/transfer_pages.py b/modules/cragr/regions/transfer_pages.py deleted file mode 100644 index 394e36b9b8..0000000000 --- a/modules/cragr/regions/transfer_pages.py +++ /dev/null @@ -1,402 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright(C) 2012-2019 Budget Insight -# -# This file is part of a weboob module. -# -# This weboob module is free software: you can redistribute it and/or modify -# it under the terms of the GNU Lesser General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This weboob module is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this weboob module. If not, see . - -# yapf-compatible - -from __future__ import unicode_literals - -from datetime import date as ddate, datetime -from decimal import Decimal -import re - -from weboob.browser.pages import LoggedPage, HTMLPage, FormNotFound -from weboob.capabilities.base import Currency -from weboob.capabilities.bank import ( - Recipient, Transfer, TransferError, TransferBankError, - AddRecipientBankError, RecipientInvalidOTP -) -from weboob.browser.filters.standard import ( - Date, CleanText, CleanDecimal, Currency as CleanCurrency, Regexp, -) -from weboob.browser.filters.html import Link - - -def get_text_lines(el): - lines = [re.sub(r'\s+', ' ', line).strip() for line in el.text_content().split('\n')] - return [l for l in lines if l] - - -def MyDate(*args, **kwargs): - kwargs.update(dayfirst=True) - return Date(*args, **kwargs) - - -class HandleErrorHTMLPage(HTMLPage): - def get_error(self): - error = CleanText('//h1[@class="h1-erreur"]')(self.doc) - if error: - self.logger.error('Error detected: %s', error) - return error - - -class CollectePageMixin(object): - """ - Multiple pages have the same url pattern: "/stb/collecteNI?fwkaid=...&fwkpid=...". - Use some page text to determine which page it is. - """ - - IS_HERE_TEXT = None - - def is_here(self): - for el in self.doc.xpath('//div[@class="boutons-act"]//h1'): - labels = self.IS_HERE_TEXT - if not isinstance(labels, (list, tuple)): - labels = [labels] - - for label in labels: - if label in CleanText('.')(el): - return True - return False - - -class TransferInit(LoggedPage, HandleErrorHTMLPage): - def iter_emitters(self): - items = self.doc.xpath('//select[@name="VIR_VIR1_FR3_LE"]/option') - return self.parse_recipients(items, assume_internal=True) - - def iter_recipients(self): - items = self.doc.xpath('//select[@name="VIR_VIR1_FR3_LB"]/option') - return self.parse_recipients(items) - - def parse_recipients(self, items, assume_internal=False): - for opt in items: - lines = get_text_lines(opt) - - if opt.attrib['value'].startswith('I') or assume_internal: - for n, line in enumerate(lines): - if line.strip().startswith('n°'): - rcpt = Recipient() - rcpt._index = opt.attrib['value'] - rcpt._raw_label = ' '.join(lines) - rcpt.category = 'Interne' - rcpt.id = CleanText().filter(line[2:].strip()) - # we don't have iban here, use account number - rcpt.label = ' '.join(lines[:n]) - rcpt.currency = Currency.get_currency(lines[-1]) - rcpt.enabled_at = datetime.now().replace(microsecond=0) - yield rcpt - break - elif opt.attrib['value'].startswith('E'): - if len(lines) > 1: - # In some cases we observed beneficiaries without label, we skip them - rcpt = Recipient() - rcpt._index = opt.attrib['value'] - rcpt._raw_label = ' '.join(lines) - rcpt.category = 'Externe' - rcpt.label = lines[0] - rcpt.iban = lines[1].upper() - rcpt.id = rcpt.iban - rcpt.enabled_at = datetime.now().replace(microsecond=0) - yield rcpt - else: - self.logger.warning('The recipient associated with the iban %s has got no label' % lines[0]) - - def submit_accounts(self, account_id, recipient_id, amount, currency): - emitters = [rcpt for rcpt in self.iter_emitters() if rcpt.id == account_id and not rcpt.iban] - if len(emitters) != 1: - raise TransferError('Could not find emitter %r' % account_id) - recipients = [rcpt for rcpt in self.iter_recipients() if rcpt.id and rcpt.id == recipient_id] - # for recipient with same IBAN, first matched recipient is the default value - if len(recipients) < 1: - raise TransferError('Could not find recipient %r' % recipient_id) - - form = self.get_form(name='frm_fwk') - assert amount > 0 - amount = str(amount.quantize(Decimal('0.00'))) - form['T3SEF_MTT_EURO'], form['T3SEF_MTT_CENT'] = amount.split('.') - form['VIR_VIR1_FR3_LE'] = emitters[0]._index - form['VIR_VIR1_FR3_LB'] = recipients[0]._index - form['DEVISE'] = currency or emitters[0].currency - form['VIR_VIR1_FR3_LE_HID'] = emitters[0]._raw_label - form['VIR_VIR1_FR3_LB_HID'] = recipients[0]._raw_label - form['fwkaction'] = 'Confirmer' # mandatory - form['fwkcodeaction'] = 'Executer' - form.submit() - - def url_list_recipients(self): - return CleanText(u'(//a[contains(text(),"Liste des bénéficiaires")])[1]/@href')(self.doc) - - def add_recipient_is_allowed(self): - return bool( - self.doc.xpath('//a[text()="+ Saisir un autre compte bénéficiaire"]') - or self.doc.xpath('//a[contains(text(),"Liste des bénéficiaires")]') - ) - - def url_add_recipient(self): - link = Link('//a[text()="+ Saisir un autre compte bénéficiaire"]')(self.doc) - return link + '&IDENT=LI_VIR_RIB1&VIR_VIR1_FR3_LE=0&T3SEF_MTT_EURO=&T3SEF_MTT_CENT=&VICrt_REFERENCE=' - - -class RecipientListPage(LoggedPage, HandleErrorHTMLPage): - def url_add_recipient(self): - return CleanText(u'//a[contains(text(),"Ajouter un compte destinataire")]/@href')(self.doc) - - -class RecipientAddingMixin(object): - def submit_recipient(self, label, iban): - try: - form = self.get_form(name='frm_fwk') - except FormNotFound: - assert False, 'An error occurred before sending recipient' - - form['NOM_BENEF'] = label - for i in range(9): - form['CIBAN%d' % (i + 1)] = iban[i * 4:(i + 1) * 4] - form['fwkaction'] = 'VerifCodeIBAN' - form['fwkcodeaction'] = 'Executer' - form.submit() - - -class TransferPage(RecipientAddingMixin, CollectePageMixin, LoggedPage, HandleErrorHTMLPage): - IS_HERE_TEXT = 'Virement' - - ### for transfers - def get_step(self): - return CleanText('//div[@id="etapes"]//li[has-class("encours")]')(self.doc) - - def is_sent(self): - return self.get_step().startswith('Récapitulatif') - - def is_confirm(self): - return self.get_step().startswith('Confirmation') - - def is_reason(self): - return self.get_step().startswith('Informations complémentaires') - - def get_transfer(self): - transfer = Transfer() - - # FIXME all will probably fail if an account has a user-chosen label with "IBAN :" or "n°" - - amount_xpath = '//fieldset//p[has-class("montant")]' - transfer.amount = CleanDecimal.French(amount_xpath)(self.doc) - transfer.currency = CleanCurrency(amount_xpath)(self.doc) - - if self.is_sent(): - transfer.account_id = Regexp( - CleanText('//p[@class="nomarge"][span[contains(text(),' - '"Compte émetteur")]]/text()'), r'n°(\d+)' - )(self.doc) - - base = CleanText( - '//fieldset//table[.//span[contains(text(), "Compte bénéficiaire")]]' - + '//td[contains(text(),"n°") or contains(text(),"IBAN :")]//text()', - newlines=False - )(self.doc) - transfer.recipient_id = Regexp(None, r'IBAN : ([^\n]+)|n°(\d+)').filter(base) - transfer.recipient_id = transfer.recipient_id.replace(' ', '') - if 'IBAN' in base: - transfer.recipient_iban = transfer.recipient_id - - transfer.exec_date = MyDate( - CleanText('//p[@class="nomarge"][span[contains(text(), "Date de l\'ordre")]]/text()') - )(self.doc) - else: - transfer.account_id = Regexp( - CleanText('//fieldset[.//h3[contains(text(), "Compte émetteur")]]//p'), r'n°(\d+)' - )(self.doc) - - base = CleanText('//fieldset[.//h3[contains(text(), "Compte bénéficiaire")]]//text()', - newlines=False)(self.doc) - transfer.recipient_id = Regexp(None, r'IBAN : ([^\n]+)|n°(\d+)').filter(base) - transfer.recipient_id = transfer.recipient_id.replace(' ', '') - if 'IBAN' in base: - transfer.recipient_iban = transfer.recipient_id - - transfer.exec_date = MyDate( - CleanText('//fieldset//p[span[contains(text(), "Virement unique le :")]]/text()') - )(self.doc) - - transfer.label = CleanText('//fieldset//p[span[contains(text(), "Référence opération")]]')(self.doc) - transfer.label = re.sub(r'^Référence opération(?:\s*):', '', transfer.label).strip() - - return transfer - - def submit_more(self, label, date=None): - if date is None: - date = ddate.today() - - form = self.get_form(name='frm_fwk') - form['VICrt_CDDOOR'] = label - form['VICrtU_DATEVRT_JJ'] = date.strftime('%d') - form['VICrtU_DATEVRT_MM'] = date.strftime('%m') - form['VICrtU_DATEVRT_AAAA'] = date.strftime('%Y') - form['DATEC'] = date.strftime('%d/%m/%Y') - form['PERIODE'] = 'U' - form['fwkaction'] = 'Confirmer' - form['fwkcodeaction'] = 'Executer' - form.submit() - - def submit_confirm(self): - form = self.get_form(name='frm_fwk') - form['fwkaction'] = 'Confirmer' - form['fwkcodeaction'] = 'Executer' - form.submit() - - def on_load(self): - super(TransferPage, self).on_load() - # warning: the "service indisponible" message (not catched here) is not a real BrowserUnavailable - err = CleanText('//form//div[has-class("blc-choix-erreur")]//p', default='')(self.doc) - if err: - raise TransferBankError(message=err) - - ### add a recipient by faking a transfer - def confirm_recipient(self): - # pretend to make a transfer - form = self.get_form(name='frm_fwk') - form['AJOUT_BENEF_CHECK'] = 'on' - form['fwkcodeaction'] = 'Executer' - form['fwkaction'] = 'Suite' - form['T3SEF_MTT_EURO'] = '1' - form['DEVISE'] = 'EUR' - form.submit() - - def check_error(self): - # this is for transfer error, it's not a `AddRecipientBankError` but a `TransferBankError` - - msg = CleanText('//tr[@bgcolor="#C74545"]', default='')(self.doc) # there is no id, class or anything... - if msg: - raise TransferBankError(message=msg) - - def check_recipient_error(self): - # this is a copy-paste from RecipientMiscPage, i can't test if it works on this page... - # this is for add recipient by initiate transfer - - msg = CleanText('//tr[@bgcolor="#C74545"]', default='')(self.doc) # there is no id, class or anything... - if msg: - raise AddRecipientBankError(message=msg) - - -class RecipientMiscPage(RecipientAddingMixin, CollectePageMixin, LoggedPage, HandleErrorHTMLPage): - IS_HERE_TEXT = 'Liste des comptes bénéficiaires' - - ### for adding recipients - def send_sms(self): - form = self.get_form(name='frm_fwk') - - assert 'code' not in form - form['fwkaction'] = 'DemandeCodeSMSVerifID' - form['fwkcodeaction'] = 'Executer' - form.submit() - - def get_sms_error(self): - return CleanText('//div[@class="blc-choix-wrap-erreur"]')(self.doc) - - def confirm_recipient(self): - try: - form = self.get_form(name='frm_fwk') - except FormNotFound: - assert False, 'An error occurred before finishing adding recipient' - - form['fwkaction'] = 'ConfirmerAjout' - form['fwkcodeaction'] = 'Executer' - form.submit() - - def check_recipient_error(self): - msg = CleanText('//tr[@bgcolor="#C74545"]', default='')(self.doc) # there is no id, class or anything... - if msg: - raise AddRecipientBankError(message=msg) - - def get_iban_col(self): - for index, td in enumerate(self.doc.xpath('//table[starts-with(@summary,"Nom et IBAN")]//th')): - if 'Numéro de compte' in CleanText('.')(td): - # index start at 0 - return index + 1 - - def find_recipient(self, iban): - iban = iban.upper() - iban_col = self.get_iban_col() - - for tr in self.doc.xpath('//table[starts-with(@summary,"Nom et IBAN")]/tbody/tr'): - iban_text = re.sub(r'\s', '', CleanText('./td[%s]' % iban_col)(tr)) - if iban_text.upper() == 'IBAN:%s' % iban: - res = Recipient() - res.iban = iban - res.id = iban - res.label = CleanText('./td[%s]' % (iban_col - 1))(tr) - return res - - -class RecipientPage(LoggedPage, HandleErrorHTMLPage): - def can_send_code(self): - form = self.get_form(name='frm_fwk') - return 'code' in form - - def send_sms(self): - form = self.get_form(name='frm_fwk') - - if 'code' in form: - # a code is still pending, ask a new one - form['fwkaction'] = 'NouvelleDemandeCodeSMS' - form['fwkcodeaction'] = 'Executer' - new_page = form.submit().page - assert isinstance(new_page, TransferPage) or isinstance(new_page, SendSMSPage) - return new_page.send_sms() - else: - form['fwkaction'] = 'DemandeCodeSMSVerifID' - - form['fwkcodeaction'] = 'Executer' - form.submit() - - def submit_code(self, code): - form = self.get_form(name='frm_fwk') - form['fwkaction'] = 'Confirmer' - form['fwkcodeaction'] = 'Executer' - form['code'] = code - form.submit() - - -class SendSMSPage(LoggedPage, CollectePageMixin, HandleErrorHTMLPage): - IS_HERE_TEXT = 'Authentification par sms - demande' - - def on_load(self): - # if the otp is incorrect - error_msg = CleanText('//div[has-class("blc-choix-erreur")]//span')(self.doc) - if error_msg: - raise AddRecipientBankError(message=error_msg) - - def send_sms(self): - # when a code is still pending - # resend sms to validate recipient - form = self.get_form(name='frm_fwk') - form['fwkaction'] = 'DemandeCodeSMSVerifID' - form['fwkcodeaction'] = 'Executer' - form.submit() - - -class SubmitSMSPage(LoggedPage, RecipientAddingMixin, HandleErrorHTMLPage): - IS_HERE_TEXT = 'Authentification par sms - code' - - -class SendSMSErrorPage(LoggedPage, CollectePageMixin, HTMLPage): - IS_HERE_TEXT = 'Authentification par sms - erreur code' - - def on_load(self): - error_msg = CleanText('//font[contains(text(), "Le code SMS saisi n\'est pas exploitable")]')(self.doc) - raise RecipientInvalidOTP(message=error_msg) -- GitLab