Skip to content
browser.py 71 KiB
Newer Older
# -*- coding: utf-8 -*-

# Copyright(C) 2012 Romain Bignon
#
# This file is part of a weboob module.
# This weboob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This weboob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this weboob module. If not, see <http://www.gnu.org/licenses/>.
ntome's avatar
ntome committed
# flake8: compatible

from __future__ import unicode_literals

import datetime
from uuid import uuid4
from collections import OrderedDict
from decimal import Decimal
from weboob.browser import LoginBrowser, need_login, StatesMixin
from weboob.browser.switch import SiteSwitch
from weboob.browser.url import URL
from weboob.capabilities.bank import (
    Account, AddRecipientStep, Recipient, TransferBankError, Transaction, TransferStep,
from weboob.capabilities.base import NotAvailable, find_object
from weboob.capabilities.bill import Subscription
from weboob.capabilities.profile import Profile
from weboob.browser.exceptions import BrowserHTTPNotFound, ClientError, ServerError
    BrowserIncorrectPassword, BrowserUnavailable, BrowserHTTPError, BrowserPasswordExpired,
    AuthMethodNotImplemented, AppValidation, AppValidationExpired,
from weboob.tools.capabilities.bank.transactions import (
    sorted_transactions, FrenchTransaction, keep_only_card_transactions,
    omit_deferred_transactions,
)
from weboob.tools.capabilities.bank.investments import create_french_liquidity
from weboob.tools.compat import urljoin, urlparse, parse_qsl, parse_qs, urlencode, urlunparse
from weboob.tools.json import json
from weboob.tools.value import Value
from weboob.tools.decorators import retry
from .pages import (
ntome's avatar
ntome committed
    IndexPage, ErrorPage, MarketPage, LifeInsurance, LifeInsuranceHistory, LifeInsuranceInvestments,
    GarbagePage, MessagePage, LoginPage,
    TransferPage, ProTransferPage, TransferConfirmPage, TransferSummaryPage, ProTransferConfirmPage,
    ProTransferSummaryPage, ProAddRecipientOtpPage, ProAddRecipientPage,
ntome's avatar
ntome committed
    SmsPage, ValidationPageOption, AuthentPage, RecipientPage, CanceledAuth,
    CaissedepargneKeyboard, CaissedepargneNewKeyboard,
    TransactionsDetailsPage, LoadingPage, ConsLoanPage, MeasurePage,
    NatixisLIHis, NatixisLIInv, NatixisRedirectPage,
    SubscriptionPage, CreditCooperatifMarketPage, UnavailablePage,
    CardsPage, CardsComingPage, CardsOldWebsitePage, TransactionPopupPage,
    OldLeviesPage, NewLeviesPage, NewLoginPage, JsFilePage, AuthorizePage,
    AuthenticationMethodPage, VkImagePage, AuthenticationStepPage, LoginTokensPage,
from .transfer_pages import CheckingPage, TransferListPage
from .linebourse_browser import LinebourseAPIBrowser

__all__ = ['CaisseEpargne']


def decode_utf8_cookie(data):
    # caissedepargne/palatine cookies may contain non-ascii bytes which is ill-defined.
    # Actually, they use utf-8.
    # Since it's not standard, requests/urllib interprets it freely... as latin-1
    # and we can't really blame for that.
    # Let's decode this shit ourselves.
    if sys.version_info.major == 2 and isinstance(data, bytes):
        # on top of that, sometimes the cookie is already unicode
        # which part does this? urllib? requests?
        # who knows, in the end we have to avoid puking despite the stench
        return data.decode('utf-8')
    else:
        return data.encode('latin-1').decode('utf-8')


def monkeypatch_for_lowercase_percent(session):
    # In the transfer flow, the main site (something like net123.caisse-epargne.fr)
    # redirects to the OTP site (something like www.icgauth.caisse-epargne.fr).
    # %2F is equivalent to %2f, right? It's hexadecimal after all. That's what
    # RFC3986, RFC2396, RFC1630 say, also normalization of case is possible.
    # That's what requests and urllib3 implement.
    # But some dumbasses think otherwise and simply violate the RFCs.
    # They SHOULD [interpreted as described in RFC2119] step away from the computer
    # and never touch it again because they are obviously too stupid to use it.
    # So, we are forced to hack deep in urllib3 to force our custom URL tweaking.

    def patch_attr(obj, attr, func):
        if hasattr(obj, '_old_%s' % attr):
            return

        old_func = getattr(obj, attr)
        setattr(obj, '_old_%s' % attr, old_func)
        setattr(obj, attr, func)

    pm = session.adapters['https://'].poolmanager

    def connection_from_host(*args, **kwargs):
        pool = pm._old_connection_from_host(*args, **kwargs)

        def make_request(conn, method, url, *args, **kwargs):
            if url.startswith('/dacswebssoissuer/AuthnRequestServlet'):
                # restrict this hazardous change to otp urls
                url = re.sub(r'%[0-9A-F]{2}', lambda m: m.group(0).lower(), url)
            return pool._old__make_request(conn, method, url, *args, **kwargs)

        patch_attr(pool, '_make_request', make_request)
        return pool

    patch_attr(pm, 'connection_from_host', connection_from_host)


class CaisseEpargne(LoginBrowser, StatesMixin):
    BASEURL = "https://www.caisse-epargne.fr"
    STATE_DURATION = 5
    login = URL(
        r'/authentification/manage\?step=identification&identifiant=(?P<login>.*)',
        r'https://.*/login.aspx',
        LoginPage
    )

    new_login = URL(r'/se-connecter/sso', NewLoginPage)
    js_file = URL(r'/se-connecter/main-.*.js$', JsFilePage)

    authorize = URL(r'https://www.as-ex-ath-groupe.caisse-epargne.fr/api/oauth/v2/authorize', AuthorizePage)
    login_tokens = URL(r'https://www.as-ex-ath-groupe.caisse-epargne.fr/api/oauth/v2/consume', LoginTokensPage)

    # Login and transfer authentication
    authentication_step = URL(
ntome's avatar
ntome committed
        r'https://(?P<domain>www.icgauth.[^/]+)/dacsrest/api/v1u0/transaction/(?P<validation_id>[^/]+)/step',
        AuthenticationStepPage
    )
    authentication_method_page = URL(
        r'https://(?P<domain>www.icgauth.[^/]+)/dacsrest/api/v1u0/transaction/(?P<validation_id>)',
        r'https://www.icgauth.caisse-epargne.fr/dacsrest/api/v1u0/transaction/.*',
        AuthenticationMethodPage,
        r'https://(?P<domain>www.icgauth.[^/]+)/dacs-rest-media/api/v1u0/medias/mappings/[a-z0-9-]+/images',

    # eg of both possible regexes:
    # https://www.icgauth.caisse-epargne.fr/dacstemplate-SOL/index.html?transactionID=CtxDACSP[a-f0-9]+
    # https://www.icgauth.caisse-epargne.fr/dacstemplate-SOL/_12579/index.html?transactionID=CtxDACSP[a-f0-9]+
ntome's avatar
ntome committed
    validation_option = URL(
        r'https://(?P<domain>www.icgauth.[^/]+)/dacstemplate-SOL/(?:[^/]+/)?index.html\?transactionID=.*',
        ValidationPageOption
    )
    sms = URL(r'https://(?P<domain>www.icgauth.[^/]+)/dacswebssoissuer/AuthnRequestServlet', SmsPage)
    app_validation = URL(r'https://(?P<domain>www.icgauth.[^/]+)/dacsrest/WaitingCallbackHandler', AppValidationPage)
ntome's avatar
ntome committed
    account_login = URL(
        r'/authentification/manage\?step=account&identifiant=(?P<login>.*)&account=(?P<accountType>.*)',
        LoginPage
    )
    loading = URL(r'https://.*/CreditConso/ReroutageCreditConso.aspx', LoadingPage)
ntome's avatar
ntome committed
    cons_loan = URL(
        r'https://www.credit-conso-cr.caisse-epargne.fr/websavcr-web/rest/contrat/getContrat\?datePourIe=(?P<datepourie>)',
        ConsLoanPage
    )
    transaction_detail = URL(r'https://.*/Portail.aspx.*', TransactionsDetailsPage)
    recipient = URL(r'https://.*/Portail.aspx.*', RecipientPage)
    checking = URL(r'https://.*/Portail.aspx.*', CheckingPage)
    transfer_list = URL(r'https://.*/Portail.aspx.*', TransferListPage)
    transfer = URL(r'https://.*/Portail.aspx.*', TransferPage)
    transfer_summary = URL(r'https://.*/Portail.aspx.*', TransferSummaryPage)
    transfer_confirm = URL(r'https://.*/Portail.aspx.*', TransferConfirmPage)
    pro_transfer = URL(r'https://.*/Portail.aspx.*', ProTransferPage)
    pro_transfer_confirm = URL(r'https://.*/Portail.aspx.*', ProTransferConfirmPage)
    pro_transfer_summary = URL(r'https://.*/Portail.aspx.*', ProTransferSummaryPage)
    pro_add_recipient_otp = URL(r'https://.*/Portail.aspx.*', ProAddRecipientOtpPage)
    pro_add_recipient = URL(r'https://.*/Portail.aspx.*', ProAddRecipientPage)
    measure_page = URL(r'https://.*/Portail.aspx.*', MeasurePage)
    cards_old = URL(r'https://.*/Portail.aspx.*', CardsOldWebsitePage)
    cards = URL(r'https://.*/Portail.aspx.*', CardsPage)
    cards_coming = URL(r'https://.*/Portail.aspx.*', CardsComingPage)
    old_checkings_levies = URL(r'https://.*/Portail.aspx.*', OldLeviesPage)
    new_checkings_levies = URL(r'https://.*/Portail.aspx.*', NewLeviesPage)
    authent = URL(r'https://.*/Portail.aspx.*', AuthentPage)
    subscription = URL(r'https://.*/Portail.aspx\?tache=(?P<tache>).*', SubscriptionPage)
    transaction_popup = URL(r'https://.*/Portail.aspx.*', TransactionPopupPage)
    home = URL(r'https://.*/Portail.aspx.*', IndexPage)
    home_tache = URL(r'https://.*/Portail.aspx\?tache=(?P<tache>).*', IndexPage)
    error = URL(
        r'https://.*/login.aspx',
        r'https://.*/Pages/logout.aspx.*',
        r'https://.*/particuliers/Page_erreur_technique.aspx.*',
        ErrorPage
    )
    market = URL(
        r'https://.*/Pages/Bourse.*',
        r'https://www.caisse-epargne.offrebourse.com/ReroutageSJR',
        r'https://www.caisse-epargne.offrebourse.com/fr/6CE.*',
        MarketPage
    )
    unavailable_page = URL(r'https://www.caisse-epargne.fr/.*/au-quotidien', UnavailablePage)

    creditcooperatif_market = URL(r'https://www.offrebourse.com/.*', CreditCooperatifMarketPage)  # just to catch the landing page of the Credit Cooperatif's Linebourse
    natixis_redirect = URL(
        r'/NaAssuranceRedirect/NaAssuranceRedirect.aspx',
        r'https://www.espace-assurances.caisse-epargne.fr/espaceinternet-ce/views/common/routage-itce.xhtml\?windowId=automatedEntryPoint',
        NatixisRedirectPage
    )
ntome's avatar
ntome committed
    life_insurance_history = URL(
        r'https://www.extranet2.caisse-epargne.fr/cin-front/contrats/evenements',
        LifeInsuranceHistory
    )
    life_insurance_investments = URL(
        r'https://www.extranet2.caisse-epargne.fr/cin-front/contrats/details',
        LifeInsuranceInvestments
    )
    life_insurance = URL(
        r'https://.*/Assurance/Pages/Assurance.aspx',
        r'https://www.extranet2.caisse-epargne.fr.*',
        LifeInsurance
    )
ntome's avatar
ntome committed
    natixis_life_ins_his = URL(
        r'https://www.espace-assurances.caisse-epargne.fr/espaceinternet-ce/rest/v2/contratVie/load-operation/(?P<id1>\w+)/(?P<id2>\w+)/(?P<id3>)',
        NatixisLIHis
    )
    natixis_life_ins_inv = URL(
        r'https://www.espace-assurances.caisse-epargne.fr/espaceinternet-ce/rest/v2/contratVie/load/(?P<id1>\w+)/(?P<id2>\w+)/(?P<id3>)',
        NatixisLIInv
    )
    message = URL(r'https://www.caisse-epargne.offrebourse.com/DetailMessage\?refresh=O', MessagePage)
    garbage = URL(
        r'https://www.caisse-epargne.offrebourse.com/Portefeuille',
        r'https://www.caisse-epargne.fr/particuliers/.*/emprunter.aspx',
        r'https://.*/particuliers/emprunter.*',
        r'https://.*/particuliers/epargner.*',
        GarbagePage
    )
    __states__ = (
        'BASEURL', 'multi_type', 'typeAccount', 'is_cenet_website', 'recipient_form',
        'is_send_sms', 'is_app_validation', 'otp_validation',
    # Accounts managed in life insurance space (not in linebourse)
ntome's avatar
ntome committed
    insurance_accounts = (
        'AIKIDO',
        'ASSURECUREUIL',
        'ECUREUIL PROJET',
        'GARANTIE RETRAITE EU',
        'INITIATIVES PLUS',
        'INITIATIVES TRANSMIS',
        'LIVRET ASSURANCE VIE',
        'OCEOR EVOLUTION',
        'PATRIMONIO CRESCENTE',
        'PEP TRANSMISSION',
        'PERP',
        'PERSPECTIVES ECUREUI',
        'POINTS RETRAITE ECUR',
        'RICOCHET',
        'SOLUTION PERP',
        'TENDANCES',
        'YOGA',
    )
    def __init__(self, nuser, *args, **kwargs):
        self.BASEURL = kwargs.pop('domain', self.BASEURL)
        if not self.BASEURL.startswith('https://'):
            self.BASEURL = 'https://%s' % self.BASEURL
        self.is_cenet_website = False
        self.accounts = None
        self.loans = None
        self.typeAccount = None
        self.inexttype = 0  # keep track of index in the connection type's list
        self.nuser = nuser
        self.recipient_form = None
        self.weboob = kwargs['weboob']
        self.market_url = kwargs.pop(
            'market_url',
            'https://www.caisse-epargne.offrebourse.com',
        )
        super(CaisseEpargne, self).__init__(*args, **kwargs)
        dirname = self.responses_dirname
        if dirname:
            dirname += '/bourse'

        self.linebourse = self.LINEBOURSE_BROWSER(
            self.market_url,
            logger=self.logger,
            responses_dirname=dirname,
            weboob=self.weboob,
            proxy=self.PROXIES,
        )
        monkeypatch_for_lowercase_percent(self.session)

    def deleteCTX(self):
        # For connection to offrebourse and natixis, we need to delete duplicate of CTX cookie
        if len([k for k in self.session.cookies.keys() if k == 'CTX']) > 1:
            del self.session.cookies['CTX']

    def load_state(self, state):
        if state.get('expire') and parser.parse(state['expire']) < datetime.datetime.now():
            return self.logger.info('State expired, not reloading it from storage')

        transfer_states = ('recipient_form', 'is_app_validation', 'is_send_sms', 'otp_validation')

        for transfer_state in transfer_states:
            if transfer_state in state and state[transfer_state] is not None:
                super(CaisseEpargne, self).load_state(state)
                self.logged = True
                break
    def locate_browser(self, state):
        # in case of transfer/add recipient, we shouldn't go back to previous page
        # site will crash else
        pass
    def do_login(self):
        data = self.get_connection_data()
        accounts_types = data.get('account')

        if data.get('authMode', '') == 'redirect':  # the connection type EU could also be used as a criteria
            raise SiteSwitch('cenet')

        type_account = data['account'][0]

        if self.multi_type:
            assert type_account == self.typeAccount

        if 'keyboard' in data:
            self.do_old_login(data, type_account, accounts_types)
        else:
            # New virtual keyboard
            self.do_new_login(data)

    def get_connection_data(self):
        """
        Attempt to log in.
        Note: this method does nothing if we are already logged in.
        """
        # Among the parameters used during the login step, there is
        # a connection type (called typeAccount) that can take the
        # following values:
        # WE: espace particulier
        # WP: espace pro
        # WM: personnes protégées
        # EU: Cenet
        #
        # A connection can have one connection type as well as many of
        # them. There is an issue when there is many connection types:
        # the connection type to use can't be guessed in advance, we
        # have to test all of them until the login step is successful
        # (sometimes all connection type can be used for the login, sometimes
        # only one will work).
        #
        # For simplicity's sake, we try each connection type from first to
        # last (they are returned in a list by the first request)
        #
        # Examples of connection types combination that have been seen so far:
        # [WE]
        # [WP]
        # [WE, WP]
        # [WE, WP, WM]
        # [WP, WM]
        # [EU]
        # [EU, WE]  (EU tends to come first when present)
        if not self.username or not self.password:
            raise BrowserIncorrectPassword()
        @retry(ValueError)
        def retry_go_login():
            """
            On occasions the page is not the expected JsonPage,
            although response is a code 200,
            and trying to parse it as such would throw a JSONDecodeError.
            Retrying does the trick and avoids raising a BrowserUnavailable.
            """
            return self.login.go(login=self.username)

        # Retrieve the list of types: can contain a single type or more
        # - when there is a single type: all the information are available
        # - when there are several types: an additional request is needed

        data = connection.get_response()
        if data is None:
Baptiste Delpey's avatar
Baptiste Delpey committed
            raise BrowserIncorrectPassword()
        data = self.check_connection_data(data)
        assert data is not None
        return data

    def check_connection_data(self, data):
        accounts_types = data.get('account', [])
        if not self.nuser and 'WE' not in accounts_types:
            raise BrowserIncorrectPassword("Utilisez Caisse d'Épargne Professionnels et renseignez votre nuser pour connecter vos comptes sur l'epace Professionels ou Entreprises.")

        if len(accounts_types) > 1:
            # Additional request when there is more than one connection type
            # to "choose" from the list of connection types
            if self.inexttype < len(accounts_types):
                if accounts_types[self.inexttype] == 'EU' and not self.nuser:
                    # when EU is present and not alone, it tends to come first
                    # if nuser is unset though, user probably doesn't want 'EU'
                    self.inexttype += 1
                elif accounts_types[self.inexttype] == 'WE' and self.nuser:
                    # User is probably a netpro user and want to access their
                    # professional accounts
                    self.inexttype += 1
                self.typeAccount = accounts_types[self.inexttype]
ntome's avatar
ntome committed
                raise AssertionError('should have logged in with at least one connection type')
            data = self.account_login.go(login=self.username, accountType=self.typeAccount).get_response()


    def do_old_login(self, data, type_account, accounts_types):
        # Old virtual keyboard
        id_token_clavier = data['keyboard']['Id']
        vk = CaissedepargneKeyboard(data['keyboard']['ImageClavier'], data['keyboard']['Num']['string'])
        newCodeConf = vk.get_string_code(self.password)

            'idTokenClavier': id_token_clavier,
            'newCodeConf': newCodeConf,
            'auth_mode': 'ajax',
            'nuusager': self.nuser.encode('utf-8'),
ntome's avatar
ntome committed
            'codconf': '',  # must be present though empty
            'typeAccount': type_account,
            'step': 'authentification',
            'ctx': 'typsrv={}'.format(type_account),
            'clavierSecurise': '1',
ntome's avatar
ntome committed
            'nuabbd': self.username,
            res = self.location(data['url'], params=payload)
            raise BrowserUnavailable()
        if not res.page:
            raise BrowserUnavailable()

        response = res.page.get_response()

        assert response is not None

        if response['error'] == 'Veuillez changer votre mot de passe':
            raise BrowserPasswordExpired(response['error'])

        if not response['action']:
            # the only possible way to log in w/o nuser is on WE. if we're here no need to go further.
            if not self.nuser and self.typeAccount == 'WE':
                raise BrowserIncorrectPassword(self.page.get_wrongpass_message())
            # all typeAccount tested and still not logged
            # next iteration will throw the AssertionError if we don't raise an error here
            if self.inexttype == len(accounts_types):
                raise BrowserIncorrectPassword(self.page.get_wrongpass_message())
            if self.multi_type:
                # try to log in with the next connection type's value
                self.do_login()
                return
            raise BrowserIncorrectPassword(self.page.get_wrongpass_message())
        self.BASEURL = urljoin(data['url'], '/')
Baptiste Delpey's avatar
Baptiste Delpey committed
        try:
            self.home.go()
Baptiste Delpey's avatar
Baptiste Delpey committed
        except BrowserHTTPNotFound:
            raise BrowserIncorrectPassword()
    def get_auth_mechanisms_validation_info(self):
        """ First step of strong authentication validation
        This method retrieve all informations needed for validation form.
        Warning: need to be on `validation_option` page to get the "transaction ID".
        """
        transaction_id = re.search(r'transactionID=(.*)', self.page.url)
        if transaction_id:
            transaction_id = transaction_id.group(1)
        else:
ntome's avatar
ntome committed
            raise AssertionError('Transfer transaction id was not found in url')

        otp_validation_domain = urlparse(self.url).netloc

        self.authentication_method_page.go(
            domain=otp_validation_domain,
            validation_id=transaction_id
        )

        # Can have error at first authentication request.
        # In that case, it's not an invalid otp error.
        # So, return a wrongpass.
        self.page.check_errors(feature='login')

        self.otp_validation = self.page.get_authentication_method_info()

        if self.otp_validation['type'] not in ('SMS', 'CLOUDCARD', 'PASSWORD'):
            self.logger.warning('Not handled authentication method : "%s"' % self.otp_validation['type'])
            raise AuthMethodNotImplemented()
        self.otp_validation['validation_unit_id'] = self.page.validation_unit_id
        self.otp_validation['validation_id'] = transaction_id
        self.otp_validation['domain'] = otp_validation_domain

    def do_otp_sms_authentication(self, **params):
        """ Second step of sms authentication validation

        This method validate otp sms.
        Warning:
        * need to be used through `do_authentication_validation` method
        in order to handle authentication response
        * do not forget to use the first part to have all form information
        * do not forget to set `otp_sms` params

        Parameters:
        otp_sms (str): the OTP received by SMS
        """
        assert self.otp_validation
        assert 'otp_sms' in params

        self.authentication_step.go(
            domain=self.otp_validation['domain'],
            validation_id=self.otp_validation['validation_id'],
            json={
                'validate': {
                    self.otp_validation['validation_unit_id']: [{
                        'id': self.otp_validation['id'],
                        'otp_sms': params['otp_sms'],
                        'type': 'SMS',
                    }],
                },
            }
        )
        self.otp_validation = None

    def do_cloudcard_authentication(self, **params):
        """ Second step of cloudcard authentication validation

        This method check the application validation status.
        Warning:
        * need to be used through `do_authentication_validation` method
        in order to handle authentication response
        * do not forget to use the first part to have all form information
        """
        assert self.otp_validation

        timeout = time.time() + 300.0
        referer_url = self.authentication_method_page.build(
            domain=self.otp_validation['domain'],
            validation_id=self.otp_validation['validation_id'],
        )

        while time.time() < timeout:
            self.app_validation.go(
                domain=self.otp_validation['domain'],
                headers={'Referer': referer_url},
            )
            status = self.page.get_status()
            # The status is 'valid' even when the user cancels it on
            # the application. The `authentication_step` will return
            # AUTHENTICATION_CANCELED in its response status.
            if status == 'valid':
                self.authentication_step.go(
                    domain=self.otp_validation['domain'],
                    validation_id=self.otp_validation['validation_id'],
                    json={
                        'validate': {
                            self.otp_validation['validation_unit_id']: [{
                                'id': self.otp_validation['id'],
                                'type': 'CLOUDCARD',
                            }],
                        },
                    },
                )
                break

            assert status == 'progress', 'Unhandled CloudCard status : "%s"' % status
            time.sleep(2)
        else:
            raise AppValidationExpired()

        self.otp_validation = None

    def do_vk_authentication(self, **params):
        """ Authentication with virtual keyboard

        Warning: need to be used through `do_authentication_validation` method
        in order to handle authentication response
        """

        # Can have error at first authentication request.
        # In that case, it's not a vk error, return a wrongpass.
        self.page.check_errors(feature='login')

        validation_id = self.page.get_validation_id()
        validation_unit_id = self.page.validation_unit_id

        vk_info = self.page.get_authentication_method_info()
        vk_id = vk_info['id']
        vk_images_url = vk_info['virtualKeyboard']['externalRestMediaApiUrl']
        otp_validation_domain = urlparse(self.url).netloc

        self.location(vk_images_url)
        images_url = self.page.get_all_images_data()
        vk = CaissedepargneNewKeyboard(self, images_url)
        code = vk.get_string_code(self.password)

        self.authentication_step.go(
            domain=otp_validation_domain,
            validation_id=validation_id,
            json={
                'validate': {
                    validation_unit_id: [{
                        'id': vk_id,
                        'password': code,
                        'type': 'PASSWORD',
                    }],
                },
            },
            headers={
                'Referer': self.BASEURL,
                'Accept': 'application/json, text/plain, */*',
            },
        )

    def do_authentication_validation(self, authentication_method, feature, **params):
        """ Handle all sort of authentication with `icgauth`

        This method is used for login or transfer/new recipient authentication.

        Parameters:
        authentication_method (str): authentication method in ('SMS', 'CLOUDCARD', 'PASSWORD')
        feature (str): action that need authentication in ('login', 'transfer', 'recipient')
        """
        AUTHENTICATION_METHODS = {
            'SMS': self.do_otp_sms_authentication,
            'CLOUDCARD': self.do_cloudcard_authentication,
            'PASSWORD': self.do_vk_authentication,
        }
        AUTHENTICATION_METHODS[authentication_method](**params)

        assert self.authentication_step.is_here()
        self.page.check_errors(feature=feature)

        redirect_data = self.page.get_redirect_data()
        assert redirect_data, 'redirect_data must not be empty'

        self.location(
            redirect_data['action'],
            data={
                'SAMLResponse': redirect_data['samlResponse'],
            },
            headers={
                'Referer': self.BASEURL,
                'Accept': 'application/json, text/plain, */*',
            },
        )

    def do_new_login(self, data):
        connection_type = self.page.get_connection_type()
        csid = str(uuid4())
        redirect_url = data['url']

        parts = list(urlparse(redirect_url))
        url_params = parse_qs(urlparse(redirect_url).query)

        qs = OrderedDict(parse_qsl(parts[4]))
        qs.update({'csid': csid})
        parts[4] = urlencode(qs)
        url = urlunparse(parts)

        continue_url = url_params['continue'][0]
        continue_parameters = data['continueParameters']

        # snid is either present in continue_parameters (creditcooperatif / banquebcp)
        # or in url_params (caissedepargne / other children)
        snid = json.loads(continue_parameters).get('snid') or url_params['snid'][0]

        self.location(
            url,
            method='POST',
            params={
                'continue_parameters': continue_parameters,
            },
        )

        main_js_file = self.page.get_main_js_file_url()
        self.location(main_js_file)

        client_id = self.page.get_client_id()
        nonce = self.page.get_nonce()  # Hardcoded in their js...

        # On the website, this sends back json because of the header
        # 'Accept': 'applcation/json'. If we do not add this header, we
        # instead have a form that we can directly send to complete
        # the login.
ntome's avatar
ntome committed

        claims = {
            'userinfo': {
                'cdetab': None,
                'authMethod': None,
                'authLevel': None,
            },
            'id_token': {
                'auth_time': {"essential": True},
                "last_login": None,
            },
        }
        bpcesta = {
            "csid": csid,
            "typ_app": "rest",
            "enseigne": "ce",
            "typ_sp": "out-band",
            "typ_act": "auth",
            "snid": snid,
            "cdetab": url_params['cdetab'][0],
            "typ_srv": connection_type,
ntome's avatar
ntome committed
        }
        params = {
            'nonce': nonce,
            'scope': 'openid readUser',
            'response_type': 'id_token token',
            'response_mode': 'form_post',
            'cdetab': url_params['cdetab'][0],
            'login_hint': self.username,
            'display': 'page',
            'client_id': client_id,
            # don't know if the separators= is really needed
            'claims': json.dumps(claims, separators=(',', ':')),
            'bpcesta': json.dumps(bpcesta, separators=(',', ':')),
        }
        if self.nuser:
            params['login_hint'] += ' %s' % self.nuser
        self.authorize.go(params=params)
        if self.response.headers.get('Page_Erreur', '') == 'INDISPO':
            raise BrowserUnavailable()

        pre_login_status = self.page.get_wrong_pre_login_status()
        if pre_login_status == 'AUTHENTICATION_FAILED':
            # failing at this step means no password has been submitted yet
            # and no auth method type cannot be recovered
            # corresponding to 'erreur technique' on website
            raise BrowserUnavailable()

        authentication_method = self.page.get_authentication_method_type()
        self.do_authentication_validation(
            authentication_method=authentication_method,
            feature='login'
        )

        access_token = self.page.get_access_token()
        id_token = self.page.get_id_token()

        continue_parameters = json.loads(continue_parameters)
        self.location(
            continue_url,
            data={
                'id_token': id_token,
                'access_token': access_token,
                'ctx': continue_parameters['ctx'],
                'redirectUrl': continue_parameters['redirectUrl'],
                'ctx_routage': continue_parameters['ctx_routage'],
            },
        )
        # Url look like this : https://www.net382.caisse-epargne.fr/Portail.aspx
        # We only want the https://www.net382.caisse-epargne.fr part
        # We start the .find at 8 to get the first `/` after `https://`
        parsed_url = urlparse(self.url)
        self.BASEURL = 'https://' + parsed_url.netloc

    def loans_conso(self):
        days = ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')
        month = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
        now = datetime.datetime.today()
        # for non-DST
        # d = '%s %s %s %s %s:%s:%s GMT+0100 (heure normale d’Europe centrale)' % (days[now.weekday()], now.day, month[now.month - 1], now.year, now.hour, format(now.minute, "02"), now.second)
        # TODO use babel library to simplify this code
ntome's avatar
ntome committed
        d = '%s %s %s %s %s:%s:%s GMT+0200 (heure d’été d’Europe centrale)' % (
            days[now.weekday()], now.day, month[now.month - 1], now.year,
            now.hour, format(now.minute, "02"), now.second,
        )
        if self.home.is_here():
            msg = self.page.loan_unavailable_msg()
            if msg:
                self.logger.warning('%s' % msg)
                return None
        self.cons_loan.go(datepourie=d)
        return self.page.get_conso()

ntome's avatar
ntome committed
    def go_measure_list(self, page_num=0):
        self.home.go()

        if not self.measure_page.is_here():
ntome's avatar
ntome committed
            raise AssertionError('Should be on measure_page')

        self.page.go_measure_list()
        for _ in range(page_num):
            self.page.goto_next_page()

        # Get name from profile to verify who is the owner of accounts.
        name = self.get_profile().name.upper().split(' ', 1)
        if len(name) == 2:  # if the name is complete (with first and last name)
            owner_name = name[1]
        else:  # if there is only first name
            owner_name = name[0]
        return owner_name

    @need_login
    def get_measure_accounts_list(self):
        """
        On home page there is a list of "measure" links, each one leading to one person accounts list.
        Iter over each 'measure' and navigate to it to get all accounts
        """
        self.home.go()

        owner_name = self.get_owner_name()
        # Make sure we are on list of measures page
        if self.measure_page.is_here():
            self.page.check_no_accounts()
            self.accounts = []
            for page_num in range(20):
                for measure_id in self.page.get_measure_ids():
                    self.page.go_measure_accounts_list(measure_id)
                    if self.page.check_measure_accounts():
                        for account in self.page.get_list(owner_name):
                            account._info['measure_id'] = measure_id
                            account._info['measure_id_page_num'] = page_num
                            self.accounts.append(account)
                    self.go_measure_list(page_num)
                if not self.page.has_next_page():
                    break
                self.page.goto_next_page()

            for account in self.accounts:
                if 'acc_type' in account._info and account._info['acc_type'] == Account.TYPE_LIFE_INSURANCE:
                    self.go_measure_list(account._info['measure_id_page_num'])
                    self.page.go_measure_accounts_list(account._info['measure_id'])
                    self.page.go_history(account._info)

                    if self.message.is_here():
                        self.page.submit()
                        self.page.go_history(account._info)

                    balance = self.page.get_measure_balance(account)
                    account.balance = Decimal(FrenchTransaction.clean_amount(balance))
                    account.currency = account.get_currency(balance)

        return self.accounts

    def update_linebourse_token(self):
        assert self.linebourse is not None, "linebourse browser should already exist"
        self.linebourse.session.cookies.update(self.session.cookies)
        # It is important to fetch the domain dynamically because
        # for caissedepargne the domain is 'www.caisse-epargne.offrebourse.com'
        # whereas for creditcooperatif it is 'www.offrebourse.com'
        domain = urlparse(self.url).netloc
        self.linebourse.session.headers['X-XSRF-TOKEN'] = self.session.cookies.get('XSRF-TOKEN', domain=domain)
    def add_linebourse_accounts_data(self):
        for account in self.accounts:
            self.deleteCTX()
            if account.type in (Account.TYPE_MARKET, Account.TYPE_PEA):
                self.home_tache.go(tache='CPTSYNT0')
                self.page.go_history(account._info)
                    self.page.go_history(account._info)
                # Some users may not have access to this.
                if not self.market.is_here():
                    continue
                self.page.submit()
                    # Some users may not have access to this.
                    self.update_linebourse_token()
                    page = self.linebourse.go_portfolio(account.id)
                    assert self.linebourse.portfolio.is_here()
                    # We must declare "page" because this URL also matches MarketPage
                    account.valuation_diff = page.get_valuation_diff()
                    # We need to go back to the synthesis, else we can not go home later
                    self.home_tache.go(tache='CPTSYNT0')
                else:
ntome's avatar
ntome committed
                    raise AssertionError("new domain that hasn't been seen so far?")
    def add_card_accounts(self):
        """
        Card cases are really tricky on the new website.
        There are 2 kinds of page where we can find cards information
            - CardsPage: List some of the PSU cards
            - CardsComingPage: On the coming transaction page (for a specific checking account),
                we can find all cards related to this checking account. Information to reach this
                CC is in the home page

        We have to go through this both kind of page for those reasons:
                - If there is no coming yet, the card will not be found in the home page and we will not
                be able to reach the CardsComingPage. But we can find it on CardsPage
                - Some cards are only on the CardsComingPage and not the CardsPage
                - In CardsPage, there are cards (with "Business" in the label) without checking account on the
                website (neither history nor coming), so we skip them.
                - Some card on the CardsPage that have a checking account parent, but if we follow the link to
                reach it with CardsComingPage, we find an other card that is not in CardsPage.
        """
        if self.new_website:
            for account in self.accounts:
                # Adding card's account that we find in CardsComingPage of each Checking account
                if account._card_links:
                    self.home.go()
                    self.page.go_history(account._card_links)
                    for card in self.page.iter_cards():
                        card.parent = account
                        card._coming_info = self.page.get_card_coming_info(card.number, card.parent._card_links.copy())
                        card.ownership = account.ownership
                        self.accounts.append(card)
        self.home.go()
        self.page.go_list()
        self.page.go_cards()

        # We are on the new website. We already added some card, but we can find more of them on the CardsPage
        if self.cards.is_here():
            for card in self.page.iter_cards():
                card.parent = find_object(self.accounts, number=card._parent_id)
                assert card.parent, 'card account parent %s was not found' % card

                # If we already added this card, we don't have to add it a second time
                if find_object(self.accounts, number=card.number):
                    continue

                info = card.parent._card_links

                # If card.parent._card_links is not filled, it mean this checking account
                # has no coming transactions.
                card._coming_info = None
                card.ownership = card.parent.ownership
                if info:
                    self.page.go_list()
                    self.page.go_history(info)
                    card._coming_info = self.page.get_card_coming_info(card.number, info.copy())

                    if not card._coming_info:
                        self.logger.warning('Skip card %s (not found on checking account)', card.number)
                self.accounts.append(card)

        # We are on the old website. We add all card that we can find on the CardsPage
        elif self.cards_old.is_here():
            for card in self.page.iter_cards():
                card.parent = find_object(self.accounts, number=card._parent_id)
                assert card.parent, 'card account parent %s was not found' % card.number