Newer
Older
# Copyright(C) 2012-2013 Romain Bignon
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
Romain Bignon
committed
from decimal import Decimal
from weboob.capabilities.base import find_object, NotAvailable
from weboob.capabilities.bank import (
CapBankWealth, CapBankTransferAddRecipient, AccountNotFound, RecipientNotFound,
Account, TransferError
)
from weboob.capabilities.contact import CapContact
from weboob.capabilities.profile import CapProfile
from weboob.capabilities.bill import (
CapDocument, Subscription, SubscriptionNotFound,
Document, DocumentNotFound,
)
from weboob.tools.backend import Module, BackendConfig
from weboob.tools.value import ValueBackendPassword
from .browser import CreditMutuelBrowser
Module, CapBankWealth, CapBankTransferAddRecipient, CapDocument,
CapContact, CapProfile,
MAINTAINER = u'Julien Veyssier'
DESCRIPTION = u'Crédit Mutuel'
CONFIG = BackendConfig(ValueBackendPassword('login', label='Identifiant', masked=False),
ValueBackendPassword('password', label='Mot de passe'))
BROWSER = CreditMutuelBrowser
def create_default_browser(self):
return self.create_browser(self.config['login'].get(), self.config['password'].get())
def iter_accounts(self):
for account in self.browser.get_accounts_list():
yield account
def get_account(self, _id):
account = self.browser.get_account(_id)
if account:
return account
else:
raise AccountNotFound()
for tr in self.browser.get_history(account):
if tr._is_coming:
yield tr
for tr in self.browser.get_history(account):
if not tr._is_coming:
yield tr
def iter_investment(self, account):
return self.browser.get_investment(account)
def iter_transfer_recipients(self, origin_account):
if not self.browser.is_new_website:
raise NotImplementedError()
if not isinstance(origin_account, Account):
origin_account = find_object(self.iter_accounts(), id=origin_account, error=AccountNotFound)
return self.browser.iter_recipients(origin_account)
def new_recipient(self, recipient, **params):
return self.browser.new_recipient(recipient, **params)
def init_transfer(self, transfer, **params):
# There is a check on the website, transfer can't be done with too long reason.
if transfer.label:
transfer.label = transfer.label[:27]
self.logger.info('Going to do a new transfer')
if transfer.account_iban:
account = find_object(self.iter_accounts(), iban=transfer.account_iban, error=AccountNotFound)
else:
Romain Bignon
committed
account = find_object(self.iter_accounts(), id=transfer.account_id, error=AccountNotFound)
if transfer.recipient_iban:
recipient = find_object(self.iter_transfer_recipients(account.id), iban=transfer.recipient_iban, error=RecipientNotFound)
else:
Romain Bignon
committed
recipient = find_object(self.iter_transfer_recipients(account.id), id=transfer.recipient_id, error=RecipientNotFound)
except AssertionError:
raise TransferError('Account id is invalid')
try: # quantize to show 2 decimals.
amount = Decimal(transfer.amount).quantize(Decimal(10) ** -2)
except ValueError:
raise TransferError('Transfer amount is invalid')
# drop characters that can crash website
transfer.label = transfer.label.encode('cp1252', errors="ignore").decode('cp1252')
return self.browser.init_transfer(account, recipient, amount, transfer.exec_date, transfer.label)
def execute_transfer(self, transfer, **params):
return self.browser.execute_transfer(transfer)
def iter_contacts(self):
return self.browser.get_advisor()
def get_profile(self):
if not hasattr(self.browser, 'get_profile'):
raise NotImplementedError()
return self.browser.get_profile()
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def get_document(self, _id):
subscription_id = _id.split('_')[0]
subscription = self.get_subscription(subscription_id)
return find_object(self.iter_documents(subscription), id=_id, error=DocumentNotFound)
def get_subscription(self, _id):
return find_object(self.iter_subscription(), id=_id, error=SubscriptionNotFound)
def iter_documents(self, subscription):
if not isinstance(subscription, Subscription):
subscription = self.get_subscription(subscription)
return self.browser.iter_documents(subscription)
def iter_subscription(self):
return self.browser.iter_subscriptions()
def download_document(self, document):
if not isinstance(document, Document):
document = self.get_document(document)
if document.url is NotAvailable:
return
return self.browser.open(document.url).content
def iter_resources(self, objs, split_path):
if Account in objs:
self._restrict_level(split_path)
return self.iter_accounts()
if Subscription in objs:
self._restrict_level(split_path)
return self.iter_subscription()