Newer
Older
# -*- coding: utf-8 -*-
# Copyright(C) 2016 Benjamin Bouvier
#
# This file is part of a weboob module.
# This weboob module is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This weboob module is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this weboob module. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from datetime import datetime, timedelta
from weboob.browser import need_login
from weboob.browser.browsers import DomainBrowser, StatesMixin
from weboob.capabilities.base import find_object, NotAvailable
from weboob.capabilities.bank import Account, Transaction, AccountNotFound
from weboob.browser.filters.standard import CleanText
from weboob.exceptions import (
BrowserIncorrectPassword, BrowserUnavailable, BrowserQuestion, NeedInteractiveFor2FA,
)
from weboob.browser.exceptions import ClientError, BrowserTooManyRequests
from weboob.tools.value import Value
# Do not use an APIBrowser since APIBrowser sends all its requests bodies as
# JSON, although N26 only accepts urlencoded format.
class Number26Browser(DomainBrowser, StatesMixin):
BASEURL = 'https://api.tech26.de'
# Password encoded in base64 for the initial basic-auth scheme used to
# get an access token.
INITIAL_TOKEN = 'bXktdHJ1c3RlZC13ZHBDbGllbnQ6c2VjcmV0'
__states__ = ('bearer', 'auth_method', 'mfaToken', 'refresh_token', 'token_expire')
@property
def logged(self):
return self.token_expire and datetime.strptime(self.token_expire, '%Y-%m-%d %H:%M:%S') > datetime.now()
Makes it more convenient to add the bearer token and convert the result
body back to JSON.
if not self.logged:
kwargs.setdefault('headers', {})['Authorization'] = 'Basic ' + self.INITIAL_TOKEN
else:
kwargs.setdefault('headers', {})['Authorization'] = self.auth_method + ' ' + self.bearer
kwargs.setdefault('headers', {})['Accept'] = "application/json"
def __init__(self, config, *args, **kwargs):
super(Number26Browser, self).__init__(*args, **kwargs)
self.config = config
self.username = self.config['login'].get()
self.password = self.config['password'].get()
self.refresh_token = None
self.token_expire = None
self.mfaToken = None
self.bearer = self.INITIAL_TOKEN
def do_otp(self, mfaToken):
data = {
'challengeType': 'otp',
'mfaToken': mfaToken
}
try:
result = self.request('/api/mfa/challenge', json=data)
except ClientError as e:
# if we send more than 5 otp without success, the server will warn the user to
# wait 12h before retrying, but in fact it seems that we can resend otp 5 mins later
if e.response.status_code == 429:
raise BrowserUnavailable(json_response['detail'])
raise BrowserQuestion(Value('otp', label='Veuillez entrer le code reçu par sms au ' + result['obfuscatedPhoneNumber']))
def update_token(self, auth_method, bearer, refresh_token, expires_in):
self.auth_method = auth_method
self.bearer = bearer
self.refresh_token = refresh_token
if expires_in is not None:
self.token_expire = (datetime.now() + timedelta(seconds=expires_in)).strftime('%Y-%m-%d %H:%M:%S')
else:
self.token_expire = None
def has_refreshed(self):
'refresh_token': self.refresh_token,
'grant_type': 'refresh_token'
try:
result = self.request('/oauth2/token', data=data)
except ClientError as e:
if e.response.status_code == 401:
self.update_token('Basic', self.INITIAL_TOKEN, None, None)
return False
if e.response.status_code == 429:
raise BrowserTooManyRequests(json_response['detail'])
raise
self.update_token(result['token_type'], result['access_token'], result['refresh_token'], result['expires_in'])
return True
def do_login(self):
# The refresh token last between one and two hours, be carefull, otp asked frequently
if self.refresh_token:
if self.has_refreshed():
return
if self.config['request_information'].get() is None:
raise NeedInteractiveFor2FA()
if self.config['otp'].get():
data = {
'mfaToken': self.mfaToken,
'grant_type': 'mfa_otp',
'otp': self.config['otp'].get()
}
else:
data = {
'username': self.username,
'password': self.password,
'grant_type': 'password'
}
result = self.request('/oauth2/token', data=data)
json_response = ex.response.json()
if json_response.get('title') == 'A second authentication factor is required.':
self.mfaToken = json_response.get('mfaToken')
elif json_response.get('error') == 'invalid_grant':
raise BrowserIncorrectPassword(json_response['error_description'])
elif json_response.get('title') == 'Error':
raise BrowserUnavailable(json_response['message'])
elif json_response.get('title') == 'invalid_otp':
raise BrowserIncorrectPassword(json_response['userMessage']['detail'])
# if we try too many requests, it will return a 429 and the user will have
# to wait 30 minutes before retrying, and if he retries at 29 min, he will have
# to wait 30 minutes more
elif ex.response.status_code == 429:
raise BrowserTooManyRequests(json_response['detail'])
raise
self.update_token(result['token_type'], result['access_token'], result['refresh_token'], result['expires_in'])
def get_accounts(self):
account = self.request('/api/accounts')
spaces = self.request('/api/spaces')
a = Account()
# Number26 only provides a checking account (as of sept 19th 2016).
a.type = Account.TYPE_CHECKING
a.label = u'Checking account'
a.id = account["id"]
a.balance = Decimal(str(spaces["totalBalance"]))
return find_object(self.get_accounts(), id=_id, error=AccountNotFound)
def get_categories(self):
"""
Generates a map of categoryId -> categoryName, for fast lookup when
fetching transactions.
"""
categories = self.request('/api/smrt/categories')
cmap = {}
for c in categories:
cmap[c["id"]] = c["name"]
return cmap
@staticmethod
def is_past_transaction(t):
return "userAccepted" in t or "confirmed" in t
return self._internal_get_transactions(categories, Number26Browser.is_past_transaction)
def get_coming(self, categories):
filter = lambda x: not Number26Browser.is_past_transaction(x)
return self._internal_get_transactions(categories, filter)
def _internal_get_transactions(self, categories, filter_func):
TYPES = {
'PT': Transaction.TYPE_CARD,
'AA': Transaction.TYPE_CARD,
'CT': Transaction.TYPE_TRANSFER,
'WEE': Transaction.TYPE_BANK,
}
transactions = self.request('/api/smrt/transactions?limit=1000')
if not filter_func(t) or t["amount"] == 0:
new.date = datetime.fromtimestamp(t["createdTS"] / 1000)
new.rdate = datetime.fromtimestamp(t["visibleTS"] / 1000)
if "merchantName" in t:
new.raw = new.label = t["merchantName"]
elif "partnerName" in t:
new.raw = CleanText().filter(t["referenceText"]) if "referenceText" in t else CleanText().filter(t["partnerName"])
if "originalCurrency" in t:
new.original_currency = t["originalCurrency"]
new.original_amount = Decimal(str(t["originalAmount"]))
new.type = TYPES.get(t["type"], Transaction.TYPE_UNKNOWN)
if t["category"] in categories:
new.category = categories[t["category"]]