Skip to content
pages.py 99.5 KiB
Newer Older
# -*- coding: utf-8 -*-

# Copyright(C) 2012 Romain Bignon
#
# This file is part of a weboob module.
# This weboob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This weboob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this weboob module. If not, see <http://www.gnu.org/licenses/>.
ntome's avatar
ntome committed
# flake8: compatible

from __future__ import division
from __future__ import unicode_literals
from base64 import b64decode
from collections import OrderedDict
from PIL import Image, ImageFilter
from io import BytesIO
from decimal import Decimal
from datetime import datetime
from lxml import html
from weboob.browser.pages import (
    LoggedPage, HTMLPage, JsonPage, pagination,
    FormNotFound, RawPage, XMLPage,
)
from weboob.browser.elements import ItemElement, method, ListElement, TableElement, SkipItem, DictElement
from weboob.browser.filters.standard import (
    Date, CleanDecimal, Regexp, CleanText, Env, Upper,
    Field, Eval, Format, Currency, Coalesce,
)
from weboob.browser.filters.html import Link, Attr, TableCell
ntome's avatar
ntome committed
from weboob.capabilities.base import NotAvailable, empty
from weboob.capabilities.bank import (
    Account, Loan, AccountOwnership,
    Transfer, TransferBankError, TransferInvalidOTP,
    Recipient, AddRecipientBankError, RecipientInvalidOTP,
    Emitter, EmitterNumberType, AddRecipientError,
from weboob.capabilities.wealth import Investment
from weboob.capabilities.bill import DocumentTypes, Subscription, Document
from weboob.tools.capabilities.bank.investments import is_isin_valid, IsinCode, IsinType
from weboob.tools.capabilities.bank.transactions import FrenchTransaction
from weboob.tools.capabilities.bank.iban import is_rib_valid, rib2iban, is_iban_valid
from weboob.tools.captcha.virtkeyboard import SplitKeyboard, GridVirtKeyboard
from weboob.tools.compat import unicode, urlparse, parse_qsl
from weboob.exceptions import (
    NoAccountsException, BrowserUnavailable, ActionNeeded, BrowserIncorrectPassword,
    BrowserPasswordExpired,
from weboob.browser.filters.json import Dict
from weboob.browser.exceptions import ClientError
from .base_pages import fix_form, BasePage

def MyDecimal(*args, **kwargs):
    kwargs.update(replace_dots=True)
    return CleanDecimal(*args, **kwargs)

class MyTableCell(TableCell):
    def __init__(self, *names, **kwargs):
        super(MyTableCell, self).__init__(*names, **kwargs)
        self.td = './tr[%s]/td'
def float_to_decimal(f):
    return Decimal(str(f))


class NewLoginPage(HTMLPage):
    def get_main_js_file_url(self):
        return Attr('//script[contains(@src, "main-")]', 'src')(self.doc)


class LoginPage(JsonPage):
    def on_load(self):
        error_msg = self.doc.get('error')
        if error_msg and 'Le service est momentanément indisponible' in error_msg:
            raise BrowserUnavailable(error_msg)

    def get_response(self):
        return self.doc

    def get_wrongpass_message(self):
        error_msg = Dict('error')(self.doc)
        if (
ntome's avatar
ntome committed
            "Nous n'avons pas réussi à vous authentifier" in error_msg
            or 'abonnement est bloqué' in error_msg
            or "numéro d'usager est obligatoire" in error_msg
            or "Délégué inexistant" in error_msg
ntome's avatar
ntome committed
        raise AssertionError('Other error message to catch on LoginPage')
    def get_connection_type(self):
        next_login_url = dict(parse_qsl(urlparse(self.doc['url']).query))
        return next_login_url['type_srv']

class JsFilePage(RawPage):
    def get_client_id(self):
        return Regexp(pattern=r'{authenticated:{clientId:"([^"]+)"').filter(self.text)

    def get_nonce(self):
        return Regexp(pattern=r'\("nonce","([a-z0-9]+)"\)').filter(self.text)


class AuthorizePage(HTMLPage):
    def send_form(self):
        form = self.get_form(id='submitMe')
        form.submit()


class AuthenticationMethodPage(JsonPage):
    @property
    def logged(self):
        try:
            context, = list(self.doc.get('context', {}))
        except ValueError:
            self.logger.warning("oops, we don't know if we're at login or during other authentication")
            return False

        return (
            # can be VIR_SEPA_FR or VIR_SEPA
            context.startswith('VIR_')
            # adding a recipient
            or context == 'AJOUT_CPT'
        )

    def get_validation_id(self):
        return Dict('id')(self.doc)

    def get_wrong_pre_login_status(self):
        if (
            not Dict('step/validationUnits', default=None)(self.doc)
            and not Dict('validationUnits', default=None)(self.doc)
        ):
            # 'validationUnits' informs about auth method
            # not having any is faulty for the connection
            status = self.doc['response']['status']
ntome's avatar
ntome committed
            assert status in ('AUTHENTICATION_FAILED',), (
                'Unhandled status when checking if authentication method is informed: %s' % status
            )
    @property
    def validation_units(self):
        units = Coalesce(
            Dict('step/validationUnits', default=None),
            Dict('validationUnits', default=None),
        )(self.doc)
        return units[0]
    @property
    def validation_unit_id(self):
        assert len(self.validation_units) == 1
        # The data we are looking for is in a dict with a random uuid key.
        return next(iter(self.validation_units))
    def get_authentication_method_info(self):
        # The data we are looking for is in a dict with a random uuid key.
        return self.validation_units[self.validation_unit_id][0]
    def get_authentication_method_type(self):
        return self.get_authentication_method_info()['type']
    def login_errors(self, error):
        # AUTHENTICATION_LOCKED is a BrowserIncorrectPassword because there is a key
        # 'unlockingDate', in the json, that tells when the account will be unlocked.
        # So it does not require any action from the user and is automatic.
        if error in ('FAILED_AUTHENTICATION', 'AUTHENTICATION_LOCKED', 'AUTHENTICATION_FAILED'):
            raise BrowserIncorrectPassword()
        if error in ('ENROLLMENT', ):
            raise BrowserPasswordExpired()

    def transfer_errors(self, error):
        if error == 'FAILED_AUTHENTICATION':
            # For the moment, only otp sms is handled
            raise TransferInvalidOTP(message="Le code SMS que vous avez renseigné n'est pas valide")

    def recipient_errors(self, error):
        if error == 'FAILED_AUTHENTICATION':
            # For the moment, only otp sms is handled
            raise RecipientInvalidOTP(message="Le code SMS que vous avez renseigné n'est pas valide")
        elif error == 'AUTHENTICATION_CANCELED':
            raise AddRecipientError(message="L'ajout a été annulée via l'application mobile.")

    def check_errors(self, feature):
        if 'response' in self.doc:
            result = self.doc['response']['status']
        elif 'step' in self.doc:
            # Can have error at first authentication request,
            # error will be handle in `if` case.
            # If there is no error, it will retrive 'AUTHENTICATION' as result value.
            result = self.doc['step']['phase']['state']
        elif 'phase' in self.doc and self.get_authentication_method_type() == 'PASSWORD_ENROLL':
            result = self.doc['phase']['state']
        else:
            result = self.doc['phase']['previousResult']

        if result in ('AUTHENTICATION', 'AUTHENTICATION_SUCCESS'):
            return
        FEATURES_ERRORS = {
            'login': self.login_errors,
            'transfer': self.transfer_errors,
            'recipient': self.recipient_errors,
        }
        FEATURES_ERRORS[feature](error=result)

ntome's avatar
ntome committed
        raise AssertionError('Error during %s authentication is not handled yet: %s' % (feature, result))


class AuthenticationStepPage(AuthenticationMethodPage):
    def get_redirect_data(self):
        # In case of wrongpass the response key does not exist
        # So it needs a default value
        return Dict('response/saml2_post', default=NotAvailable)(self.doc)
class VkImagePage(JsonPage):
    def get_all_images_data(self):
        return self.doc


class ValidationPageOption(LoggedPage, HTMLPage):
class LoginTokensPage(JsonPage):
    def get_access_token(self):
        return Dict('parameters/access_token')(self.doc)

    def get_id_token(self):
        return Dict('parameters/id_token')(self.doc)


class CaissedepargneNewKeyboard(SplitKeyboard):
    char_to_hash = {
        '0': '66ec79b200706e7f9c14f2b6d35dbb05',
        '1': ('529819241cce382b429b4624cb019b56', '0ea8c08e52d992a28aa26043ffc7c044'),
        '2': 'fab68678204198b794ce580015c8637f',
        '3': '3fc5280d17cf057d1c4b58e4f442ceb8',
ntome's avatar
ntome committed
        '4': (
            'dea8800bdd5fcaee1903a2b097fbdef0', 'e413098a4d69a92d08ccae226cea9267',
            '61f720966ccac6c0f4035fec55f61fe6', '2cbd19a4b01c54b82483f0a7a61c88a1',
        ),
        '5': 'ff1909c3b256e7ab9ed0d4805bdbc450',
        '6': '7b014507ffb92a80f7f0534a3af39eaa',
        '7': '7d598ff47a5607022cab932c6ad7bc5b',
        '8': ('4ed28045e63fa30550f7889a18cdbd81', '88944bdbef2e0a49be9e0c918dd4be64'),
        '9': 'dd6317eadb5a0c68f1938cec21b05ebe',
    }
    codesep = ' '

    def __init__(self, browser, images):
        code_to_filedata = {}
        for img_item in images:
            img_content = browser.location(img_item['uri']).content
            img = Image.open(BytesIO(img_content))
            img = img.filter(ImageFilter.UnsharpMask(
                radius=2,
                percent=150,
                threshold=3,
            ))
            img = img.convert('L', dither=None)
ntome's avatar
ntome committed

            def threshold(px):
                if px < 20:
                    return 0
                return 255

            img = Image.eval(img, threshold)
            b = BytesIO()
            img.save(b, format='PNG')
            code_to_filedata[img_item['value']] = b.getvalue()
        super(CaissedepargneNewKeyboard, self).__init__(code_to_filedata)


class CaissedepargneKeyboard(GridVirtKeyboard):
    color = (255, 255, 255)
    margin = 3, 3
    symbols = {
        '0': 'ef8d775a73b751c5fbee06e2d537785c',
        '1': 'bf51842846c3045f76355de32e4689c7',
        '2': 'e4c057317b7ceb17241a0ae4c26844c4',
        '3': 'c28c0c109a63f034d0f7c0f7ffdb364c',
        '4': '6ea6a5152efb1d12c33f9cbf9476caec',
        '5': '7ec4b424b5db7e7b2a54e6300fdb7515',
        '6': 'a1fa95fc856804f978f20ad42c60f6d7',
        '7': '64646adaa5a0b2506880970d8e928156',
        '8': '4abcc6b24fa77f3756b96257962615eb',
        '9': '3f41daf8ca5f250be5df91fe24079735',
    }

    def __init__(self, image, symbols):
        image = BytesIO(b64decode(image.encode('ascii')))
        super(CaissedepargneKeyboard, self).__init__(symbols, 5, 3, image, self.color, convert='RGB')

    def check_color(self, pixel):
        for c in pixel:
            if c < 250:
                return True


class GarbagePage(LoggedPage, HTMLPage):
    def on_load(self):
        go_back_link = Link('//a[@class="btn" or @class="cta_stroke back"]', default=NotAvailable)(self.doc)
        if go_back_link is not NotAvailable:
ntome's avatar
ntome committed
            go_back_link = re.search(r'\(~deibaseurl\)(.*)$', go_back_link).group(1)

            self.browser.location('%s%s' % (self.browser.BASEURL, go_back_link))
class MessagePage(GarbagePage):
    def get_message(self):
        return CleanText('//form[contains(@name, "leForm")]//span')(self.doc)

    def submit(self):
        form = self.get_form(name='leForm')
        form['signatur1'] = ['on']
        form.submit()


class _LogoutPage(HTMLPage):
    def on_load(self):
        raise BrowserUnavailable(CleanText('//*[@class="messErreur"]')(self.doc))
class ErrorPage(_LogoutPage):
class Transaction(FrenchTransaction):
ntome's avatar
ntome committed
        (
            re.compile(r'^CB (?P<text>.*?) FACT (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2})\b', re.IGNORECASE),
            FrenchTransaction.TYPE_CARD,
        ),
        (re.compile(r'^RET(RAIT)? DAB (?P<dd>\d+)-(?P<mm>\d+)-.*', re.IGNORECASE), FrenchTransaction.TYPE_WITHDRAWAL),
ntome's avatar
ntome committed
        (
            re.compile(
                r'^RET(RAIT)? DAB (?P<text>.*?) (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2}) (?P<HH>\d{2})H(?P<MM>\d{2})\b',
                re.IGNORECASE
            ),
            FrenchTransaction.TYPE_WITHDRAWAL,
        ),
        (re.compile(r'^VIR(EMENT)?(\.PERIODIQUE)? (?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_TRANSFER),
        (re.compile(r'^PRLV (?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_ORDER),
        (re.compile(r'^CHEQUE.*', re.IGNORECASE), FrenchTransaction.TYPE_CHECK),
        (re.compile(r'^(CONVENTION \d+ )?COTIS(ATION)? (?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_BANK),
        (re.compile(r'^\* ?(?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_BANK),
        (re.compile(r'^REMISE (?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_CHECK),
        (re.compile(r'^Depot Esp (?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_DEPOSIT),
        (re.compile(r'^(?P<text>.*)( \d+)? QUITTANCE .*', re.IGNORECASE), FrenchTransaction.TYPE_ORDER),
        (re.compile(r'^CB [\d\*]+ TOT DIF .*', re.IGNORECASE), FrenchTransaction.TYPE_CARD_SUMMARY),
        (re.compile(r'^CB [\d\*]+ (?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_CARD),
ntome's avatar
ntome committed
        (
            re.compile(r'^CB (?P<text>.*?) (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2})\b', re.IGNORECASE),
            FrenchTransaction.TYPE_CARD,
        ),
        (
            re.compile(r'\*CB (?P<text>.*?) (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2})\b', re.IGNORECASE),
            FrenchTransaction.TYPE_CARD,
        ),
        (
            re.compile(r'^FAC CB (?P<text>.*?) (?P<dd>\d{2})/(?P<mm>\d{2})\b', re.IGNORECASE),
            FrenchTransaction.TYPE_CARD,
        ),
        (re.compile(r'^\*?CB (?P<text>.*)', re.IGNORECASE), FrenchTransaction.TYPE_CARD),
        # For life insurances and capitalisation contracts
        (re.compile(r'^VERSEMENT', re.IGNORECASE), FrenchTransaction.TYPE_DEPOSIT),
        (re.compile(r'^Réinvestissement', re.IGNORECASE), FrenchTransaction.TYPE_DEPOSIT),
        (re.compile(r'^REVALORISATION', re.IGNORECASE), FrenchTransaction.TYPE_BANK),
        (re.compile(r'^ARBITRAGE', re.IGNORECASE), FrenchTransaction.TYPE_BANK),
        (re.compile(r'^RACHAT PARTIEL', re.IGNORECASE), FrenchTransaction.TYPE_BANK),
        (re.compile(r'^(?P<text>INTERETS.*)', re.IGNORECASE), FrenchTransaction.TYPE_BANK),
class IndexPage(LoggedPage, BasePage):
    ACCOUNT_TYPES = {
        'Epargne liquide': Account.TYPE_SAVINGS,
        'Compte Courant': Account.TYPE_CHECKING,
        'COMPTE A VUE': Account.TYPE_CHECKING,
        'COMPTE CHEQUE': Account.TYPE_CHECKING,
        'Mes comptes': Account.TYPE_CHECKING,
        'CPT DEPOT PART.': Account.TYPE_CHECKING,
        'CPT DEPOT PROF.': Account.TYPE_CHECKING,
        'Mon épargne': Account.TYPE_SAVINGS,
        'Mes autres comptes': Account.TYPE_SAVINGS,
        'Compte Epargne et DAT': Account.TYPE_SAVINGS,
        'Plan et Contrat d\'Epargne': Account.TYPE_SAVINGS,
        'COMPTE SUR LIVRET': Account.TYPE_SAVINGS,
        'LIVRET DEV.DURABLE': Account.TYPE_SAVINGS,
        'LDD Solidaire': Account.TYPE_SAVINGS,
        'LIVRET A': Account.TYPE_SAVINGS,
        'LIVRET JEUNE': Account.TYPE_SAVINGS,
        'LIVRET GRAND PRIX': Account.TYPE_SAVINGS,
        'LEP': Account.TYPE_SAVINGS,
        'L.EPAR POPULAIRE': Account.TYPE_SAVINGS,
        'LEL': Account.TYPE_SAVINGS,
        'PLAN EPARG. LOGEMENT': Account.TYPE_SAVINGS,
        'L. EPAR LOGEMENT': Account.TYPE_SAVINGS,
        'CPT PARTS SOCIALES': Account.TYPE_MARKET,
        'PEL': Account.TYPE_SAVINGS,
        'PEL 16 2013': Account.TYPE_SAVINGS,
        'PEL 16 2014': Account.TYPE_SAVINGS,
        'PARTS SOCIALES': Account.TYPE_MARKET,
        'Titres': Account.TYPE_MARKET,
        'Compte titres': Account.TYPE_MARKET,
        'Mes crédits immobiliers': Account.TYPE_LOAN,
        'Mes crédits renouvelables': Account.TYPE_LOAN,
        'Mes crédits consommation': Account.TYPE_LOAN,
        'PEA NUMERAIRE': Account.TYPE_PEA,
        'PEA': Account.TYPE_PEA,
    }
    ACCOUNT_TYPES_LINK = {
        'SYNTHESE_ASSURANCE_CNP': Account.TYPE_LIFE_INSURANCE,
        'REDIR_ASS_VIE': Account.TYPE_LIFE_INSURANCE,
        'SYNTHESE_EPARGNE': Account.TYPE_LIFE_INSURANCE,
        'ASSURANCE_VIE': Account.TYPE_LIFE_INSURANCE,
        'NA_WEB': Account.TYPE_LIFE_INSURANCE,
        'BOURSE': Account.TYPE_MARKET,
        'COMPTE_TITRE': Account.TYPE_MARKET,
    }

    def on_load(self):

        # For now, we have to handle this because after this warning message,
        # the user is disconnected (even if all others account are reachable)
        if 'QCF' in self.browser.url:
            # QCF is a mandatory test to make sure you know the basics about financials products
            # however, you can still choose to postpone it. hence the continue link
            link = Link('//span[@id="lea-prdvel-lien"]//b/a[contains(text(), "Continuer")]')(self.doc)
            if link:
                self.logger.warning("By-passing QCF")
                self.browser.location(link)
            else:
                message = CleanText('//span[contains(@id, "QCF")]/p')(self.doc)
                expected = (
                    "investissement financier (QCF) n’est plus valide à ce jour ou que vous avez refusé d’y répondre",
                    "expérience en matière d'instruments financiers n'est plus valide ou n’a pas pu être déterminé",
                )
                if any(e in message for e in expected):
                    raise ActionNeeded(message)
                raise AssertionError('Unhandled error while going to market space: %s' % message)
ntome's avatar
ntome committed
        message = CleanText(
            '//body/div[@class="content"]//p[contains(text(), "indisponible pour cause de maintenance")]'
        )(self.doc)
        if message:
            raise BrowserUnavailable(message)
Baptiste Delpey's avatar
Baptiste Delpey committed
        # This page is sometimes an useless step to the market website.
ntome's avatar
ntome committed
        bourse_link = Link(
            '//div[@id="MM_COMPTE_TITRE_pnlbourseoic"]//a[contains(text(), "Accédez à la consultation")]',
            default=None
        )(self.doc)

        if bourse_link:
        return bool(CleanText('//span[contains(text(), "Authentification non rejouable")]')(self.doc))
        return (
            not bool(CleanText('//table[@class="menu"]//div[contains(., "Crédits")]')(self.doc))
            and not bool(CleanText('//table[@class="header-navigation_main"]//a[contains(., "Crédits")]')(self.doc))
        )
    def check_measure_accounts(self):
ntome's avatar
ntome committed
        return not CleanText(
            '//div[@class="MessageErreur"]/ul/li[contains(text(), "Aucun compte disponible")]'
        )(self.doc)
    def check_no_accounts(self):
ntome's avatar
ntome committed
        no_account_message = CleanText(
            '//span[@id="MM_LblMessagePopinError"]/p[contains(text(), "Aucun compte disponible")]'
        )(self.doc)
        if no_account_message:
            raise NoAccountsException(no_account_message)
    def find_and_replace(self, info, acc_id):
        # The site might be broken: id in js: 4097800039137N418S00197, id in title: 1379418S001 (N instead of 9)
        # So we seek for a 1 letter difference and replace if found .... (so sad)
        for i in range(len(info['id']) - len(acc_id) + 1):
            sub_part = info['id'][i:i + len(acc_id)]
            z = zip(sub_part, acc_id)
            if len([tuple_letter for tuple_letter in z if len(set(tuple_letter)) > 1]) == 1:
                info['link'] = info['link'].replace(sub_part, acc_id)
                info['id'] = info['id'].replace(sub_part, acc_id)
                return

    def _get_account_info(self, a, accounts):
ntome's avatar
ntome committed
        m = re.search(
            r"PostBack(Options)?\([\"'][^\"']+[\"'],\s*['\"]([HISTORIQUE_\w|SYNTHESE_ASSURANCE_CNP|BOURSE|COMPTE_TITRE][\d\w&]+)?['\"]",
            a.attrib.get('href', '')
        )
        if m is None:
            return None
        else:
            # it is in form CB&12345[&2]. the last part is only for new website
            # and is necessary for navigation.
            link = m.group(2)
            parts = link.split('&')
            info['link'] = link
ntome's avatar
ntome committed
            id = re.search(r"([\d]+)", a.attrib.get('title', ''))
            if len(parts) > 1:
                info['type'] = parts[0]
                if info['type'] in ('REDIR_ASS_VIE', 'NA_WEB'):
                    # The link format for these account types has an additional parameter
                    info['id'] = info['_id'] = parts[2]
                else:
                    info['id'] = info['_id'] = parts[1]
                if id or info['id'] in [acc._info['_id'] for acc in accounts.values()]:
ntome's avatar
ntome committed
                    if id:
                        _id = id.group(1)
                    else:
                        unique_ids = {k for k, v in accounts.items() if info['id'] == v._info['_id']}
                        _id = list(unique_ids)[0]
                    self.find_and_replace(info, _id)
                info['id'] = info['_id'] = id.group(1)
            account_type = self.ACCOUNT_TYPES_LINK.get(info['type'])
            if account_type:
                info['acc_type'] = account_type
    def is_account_inactive(self, account_id):
        return self.doc.xpath('//tr[td[contains(text(), $id)]][@class="Inactive"]', id=account_id)

    def _add_account(self, accounts, link, label, account_type, balance, number=None, ownership=NotAvailable):
        info = self._get_account_info(link, accounts)
        if info is None:
            self.logger.warning('Unable to parse account %r: %r' % (label, link))
            return

        account = Account()
        account.id = info['id']
        if is_rib_valid(info['id']):
            account.iban = rib2iban(info['id'])
        account._info = info
        account.label = label
        account.ownership = ownership
ntome's avatar
ntome committed
        account.type = self.ACCOUNT_TYPES.get(label, info.get('acc_type', account_type))
        if 'PERP' in account.label:
            account.type = Account.TYPE_PERP
        if 'NUANCES CAPITALISATI' in account.label:
            account.type = Account.TYPE_CAPITALISATION
        if account.type in (Account.TYPE_LIFE_INSURANCE, Account.TYPE_PERP):
            account.ownership = AccountOwnership.OWNER

        balance = balance or self.get_balance(account)
ntome's avatar
ntome committed
        if not empty(balance):
            account.balance = Decimal(FrenchTransaction.clean_amount(balance))
            account.currency = account.get_currency(balance)
        else:
            account.currency = account.balance = NotAvailable
        account._card_links = []

        # Set coming history link to the parent account. At this point, we don't have card account yet.
        if account._info['type'] == 'HISTORIQUE_CB' and account.id in accounts:
            a = accounts[account.id]
            a.coming = Decimal('0.0')
            a._card_links = account._info
            return

        accounts[account.id] = account
    def get_balance(self, account):
        if account.type not in (Account.TYPE_LIFE_INSURANCE, Account.TYPE_PERP, Account.TYPE_CAPITALISATION):
            return NotAvailable
        page = self.go_history(account._info).page
ntome's avatar
ntome committed
        balance = page.doc.xpath(
            './/tr[td[contains(@id,"NumContrat")]]/td[@class="somme"]/a[contains(@href, $id)]',
            id=account.id
        )
            balance = CleanText('.')(balance[0])
ntome's avatar
ntome committed
            if balance == '':
                balance = NotAvailable
        else:
            # Specific xpath for some Life Insurances:
            balance = page.doc.xpath('//tr[td[contains(text(), $id)]]/td/div[contains(@id, "Solde")]', id=account.id)
            if len(balance) > 0:
                balance = CleanText('.')(balance[0])
ntome's avatar
ntome committed
                if balance == '':
                    balance = NotAvailable
            else:
                # sometimes the accounts are attached but no info is available
                balance = NotAvailable
    def get_measure_balance(self, account):
        for tr in self.doc.xpath('//table[@cellpadding="1"]/tr[not(@class)]'):
            account_number = CleanText('./td/a[contains(@class, "NumeroDeCompte")]')(tr)
            if re.search(r'[A-Z]*\d{3,}', account_number).group() in account.id:
                # The regex '\s\d{1,3}(?:[\s.,]\d{3})*(?:[\s.,]\d{2})' matches for example '106 100,64'
                return re.search(r'\s\d{1,3}(?:[\s.,]\d{3})*(?:[\s.,]\d{2})', account_number).group()
        return NotAvailable

    def get_measure_ids(self):
        accounts_id = []
        for a in self.doc.xpath('//table[@cellpadding="1"]/tr/td[2]/a'):
ntome's avatar
ntome committed
            accounts_id.append(re.search(r"(\d{6,})", Attr('.', 'href')(a)).group(1))
    def has_next_page(self):
        return self.doc.xpath('//div[@id="MM_SYNTHESE_MESURES_m_DivLinksPrecSuiv"]//a[contains(text(), "Page suivante")]')

    def goto_next_page(self):
        form = self.get_form(id="main")

        form['__EVENTTARGET'] = 'MM$SYNTHESE_MESURES$lnkSuivante'
        form['__EVENTARGUMENT'] = ''
        form['m_ScriptManager'] = 'MM$m_UpdatePanel|MM$SYNTHESE_MESURES$lnkSuivante'
        fix_form(form)
        form.submit()

    def get_list(self, owner_name):
        # Old website
        for table in self.doc.xpath('//table[@cellpadding="1"]'):
            account_type = Account.TYPE_UNKNOWN
            for tr in table.xpath('./tr'):
                tds = tr.findall('td')
                if tr.attrib.get('class', '') == 'DataGridHeader':
                    account_type = (
                        self.ACCOUNT_TYPES.get(tds[1].text.strip())
                        or self.ACCOUNT_TYPES.get(CleanText('.')(tds[2]))
                        or self.ACCOUNT_TYPES.get(CleanText('.')(tds[3]), Account.TYPE_UNKNOWN)
                    )
                    # On the same row, there could have many accounts (check account and a card one).
                    # For the card line, the number will be the same than the checking account, so we skip it.
                    ownership = self.get_ownership(tds, owner_name)
                    if len(tds) > 4:
                        for i, a in enumerate(tds[2].xpath('./a')):
                            label = CleanText('.')(a)
                            balance = CleanText('.')(tds[-2].xpath('./a')[i])
                            number = None
                            # if i > 0, that mean it's a card account. The number will be the same than it's
                            # checking parent account, we have to skip it.
                            if i == 0:
                                number = CleanText('.')(tds[-4].xpath('./a')[0])
                            self._add_account(accounts, a, label, account_type, balance, number, ownership=ownership)
                    # Only 4 tds on "banque de la reunion" website.
                    elif len(tds) == 4:
                        for i, a in enumerate(tds[1].xpath('./a')):
                            label = CleanText('.')(a)
                            balance = CleanText('.')(tds[-1].xpath('./a')[i])
                            self._add_account(accounts, a, label, account_type, balance, ownership=ownership)
ntome's avatar
ntome committed
        website = 'old'
        if accounts:
            website = 'new'
        self.logger.debug('we are on the %s website', website)
        if len(accounts) == 0:
            # New website
            for table in self.doc.xpath('//div[@class="panel"]'):
                title = table.getprevious()
                if title is None:
                    continue
                account_type = self.ACCOUNT_TYPES.get(CleanText('.')(title), Account.TYPE_UNKNOWN)
                for tr in table.xpath('.//tr[@class!="en-tetes" and @class!="Inactive"]'):
                    tds = tr.findall('td')
                    for i in range(len(tds)):
                        if a is not None:
                            break

                    if a is None:
                    # sometimes there's a tooltip span to ignore next to <strong>
                    # (perhaps only on creditcooperatif)
                    label = CleanText('.//strong')(tds[0])
                    balance = CleanText('.//td[has-class("somme")]')(tr)
                    ownership = self.get_ownership(tds, owner_name)
                    account = self._add_account(accounts, a, label, account_type, balance, ownership=ownership)
                    if account:
                        account.number = CleanText('.')(tds[1])
        return list(accounts.values())
    def get_ownership(self, tds, owner_name):
        if len(tds) > 2:
            account_owner = CleanText('.', default=None)(tds[2]).upper()
            if account_owner and any(title in account_owner for title in ('M', 'MR', 'MLLE', 'MLE', 'MME')):
ntome's avatar
ntome committed
                pattern = re.compile(
                    r'(m|mr|me|mme|mlle|mle|ml)\.? ?(.*)\bou (m|mr|me|mme|mlle|mle|ml)\b(.*)',
                    re.IGNORECASE
                )

                if pattern.search(account_owner):
                    return AccountOwnership.CO_OWNER
                elif all(n in account_owner for n in owner_name.split()):
                    return AccountOwnership.OWNER
                return AccountOwnership.ATTORNEY
        return NotAvailable

    def is_access_error(self):
        error_message = u"Vous n'êtes pas autorisé à accéder à cette fonction"
        if error_message in CleanText('//div[@class="MessageErreur"]')(self.doc):
            return True
    def go_loans_conso(self, tr):

        link = tr.xpath('./td/a[contains(@id, "IdaCreditPerm")]')
ntome's avatar
ntome committed
        m = re.search(r'CREDITCONSO&(\w+)', link[0].attrib['href'])
        if m:
            account = m.group(1)

        form = self.get_form(id="main")
        form['__EVENTTARGET'] = 'MM$SYNTHESE_CREDITS'
        form['__EVENTARGUMENT'] = 'ACTIVDESACT_CREDITCONSO&%s' % account
        form['m_ScriptManager'] = 'MM$m_UpdatePanel|MM$SYNTHESE_CREDITS'
        form.submit()

    def get_loan_list(self):
        accounts = OrderedDict()

        # Old website
        for tr in self.doc.xpath('//table[@cellpadding="1"]/tr[not(@class) and td[a]]'):
            tds = tr.findall('td')

            if 'Veuillez contacter le Crédit Bailleur' in CleanText('./a')(tds[4]):
                # balance not available, we skip the account
                continue

            account = Account()
            account.id = CleanText('./a')(tds[2]).split('-')[0].strip()
            account.label = CleanText('./a')(tds[2]).split('-')[-1].strip()
            account.type = Account.TYPE_LOAN
            account.balance = -CleanDecimal('./a', replace_dots=True)(tds[4])
            account.currency = account.get_currency(CleanText('./a')(tds[4]))
            accounts[account.id] = account

ntome's avatar
ntome committed
        website = 'old'
        if accounts:
            website = 'new'
        self.logger.debug('we are on the %s website', website)
        if len(accounts) == 0:
            # New website
            for table in self.doc.xpath('//div[@class="panel"]'):
                title = table.getprevious()
                if title is None:
                    continue
                if "immobiliers" not in CleanText('.')(title):
                    account_type = self.ACCOUNT_TYPES.get(CleanText('.')(title), Account.TYPE_UNKNOWN)
                    for tr in table.xpath('./table/tbody/tr[contains(@id,"MM_SYNTHESE_CREDITS") and contains(@id,"IdTrGlobal")]'):
                        tds = tr.findall('td')
                            continue
                        for i in tds[0].xpath('.//a/strong'):
                            label = i.text.strip()
                            break
ntome's avatar
ntome committed

                        balance = Decimal(FrenchTransaction.clean_amount(CleanText('.')(tds[-1])))
                        if len(tds) == 3:
                            available = Decimal(FrenchTransaction.clean_amount(CleanText('.')(tds[-2])))

                            if (
                                available
                                and not any(cls in Attr('.', 'id')(tr) for cls in ['dgImmo', 'dgConso'])
                            ):
                                # in case of Consumer credit or revolving credit, we substract avalaible amount with max amout
                                # to get what was spend
                                balance = available - balance

                        account = Loan()
                        account.id = label.split(' ')[-1]
                        account.label = unicode(label)
                        account.type = account_type
                        account.balance = -abs(balance)
                        account.currency = account.get_currency(CleanText('.')(tds[-1]))
                        account._card_links = []
                        # The website doesn't show any information relative to the loan
                        # owner, we can then assume they all belong to the credentials owner.
                        account.ownership = AccountOwnership.OWNER

                        if "renouvelables" in CleanText('.')(title):
                            if 'JSESSIONID' in self.browser.session.cookies:
                                # Need to delete this to access the consumer loans space (a new one will be created)
                                del self.browser.session.cookies['JSESSIONID']
                            try:
                                self.go_loans_conso(tr)
                            except ClientError as e:
                                if e.response.status_code == 401:
ntome's avatar
ntome committed
                                    raise ActionNeeded(
                                        'La situation actuelle de votre dossier ne vous permet pas d\'accéder à cette fonctionnalité. '
                                        + 'Nous vous invitons à contacter votre Centre de relation Clientèle pour accéder à votre prêt.'
                                    )
                            d = self.browser.loans_conso()
                            if d:
                                account.total_amount = float_to_decimal(d['contrat']['creditMaxAutorise'])
                                account.available_amount = float_to_decimal(d['situationCredit']['disponible'])
ntome's avatar
ntome committed
                                account.next_payment_amount = float_to_decimal(
                                    d['situationCredit']['mensualiteEnCours']
                                )
                        accounts[account.id] = account
        return list(accounts.values())
    @method
    class get_real_estate_loans(ListElement):
        # beware the html response is slightly different from what can be seen with the browser
        # because of some JS most likely: use the native HTML response to build the xpath
        item_xpath = '//h3[contains(text(), "immobiliers")]//following-sibling::div[@class="panel"][1]//div[@id[starts-with(.,"MM_SYNTHESE_CREDITS")] and contains(@id, "IdDivDetail")]'

        class iter_account(TableElement):
            item_xpath = './table[@class="static"][1]/tbody'
            head_xpath = './table[@class="static"][1]/tbody/tr/th'

            col_total_amount = 'Capital Emprunté'
            col_rate = 'Taux d’intérêt nominal'
            col_balance = 'Capital Restant Dû'
            col_last_payment_date = 'Dernière échéance'
            col_next_payment_amount = 'Montant prochaine échéance'
            col_next_payment_date = 'Prochaine échéance'

            def parse(self, el):
                self.env['id'] = CleanText("./h2")(el).split()[-1]
                self.env['label'] = CleanText("./h2")(el)

            class item(ItemElement):

                klass = Loan

                obj_id = Env('id')
                obj_label = Env('label')
                obj_type = Loan.TYPE_LOAN
                obj_total_amount = MyDecimal(MyTableCell("total_amount"))
                obj_balance = MyDecimal(MyTableCell("balance"), sign=lambda x: -1)
                obj_currency = Currency(MyTableCell("balance"))
                obj_last_payment_date = Date(CleanText(MyTableCell("last_payment_date")), dayfirst=True)
                obj_next_payment_amount = MyDecimal(MyTableCell("next_payment_amount"))
ntome's avatar
ntome committed
                obj_next_payment_date = Date(
                    CleanText(
                        MyTableCell("next_payment_date", default=''),
                        default=NotAvailable
                    ),
ntome's avatar
ntome committed
                    default=NotAvailable
                )
                obj_rate = MyDecimal(MyTableCell("rate", default=NotAvailable), default=NotAvailable)
                # The website doesn't show any information relative to the loan
                # owner, we can then assume they all belong to the credentials owner.
                obj_ownership = AccountOwnership.OWNER
    def submit_form(self, form, eventargument, eventtarget, scriptmanager):
        form['__EVENTARGUMENT'] = eventargument
        form['__EVENTTARGET'] = eventtarget
        form['m_ScriptManager'] = scriptmanager
        fix_form(form)
        form.submit()

    def go_levies(self, account_id=None):
        form = self.get_form(id='main')
        if account_id:
            # Go to an account specific levies page
            eventargument = ""
            if "MM$m_CH$IsMsgInit" in form:
                # Old website
                form['MM$SYNTHESE_SDD_RECUS$m_ExDropDownList'] = account_id
                eventtarget = "MM$SYNTHESE_SDD_RECUS$m_ExDropDownList"
                scriptmanager = "MM$m_UpdatePanel|MM$SYNTHESE_SDD_RECUS$m_ExDropDownList"
            else:
                # New website
                form['MM$SYNTHESE_SDD_RECUS$ddlCompte'] = account_id
                eventtarget = "MM$SYNTHESE_SDD_RECUS$ddlCompte"
                scriptmanager = "MM$m_UpdatePanel|MM$SYNTHESE_SDD_RECUS$ddlCompte"
            self.submit_form(form, eventargument, eventtarget, scriptmanager,)
        else:
            # Go to an general levies page page where all levies are found
            if "MM$m_CH$IsMsgInit" in form:
                # Old website
                eventargument = "SDDRSYN0"
                eventtarget = "Menu_AJAX"
                scriptmanager = "m_ScriptManager|Menu_AJAX"
            else:
                # New website
                eventargument = "SDDRSYN0&codeMenu=WPS1"
                eventtarget = "MM$Menu_Ajax"
                scriptmanager = "MM$m_UpdatePanel|MM$Menu_Ajax"
            self.submit_form(form, eventargument, eventtarget, scriptmanager,)

Romain Bignon's avatar
Romain Bignon committed
    def go_list(self):
        form = self.get_form(id='main')
        if "MM$m_CH$IsMsgInit" in form:
            # Old website
            eventtarget = "Menu_AJAX"
            scriptmanager = "m_ScriptManager|Menu_AJAX"
        else:
            # New website
            eventtarget = "MM$m_PostBack"
            scriptmanager = "MM$m_UpdatePanel|MM$m_PostBack"

        self.submit_form(form, eventargument, eventtarget, scriptmanager)

    def go_cards(self):
        # Do not try to go the card summary if we have no card, it breaks the session
ntome's avatar
ntome committed
        if (
            self.browser.new_website
            and not CleanText('//form[@id="main"]//a/span[text()="Mes cartes bancaires"]')(self.doc)
        ):
            self.logger.info("Do not try to go the CardsPage, there is not link on the main page")
            return

        form = self.get_form(id='main')

        if "MM$m_CH$IsMsgInit" in form:
            # Old website
            eventtarget = "Menu_AJAX"
            eventargument = "HISENCB0"
            scriptmanager = "m_ScriptManager|Menu_AJAX"
            # New website
            eventtarget = "MM$SYNTHESE$btnSyntheseCarte"
            scriptmanager = "MM$m_UpdatePanel|MM$SYNTHESE$btnSyntheseCarte"
        self.submit_form(form, eventargument, eventtarget, scriptmanager)
    # only for old website
    def go_card_coming(self, eventargument):
        form = self.get_form(id='main')
        eventtarget = "MM$HISTORIQUE_CB"
        scriptmanager = "m_ScriptManager|Menu_AJAX"
        self.submit_form(form, eventargument, eventtarget, scriptmanager)

    # only for new website
    def go_coming(self, eventargument):
        form = self.get_form(id='main')
        eventtarget = "MM$HISTORIQUE_CB"
        scriptmanager = "MM$m_UpdatePanel|MM$HISTORIQUE_CB"
        self.submit_form(form, eventargument, eventtarget, scriptmanager)
Romain Bignon's avatar
Romain Bignon committed

    # On some pages, navigate to indexPage does not lead to the list of measures, so we need this form ...
    def go_measure_list(self):
        form = self.get_form(id='main')

        form['__EVENTARGUMENT'] = "MESLIST0"
        form['__EVENTTARGET'] = 'Menu_AJAX'
        form['m_ScriptManager'] = 'm_ScriptManager|Menu_AJAX'

        fix_form(form)

        form.submit()

    # This function goes to the accounts page of one measure giving its id
    def go_measure_accounts_list(self, measure_id):
        form = self.get_form(id='main')

        form['__EVENTARGUMENT'] = "CPTSYNT0"

        if "MM$m_CH$IsMsgInit" in form: