diff --git a/modules/hsbc/browser.py b/modules/hsbc/browser.py index 6fdecacd6f088940d72527e6ffca3d51a23ab1c3..321f6af9300fb4a133a8ec1e1668976d44339493 100644 --- a/modules/hsbc/browser.py +++ b/modules/hsbc/browser.py @@ -23,6 +23,7 @@ import ssl from datetime import timedelta, date from lxml.etree import XMLSyntaxError +from collections import OrderedDict from weboob.tools.date import LinearDateGuesser from weboob.capabilities.bank import Account, AccountNotFound @@ -34,8 +35,8 @@ from weboob.capabilities.base import find_object from .pages.account_pages import ( - AccountsPage, CBOperationPage, CPTOperationPage, LoginPage, AppGonePage, RibPage, - UnavailablePage, OtherPage, FrameContainer, ProfilePage, + AccountsPage, OwnersListPage, CBOperationPage, CPTOperationPage, LoginPage, + AppGonePage, RibPage, UnavailablePage, OtherPage, FrameContainer, ProfilePage, ) from .pages.life_insurances import ( LifeInsurancesPage, LifeInsurancePortal, LifeInsuranceMain, LifeInsuranceUseless, @@ -78,6 +79,7 @@ class HSBC(LoginBrowser): AppGonePage) rib = URL(r'/cgi-bin/emcgi', RibPage) accounts = URL(r'/cgi-bin/emcgi', AccountsPage) + owners_list = URL(r'/cgi-bin/emcgi', OwnersListPage) life_insurance_useless = URL(r'/cgi-bin/emcgi', LifeInsuranceUseless) profile = URL(r'/cgi-bin/emcgi', ProfilePage) unavailable = URL(r'/cgi-bin/emcgi', UnavailablePage) @@ -115,15 +117,16 @@ class HSBC(LoginBrowser): RetrieveUselessPage ) - # catch-all other_page = URL(r'/cgi-bin/emcgi', OtherPage) def __init__(self, username, password, secret, *args, **kwargs): super(HSBC, self).__init__(username, password, *args, **kwargs) - self.accounts_list = dict() + self.accounts_list = OrderedDict() + self.unique_accounts_list = dict() self.secret = secret self.PEA_LISTING = {} + self.owners = [] def load_state(self, state): return @@ -159,8 +162,9 @@ def do_login(self): if new_base_url in self.url: self.BASEURL = new_base_url - self.js_url = self.page.get_js_url() - home_url = self.page.get_frame() + if self.frame_page.is_here(): + home_url = self.page.get_frame() + self.js_url = self.page.get_js_url() if not home_url or not self.page.logged: raise BrowserIncorrectPassword() @@ -177,51 +181,99 @@ def go_post(self, url, data=None): url = url[:url.find('?')] self.location(url, data=q) - @need_login - def get_accounts_list(self): - if not self.accounts_list: - self.update_accounts_list() - - # go on cards page if there are cards accounts - for a in self.accounts_list.values(): - if a.type == Account.TYPE_CARD: - self.location(a.url) - break - - # get all couples (card, parent) on cards page - all_card_and_parent = [] - if self.cbPage.is_here(): - all_card_and_parent = self.page.get_all_parent_id() - self.go_post(self.js_url, data={'debr': 'COMPTES_PAN'}) - - # update cards parent and currency - for a in self.accounts_list.values(): - if a.type == Account.TYPE_CARD: - for card in all_card_and_parent: - if a.id in card[0].replace(' ', ''): - a.parent = find_object(self.accounts_list.values(), id=card[1]) - if a.parent and not a.currency: - a.currency = a.parent.currency - yield a + def go_to_owner_accounts(self, owner): + """ + The owners URLs change all the time so we must refresh them. + If we try to go to a person's accounts page while we are already + on this page, the website returns an empty page with the message + "Pas de TIERS", so we must always go to the owners list before + going to the owner's account page. + """ + if not self.owners_list.is_here(): + self.go_post(self.js_url, data={'debr': 'OPTIONS_TIE'}) + + if not self.owners_list.is_here(): + # Sometimes when we fetch info from a PEA account, the first POST + # fails and we are blocked on some owner's AccountsPage. + self.logger.warning('The owners list redirection failed, we must try again.') + self.go_post(self.js_url, data={'debr': 'OPTIONS_TIE'}) + + # Refresh owners URLs in case they changed: + self.owners = self.page.get_owners_urls() + self.go_post(self.owners[owner]) @need_login - def update_accounts_list(self, iban=True): - if self.accounts.is_here(): - self.go_post(self.js_url) + def iter_account_owners(self): + """ + Some connections have a "Compte de Tiers" section with several + people each having their own accounts. We must fetch the account + for each person and store the owner of each account. + """ + if self.unique_accounts_list: + for account in self.unique_accounts_list.values(): + yield account else: - data = {'debr': 'COMPTES_PAN'} - self.go_post(self.js_url, data=data) + self.go_post(self.js_url, data={'debr': 'OPTIONS_TIE'}) + if self.owners_list.is_here(): + self.owners = self.page.get_owners_urls() + + # self.accounts_list will be a dictionary of owners each + # containing a dictionary of the owner's accounts. + for owner in range(len(self.owners)): + self.accounts_list[owner] = {} + self.update_accounts_list(owner, True) + + # We must set an "_owner" attribute to each account. + for a in self.accounts_list[owner].values(): + a._owner = owner + + # go on cards page if there are cards accounts + for a in self.accounts_list[owner].values(): + if a.type == Account.TYPE_CARD: + self.location(a.url) + break + + # get all couples (card, parent) on cards page + all_card_and_parent = [] + if self.cbPage.is_here(): + all_card_and_parent = self.page.get_all_parent_id() + self.go_post(self.js_url, data={'debr': 'COMPTES_PAN'}) + + # update cards parent and currency + for a in self.accounts_list[owner].values(): + if a.type == Account.TYPE_CARD: + for card in all_card_and_parent: + if a.id in card[0].replace(' ', ''): + a.parent = find_object(self.accounts_list[owner].values(), id=card[1]) + if a.parent and not a.currency: + a.currency = a.parent.currency + + # We must get back to the owners list before moving to the next owner: + self.go_post(self.js_url, data={'debr': 'OPTIONS_TIE'}) + + # Fill a dictionary will all accounts without duplicating common accounts: + for owner in self.accounts_list.values(): + for account in owner.values(): + if account.id not in self.unique_accounts_list.keys(): + self.unique_accounts_list[account.id] = account + + for account in self.unique_accounts_list.values(): + yield account + @need_login + def update_accounts_list(self, owner, iban=True): + # Go to the owner's account page in case we are not there already: + self.go_to_owner_accounts(owner) for a in self.page.iter_spaces_account(): try: - self.accounts_list[a.id].url = a.url + self.accounts_list[owner][a.id].url = a.url except KeyError: - self.accounts_list[a.id] = a + self.accounts_list[owner][a.id] = a if iban: self.location(self.js_url, params={'debr': 'COMPTES_RIB'}) if self.rib.is_here(): - self.page.get_rib(self.accounts_list) + self.page.get_rib(self.accounts_list[owner]) @need_login def _quit_li_space(self): @@ -248,10 +300,9 @@ def _quit_li_space(self): @need_login def _go_to_life_insurance(self, account): self._quit_li_space() - self.go_post(account.url) - if self.frame_page.is_here() or self.life_insurance_useless.is_here() or self.life_not_found.is_here(): + if self.accounts.is_here() or self.frame_page.is_here() or self.life_insurance_useless.is_here() or self.life_not_found.is_here(): self.logger.warning('cannot go to life insurance %r', account) return False @@ -264,9 +315,8 @@ def _go_to_life_insurance(self, account): @need_login def get_history(self, account, coming=False, retry_li=True): self._quit_li_space() - - self.update_accounts_list(False) - account = self.accounts_list[account.id] + self.update_accounts_list(account._owner, False) + account = self.accounts_list[account._owner][account.id] if account.url is None: return [] @@ -307,18 +357,18 @@ def get_history(self, account, coming=False, retry_li=True): return history try: - self.go_post(self.accounts_list[account.id].url) + self.go_post(self.accounts_list[account._owner][account.id].url) # sometime go to hsbc life insurance space do logout except HTTPNotFound: self.app_gone = True self.do_logout() self.do_login() - # If we relogin on hsbc, all link have change + # If we relogin on hsbc, all links have changed if self.app_gone: self.app_gone = False - self.update_accounts_list() - self.location(self.accounts_list[account.id].url) + self.update_accounts_list(account._owner, False) + self.location(self.accounts_list[account._owner][account.id].url) if self.page is None: return [] @@ -326,6 +376,7 @@ def get_history(self, account, coming=False, retry_li=True): # for 'fusion' space if hasattr(account, '_is_form') and account._is_form: # go on accounts page to get account form + self.go_to_owner_accounts(account._owner) self.go_post(self.js_url, data={'debr': 'COMPTES_PAN'}) self.page.go_history_page(account) @@ -356,6 +407,8 @@ def _get_history(self): yield tr def get_investments(self, account, retry_li=True): + if not account.url: + raise NotImplementedError() if account.type in (Account.TYPE_LIFE_INSURANCE, Account.TYPE_CAPITALISATION): return self.get_life_investments(account, retry_li=retry_li) elif account.type == Account.TYPE_PEA: @@ -369,6 +422,8 @@ def get_investments(self, account, retry_li=True): raise NotImplementedError() def get_scpi_investments(self, account): + if not account.url: + raise NotImplementedError() # Clean account url m = re.search(r"'(.*)'", account.url) if m: @@ -377,6 +432,7 @@ def get_scpi_investments(self, account): account_url = account.url # Need to be on accounts page to go on scpi page + self.go_to_owner_accounts(account._owner) self.accounts.go() # Go on scpi page self.location(account_url) @@ -387,6 +443,7 @@ def get_scpi_investments(self, account): return self.page.iter_scpi_investment() def get_pea_investments(self, account): + self.go_to_owner_accounts(account._owner) assert account.type in (Account.TYPE_PEA, Account.TYPE_MARKET) # When invest balance is 0, there is not link to go on market page @@ -394,7 +451,10 @@ def get_pea_investments(self, account): return [] if not self.PEA_LISTING: - self._go_to_wealth_accounts() + # _go_to_wealth_accounts returns True if everything went well. + if not self._go_to_wealth_accounts(account): + self.logger.warning('Unable to connect to wealth accounts.') + return [] # Get account number without "EUR" account_id = re.search(r'\d{4,}', account.id).group(0) @@ -421,12 +481,9 @@ def get_pea_investments(self, account): return pea_invests def get_life_investments(self, account, retry_li=True): - self._quit_li_space() - - self.update_accounts_list(False) - account = self.accounts_list[account.id] - + self.update_accounts_list(account._owner, False) + account = self.accounts_list[account._owner][account.id] try: if not self._go_to_life_insurance(account): self._quit_li_space() @@ -452,16 +509,28 @@ def get_life_investments(self, account, retry_li=True): return investments - def _go_to_wealth_accounts(self): + def _go_to_wealth_accounts(self, account): if not hasattr(self.page, 'get_middle_frame_url'): # if we can catch the URL, we go directly, else we need to browse # the website - self.update_accounts_list() + self.update_accounts_list(account._owner, False) self.location(self.page.get_middle_frame_url()) + if self.page.get_patrimoine_url(): self.location(self.page.get_patrimoine_url()) self.page.go_next() + + if self.login.is_here(): + self.logger.warning('Connection to the Logon page failed, we must try again.') + self.do_login() + self.update_accounts_list(account._owner, False) + self.investment_form_page.go() + # If reloggin did not help accessing the wealth space, + # there is nothing more we can do to get there. + if not self.investment_form_page.is_here(): + return False + self.page.go_to_logon() helper = ProductViewHelper(self) # we need to go there to initialize the session @@ -469,9 +538,17 @@ def _go_to_wealth_accounts(self): self.PEA_LISTING['liquidities'] = list(helper.retrieve_liquidity()) self.PEA_LISTING['investments'] = list(helper.retrieve_invests()) self.connection.go() + return True @need_login def get_profile(self): + if not self.owners: + self.go_post(self.js_url, data={'debr': 'OPTIONS_TIE'}) + if self.owners_list.is_here(): + self.owners = self.page.get_owners_urls() + + # The main owner of the connection is always the first of the list: + self.go_to_owner_accounts(0) data = {'debr': 'PARAM'} self.go_post(self.js_url, data=data) return self.page.get_profile() diff --git a/modules/hsbc/module.py b/modules/hsbc/module.py index 60192a5aa321818ca66eaa314c8d5d270bdcd9d9..c152c25649cfa16b9be61a4ec37dfe20c4e5bab0 100644 --- a/modules/hsbc/module.py +++ b/modules/hsbc/module.py @@ -47,11 +47,11 @@ def create_default_browser(self): self.config['secret'].get()) def iter_accounts(self): - for account in self.browser.get_accounts_list(): + for account in self.browser.iter_account_owners(): yield account def get_account(self, _id): - return find_object(self.browser.get_accounts_list(), id=_id, error=AccountNotFound) + return find_object(self.browser.iter_account_owners(), id=_id, error=AccountNotFound) def iter_history(self, account): for tr in self.browser.get_history(account): diff --git a/modules/hsbc/pages/account_pages.py b/modules/hsbc/pages/account_pages.py index 71dbb723477700c8796b654718b91dee2afa75f1..089ba1ce941a859ed3478eba4e9255043cdc6a8f 100644 --- a/modules/hsbc/pages/account_pages.py +++ b/modules/hsbc/pages/account_pages.py @@ -160,13 +160,20 @@ class item(ItemElement): def condition(self): return len(self.el.xpath('./td')) > 2 + # Some accounts have no in the first def obj_label(self): - return Label(CleanText('./td[1]/a'))(self) or 'Compte sans libellé' + if self.el.xpath('./td[1]/a'): + return Label(CleanText('./td[1]/a'))(self) or 'Compte sans libellé' + return Label(CleanText('./td[1]'))(self) or 'Compte sans libellé' obj_coming = Env('coming') obj_currency = FrenchTransaction.Currency('./td[2]') - obj_url = CleanText(AbsoluteLink('./td[1]/a'), replace=[('\n', '')]) + def obj_url(self): + # Accounts without an in the have no link + if self.el.xpath('./td[1]/a'): + return CleanText(AbsoluteLink('./td[1]/a'), default=None, replace=[('\n', '')])(self) + return None obj_type = AccountsType(Field('label')) obj_coming = NotAvailable @@ -226,6 +233,13 @@ def obj_id(self): return account_id +class OwnersListPage(AccountsPage): + is_here = '//h1[text()="Comptes de tiers"]' + + def get_owners_urls(self): + return self.doc.xpath('//div[@class="GoBack"]/a/@href') + + class RibPage(GenericLandingPage): def is_here(self): return bool(self.doc.xpath('//h1[contains(text(), "RIB/IBAN")]')) @@ -244,7 +258,8 @@ def get_rib(self, accounts): form = self.get_form(name="FORM_RIB") form['index_rib'] = str(nb+1) form.submit() - self.browser.page.link_rib(accounts) + if self.browser.rib.is_here(): + self.browser.page.link_rib(accounts) class Pagination(object): diff --git a/modules/hsbc/pages/investments.py b/modules/hsbc/pages/investments.py index 15f65bebf6350b26ab600233d4d6435f0d0a931d..b1e382d17db842970e295bd17a800cd1a49a43fa 100644 --- a/modules/hsbc/pages/investments.py +++ b/modules/hsbc/pages/investments.py @@ -9,6 +9,7 @@ from weboob.capabilities import NotAvailable from weboob.capabilities.bank import Account, Investment +from weboob.tools.capabilities.bank.investments import is_isin_valid from weboob.browser.elements import ItemElement, TableElement, DictElement, method from weboob.browser.pages import HTMLPage, JsonPage, LoggedPage @@ -359,9 +360,13 @@ class item(ItemElement): klass = Investment obj_label = CleanText(Dict('productName')) - obj_code = CleanText(Dict('productIdInformation/0/productAlternativeNumber')) - obj_code_type = Investment.CODE_TYPE_ISIN obj_quantity = CleanDecimal(Dict('holdingDetailInformation/0/productHoldingQuantityCount')) + obj_code = CleanText(Dict('productIdInformation/0/productAlternativeNumber'), replace=[('-FR', '')]) + + def obj_code_type(self): + if is_isin_valid(Field('code')(self)): + return Investment.CODE_TYPE_ISIN + return NotAvailable def obj_vdate(self): vdate = Dict('holdingDetailInformation/0/productPriceUpdateDate')(self)