# -*- coding: utf-8 -*-
# Copyright(C) 2012-2014 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with weboob. If not, see .
from __future__ import absolute_import, print_function
from collections import OrderedDict
from functools import wraps
import re
import pickle
import base64
from hashlib import sha256
import zlib
from functools import reduce
try:
from requests.packages import urllib3
except ImportError:
import urllib3
import os
import sys
from copy import deepcopy
import inspect
from datetime import datetime, timedelta
from dateutil import parser
from threading import Lock
try:
import requests
if int(requests.__version__.split('.')[0]) < 2:
raise ImportError()
except ImportError:
raise ImportError('Please install python-requests >= 2.0')
from weboob.exceptions import BrowserHTTPSDowngrade, ModuleInstallError, BrowserRedirect, BrowserIncorrectPassword
from weboob.tools.log import getLogger
from weboob.tools.compat import basestring, unicode, urlparse, urljoin, urlencode, parse_qsl
from weboob.tools.json import json
from .cookies import WeboobCookieJar
from .exceptions import HTTPNotFound, ClientError, ServerError
from .sessions import FuturesSession
from .profiles import Firefox
from .pages import NextPage
from .url import URL, normalize_url
class Browser(object):
"""
Simple browser class.
Act like a browser, and don't try to do too much.
"""
PROFILE = Firefox()
"""
Default profile used by browser to navigate on websites.
"""
TIMEOUT = 10.0
"""
Default timeout during requests.
"""
REFRESH_MAX = 0.0
"""
When handling a Refresh header, the browsers considers it only if the sleep
time in lesser than this value.
"""
VERIFY = True
"""
Check SSL certificates.
"""
PROXIES = None
MAX_RETRIES = 2
MAX_WORKERS = 10
"""
Maximum of threads for asynchronous requests.
"""
ALLOW_REFERRER = True
"""
Controls the behavior of get_referrer.
"""
COOKIE_POLICY = None
"""
Default CookieJar policy.
Example: weboob.browser.cookies.BlockAllCookies()
"""
@classmethod
def asset(cls, localfile):
"""
Absolute file path for a module local file.
"""
if os.path.isabs(localfile):
return localfile
return os.path.join(os.path.dirname(inspect.getfile(cls)), localfile)
def __init__(self, logger=None, proxy=None, responses_dirname=None, weboob=None):
self.logger = getLogger('browser', logger)
self.responses_dirname = responses_dirname
self.responses_count = 1
self.responses_count_lock = Lock()
if isinstance(self.VERIFY, basestring):
self.VERIFY = self.asset(self.VERIFY)
self.PROXIES = proxy
self._setup_session(self.PROFILE)
self.url = None
self.response = None
def deinit(self):
self.session.close()
def set_normalized_url(self, response, **kwargs):
response.url = normalize_url(response.url)
def save_response(self, response, warning=False, **kwargs):
if self.responses_dirname is None:
import tempfile
self.responses_dirname = tempfile.mkdtemp(prefix='weboob_session_')
print('Debug data will be saved in this directory: %s' % self.responses_dirname, file=sys.stderr)
elif not os.path.isdir(self.responses_dirname):
os.makedirs(self.responses_dirname)
import mimetypes
# get the content-type, remove optionnal charset part
mimetype = response.headers.get('Content-Type', '').split(';')[0]
# due to http://bugs.python.org/issue1043134
if mimetype == 'text/plain':
ext = '.txt'
else:
# try to get an extension (and avoid adding 'None')
ext = mimetypes.guess_extension(mimetype, False) or ''
with self.responses_count_lock:
counter = self.responses_count
self.responses_count += 1
path = re.sub(r'[^A-z0-9\.-_]+', '_', urlparse(response.url).path.rpartition('/')[2])[-10:]
if path.endswith(ext):
ext = ''
filename = '%02d-%d%s%s%s' % \
(counter, response.status_code, '-' if path else '', path, ext)
response_filepath = os.path.join(self.responses_dirname, filename)
request = response.request
with open(response_filepath + '-request.txt', 'w') as f:
f.write('%s %s\n\n\n' % (request.method, request.url))
for key, value in request.headers.items():
f.write('%s: %s\n' % (key, value))
if request.body is not None: # separate '' from None
f.write('\n\n\n%s' % request.body)
with open(response_filepath + '-response.txt', 'w') as f:
if hasattr(response.elapsed, 'total_seconds'):
f.write('Time: %3.3fs\n' % response.elapsed.total_seconds())
f.write('%s %s\n\n\n' % (response.status_code, response.reason))
for key, value in response.headers.items():
f.write('%s: %s\n' % (key, value))
with open(response_filepath, 'wb') as f:
f.write(response.content)
match_filepath = os.path.join(self.responses_dirname, 'url_response_match.txt')
with open(match_filepath, 'a') as f:
f.write('# %d %s %s\n' % (response.status_code, response.reason, response.headers.get('Content-Type', '')))
f.write('%s\t%s\n' % (response.url, filename))
msg = u'Response saved to %s' % response_filepath
if warning:
self.logger.warning(msg)
else:
self.logger.info(msg)
def _create_session(self):
return FuturesSession(max_workers=self.MAX_WORKERS, max_retries=self.MAX_RETRIES)
def _setup_session(self, profile):
"""
Set up a python-requests session for our usage.
"""
session = self._create_session()
session.proxies = self.PROXIES
session.verify = not self.logger.settings['ssl_insecure'] and self.VERIFY
if not session.verify:
try:
urllib3.disable_warnings()
except AttributeError:
# urllib3 is too old, warnings won't be disable
pass
# defines a max_retries. It's mandatory in case a server is not
# handling keep alive correctly, like the proxy burp
adapter_kwargs = dict(max_retries=self.MAX_RETRIES)
# set connection pool size equal to MAX_WORKERS if needed
if self.MAX_WORKERS > requests.adapters.DEFAULT_POOLSIZE:
adapter_kwargs.update(pool_connections=self.MAX_WORKERS,
pool_maxsize=self.MAX_WORKERS)
session.mount('https://', requests.adapters.HTTPAdapter(**adapter_kwargs))
session.mount('http://', requests.adapters.HTTPAdapter(**adapter_kwargs))
if self.TIMEOUT:
session.timeout = self.TIMEOUT
## weboob only can provide proxy and HTTP auth options
session.trust_env = False
profile.setup_session(session)
session.hooks['response'].append(self.set_normalized_url)
if self.responses_dirname is not None:
session.hooks['response'].append(self.save_response)
self.session = session
session.cookies = WeboobCookieJar()
if self.COOKIE_POLICY:
session.cookies.set_policy(self.COOKIE_POLICY)
def set_profile(self, profile):
profile.setup_session(self.session)
def location(self, url, **kwargs):
"""
Like :meth:`open` but also changes the current URL and response.
This is the most common method to request web pages.
Other than that, has the exact same behavior of open().
"""
assert not kwargs.get('is_async'), "Please use open() instead of location() to make asynchronous requests."
response = self.open(url, **kwargs)
self.response = response
self.url = self.response.url
return response
def open(self, url, referrer=None,
allow_redirects=True,
stream=None,
timeout=None,
verify=None,
cert=None,
proxies=None,
data_encoding=None,
is_async=False,
callback=lambda response: response,
**kwargs):
"""
Make an HTTP request like a browser does:
* follow redirects (unless disabled)
* provide referrers (unless disabled)
Unless a `method` is explicitly provided, it makes a GET request,
or a POST if data is not None,
An empty `data` (not None, like '' or {}) *will* make a POST.
It is a wrapper around session.request().
All session.request() options are available.
You should use location() or open() and not session.request(),
since it has some interesting additions, which are easily
individually disabled through the arguments.
Call this instead of location() if you do not want to "visit" the URL
(for instance, you are downloading a file).
When `is_async` is True, open() returns a Future object (see
concurrent.futures for more details), which can be evaluated with its
result() method. If any exception is raised while processing request,
it is caught and re-raised when calling result().
For example:
>>> Browser().open('http://google.com', is_async=True).result().text # doctest: +SKIP
:param url: URL
:type url: str
:param data: POST data
:type url: str or dict or None
:param referrer: Force referrer. False to disable sending it, None for guessing
:type referrer: str or False or None
:param is_async: Process request in a non-blocking way
:type is_async: bool
:param callback: Callback to be called when request has finished,
with response as its first and only argument
:type callback: function
:rtype: :class:`requests.Response`
"""
if 'async' in kwargs:
import warnings
warnings.warn('Please use is_async instead of async.', DeprecationWarning)
is_async = kwargs['async']
del kwargs['async']
if isinstance(url, basestring):
url = normalize_url(url)
elif isinstance(url, requests.Request):
url.url = normalize_url(url.url)
req = self.build_request(url, referrer, data_encoding=data_encoding, **kwargs)
preq = self.prepare_request(req)
if hasattr(preq, '_cookies'):
# The _cookies attribute is not present in requests < 2.2. As in
# previous version it doesn't calls extract_cookies_to_jar(), it is
# not a problem as we keep our own cookiejar instance.
preq._cookies = WeboobCookieJar.from_cookiejar(preq._cookies)
if self.COOKIE_POLICY:
preq._cookies.set_policy(self.COOKIE_POLICY)
if proxies is None:
proxies = self.PROXIES
if verify is None:
verify = not self.logger.settings['ssl_insecure'] and self.VERIFY
if timeout is None:
timeout = self.TIMEOUT
# We define an inner_callback here in order to execute the same code
# regardless of is_async param.
def inner_callback(future, response):
if allow_redirects:
response = self.handle_refresh(response)
self.raise_for_status(response)
return callback(response)
# call python-requests
response = self.session.send(preq,
allow_redirects=allow_redirects,
stream=stream,
timeout=timeout,
verify=verify,
cert=cert,
proxies=proxies,
callback=inner_callback,
is_async=is_async)
return response
def async_open(self, url, **kwargs):
"""
Shortcut to open(url, is_async=True).
"""
if 'async' in kwargs:
del kwargs['async']
if 'is_async' in kwargs:
del kwargs['is_async']
return self.open(url, is_async=True, **kwargs)
def raise_for_status(self, response):
"""
Like Response.raise_for_status but will use other classes if needed.
"""
http_error_msg = None
if 400 <= response.status_code < 500:
http_error_msg = '%s Client Error: %s' % (response.status_code, response.reason)
cls = ClientError
if response.status_code == 404:
cls = HTTPNotFound
elif 500 <= response.status_code < 600:
http_error_msg = '%s Server Error: %s' % (response.status_code, response.reason)
cls = ServerError
if http_error_msg:
raise cls(http_error_msg, response=response)
# in case we did not catch something that should be
response.raise_for_status()
def build_request(self, url, referrer=None, data_encoding=None, **kwargs):
"""
Does the same job as open(), but returns a Request without
submitting it.
This allows further customization to the Request.
"""
if isinstance(url, requests.Request):
req = url
url = req.url
else:
req = requests.Request(url=url, **kwargs)
# guess method
if req.method is None:
if req.data or req.json:
req.method = 'POST'
else:
req.method = 'GET'
# convert unicode strings to proper encoding
if isinstance(req.data, unicode) and data_encoding:
req.data = req.data.encode(data_encoding)
if isinstance(req.data, dict) and data_encoding:
req.data = OrderedDict([(k, v.encode(data_encoding) if isinstance(v, unicode) else v)
for k, v in req.data.items()])
if referrer is None:
referrer = self.get_referrer(self.url, url)
if referrer:
# Yes, it is a misspelling.
req.headers.setdefault('Referer', referrer)
return req
def prepare_request(self, req):
"""
Get a prepared request from a Request object.
This method aims to be overloaded by children classes.
"""
return self.session.prepare_request(req)
REFRESH_RE = re.compile(r"^(?P[\d\.]+)(;\s*url=[\"']?(?P.*?)[\"']?)?$", re.IGNORECASE)
def handle_refresh(self, response):
"""
Called by open, to handle Refresh HTTP header.
It only redirect to the refresh URL if the sleep time is inferior to
REFRESH_MAX.
"""
if 'Refresh' not in response.headers:
return response
m = self.REFRESH_RE.match(response.headers['Refresh'])
if m:
# XXX perhaps we should not redirect if the refresh url is equal to the current url.
url = m.groupdict().get('url', None) or response.request.url
sleep = float(m.groupdict()['sleep'])
if sleep <= self.REFRESH_MAX:
self.logger.debug('Refresh to %s' % url)
return self.open(url)
else:
self.logger.debug('Do not refresh to %s because %s > REFRESH_MAX(%s)' % (url, sleep, self.REFRESH_MAX))
return response
self.logger.warning('Unable to handle refresh "%s"' % response.headers['Refresh'])
return response
def get_referrer(self, oldurl, newurl):
"""
Get the referrer to send when doing a request.
If we should not send a referrer, it will return None.
Reference: https://en.wikipedia.org/wiki/HTTP_referer
The behavior can be controlled through the ALLOW_REFERRER attribute.
True always allows the referers
to be sent, False never, and None only if it is within
the same domain.
:param oldurl: Current absolute URL
:type oldurl: str or None
:param newurl: Target absolute URL
:type newurl: str
:rtype: str or None
"""
if self.ALLOW_REFERRER is False:
return
if oldurl is None:
return
old = urlparse(oldurl)
new = urlparse(newurl)
# Do not leak secure URLs to insecure URLs
if old.scheme == 'https' and new.scheme != 'https':
return
# Reloading the page. Usually no referrer.
if oldurl == newurl:
return
# Domain-based privacy
if self.ALLOW_REFERRER is None and old.netloc != new.netloc:
return
return oldurl
def export_session(self):
def make_cookie(c):
d = {
k: getattr(c, k) for k in ['name', 'value', 'domain', 'path', 'secure']
}
#d['session'] = c.discard
d['httpOnly'] = 'httponly' in [k.lower() for k in c._rest.keys()]
d['expirationDate'] = getattr(c, 'expires', None)
return d
return {
'url': self.url,
'cookies': [make_cookie(c) for c in self.session.cookies],
}
class UrlNotAllowed(Exception):
"""
Raises by :class:`DomainBrowser` when `RESTRICT_URL` is set and trying to go
on an url not matching `BASEURL`.
"""
class DomainBrowser(Browser):
"""
A browser that handles relative URLs and can have a base URL (usually a domain).
For instance self.location('/hello') will get http://weboob.org/hello
if BASEURL is 'http://weboob.org/'.
"""
BASEURL = None
"""
Base URL, e.g. 'http://weboob.org/' or 'https://weboob.org/'
See absurl().
"""
RESTRICT_URL = False
"""
URLs allowed to load.
This can be used to force SSL (if the BASEURL is SSL) or any other leakage.
Set to True to allow only URLs starting by the BASEURL.
Set it to a list of allowed URLs if you have multiple allowed URLs.
More complex behavior is possible by overloading url_allowed()
"""
def __init__(self, baseurl=None, *args, **kwargs):
super(DomainBrowser, self).__init__(*args, **kwargs)
if baseurl is not None:
self.BASEURL = baseurl
def url_allowed(self, url):
"""
Checks if we are allowed to visit an URL.
See RESTRICT_URL.
:param url: Absolute URL
:type url: str
:rtype: bool
"""
if self.BASEURL is None or self.RESTRICT_URL is False:
return True
if self.RESTRICT_URL is True:
return url.startswith(self.BASEURL)
for restrict_url in self.RESTRICT_URL:
if url.startswith(restrict_url):
return True
return False
def absurl(self, uri, base=None):
"""
Get the absolute URL, relative to a base URL.
If base is None, it will try to use the current URL.
If there is no current URL, it will try to use BASEURL.
If base is False, it will always try to use the current URL.
If base is True, it will always try to use BASEURL.
:param uri: URI to make absolute. It can be already absolute.
:type uri: str
:param base: Base absolute URL.
:type base: str or None or False or True
:rtype: str
"""
if not base:
base = self.url
if base is None or base is True:
base = self.BASEURL
return urljoin(base, uri)
def open(self, req, *args, **kwargs):
"""
Like :meth:`Browser.open` but handles urls without domains, using
the :attr:`BASEURL` attribute.
"""
uri = req.url if isinstance(req, requests.Request) else req
url = self.absurl(uri)
if not self.url_allowed(url):
raise UrlNotAllowed(url)
if isinstance(req, requests.Request):
req.url = url
else:
req = url
return super(DomainBrowser, self).open(req, *args, **kwargs)
def go_home(self):
"""
Go to the "home" page, usually the BASEURL.
"""
return self.location(self.BASEURL or self.absurl('/'))
class PagesBrowser(DomainBrowser):
r"""
A browser which works pages and keep state of navigation.
To use it, you have to derive it and to create URL objects as class
attributes. When open() or location() are called, if the url matches
one of URL objects, it returns a Page object. In case of location(), it
stores it in self.page.
Example:
>>> from .pages import Page
>>> class HomePage(Page):
... pass
...
>>> class ListPage(Page):
... pass
...
>>> class MyBrowser(PagesBrowser):
... BASEURL = 'http://example.org'
... home = URL('/(index\.html)?', HomePage)
... list = URL('/list\.html', ListPage)
...
You can then use URL instances to go on pages.
"""
_urls = None
def __init__(self, *args, **kwargs):
self.highlight_el = kwargs.pop('highlight_el', False)
super(PagesBrowser, self).__init__(*args, **kwargs)
self.page = None
# exclude properties because they can access other fields not yet defined
def is_property(attr):
v = getattr(type(self), attr, None)
return hasattr(v, '__get__') or hasattr(v, '__set__')
attrs = [(attr, getattr(self, attr)) for attr in dir(self) if not is_property(attr)]
attrs = [v for v in attrs if isinstance(v[1], URL)]
attrs.sort(key=lambda v: v[1]._creation_counter)
self._urls = OrderedDict(deepcopy(attrs))
for k, v in self._urls.items():
setattr(self, k, v)
for url in self._urls.values():
url.browser = self
def open(self, *args, **kwargs):
"""
Same method than
:meth:`weboob.browser.browsers.DomainBrowser.open`, but the
response contains an attribute `page` if the url matches any
:class:`URL` object.
"""
callback = kwargs.pop('callback', lambda response: response)
page_class = kwargs.pop('page', None)
# Have to define a callback to seamlessly process synchronous and
# asynchronous requests, see :meth:`Browser.open` and its `is_async`
# and `callback` params.
def internal_callback(response):
# Try to handle the response page with an URL instance.
response.page = None
if page_class:
response.page = page_class(self, response)
return callback(response)
for url in self._urls.values():
response.page = url.handle(response)
if response.page is not None:
self.logger.debug('Handle %s with %s', response.url, response.page.__class__.__name__)
break
if response.page is None:
regexp = r'^(?P\w+)://.*'
proto_response = re.match(regexp, response.url)
if proto_response:
proto_response = proto_response.group('proto')
proto_base = re.match(regexp, self.BASEURL).group('proto')
if proto_base == 'https' and proto_response != 'https':
raise BrowserHTTPSDowngrade()
self.logger.debug('Unable to handle %s', response.url)
return callback(response)
return super(PagesBrowser, self).open(callback=internal_callback, *args, **kwargs)
def location(self, *args, **kwargs):
"""
Same method than
:meth:`weboob.browser.browsers.Browser.location`, but if the
url matches any :class:`URL` object, an attribute `page` is added to
response, and the attribute :attr:`PagesBrowser.page` is set.
"""
if self.page is not None:
# Call leave hook.
self.page.on_leave()
response = self.open(*args, **kwargs)
self.response = response
self.page = response.page
self.url = response.url
if self.page is not None:
# Call load hook.
self.page.on_load()
# Returns self.response in case on_load recalls location()
return self.response
def pagination(self, func, *args, **kwargs):
r"""
This helper function can be used to handle pagination pages easily.
When the called function raises an exception :class:`NextPage`, it goes
on the wanted page and recall the function.
:class:`NextPage` constructor can take an url or a Request object.
>>> from .pages import HTMLPage
>>> class Page(HTMLPage):
... def iter_values(self):
... for el in self.doc.xpath('//li'):
... yield el.text
... for next in self.doc.xpath('//a'):
... raise NextPage(next.attrib['href'])
...
>>> class Browser(PagesBrowser):
... BASEURL = 'https://people.symlink.me'
... list = URL('/~rom1/projects/weboob/list-(?P\d+).html', Page)
...
>>> b = Browser()
>>> b.list.go(pagenum=1) # doctest: +ELLIPSIS
>>> list(b.pagination(lambda: b.page.iter_values()))
['One', 'Two', 'Three', 'Four']
"""
while True:
try:
for r in func(*args, **kwargs):
yield r
except NextPage as e:
self.location(e.request)
else:
return
def need_login(func):
"""
Decorator used to require to be logged to access to this function.
This decorator can be used on any method whose first argument is a
browser (typically a :class:`LoginBrowser`). It checks for the
`logged` attribute in the current browser's page: when this
attribute is set to ``True`` (e.g., when the page inherits
:class:`LoggedPage`), then nothing special happens.
In all other cases (when the browser isn't on any defined page or
when the page's `logged` attribute is ``False``), the
:meth:`LoginBrowser.do_login` method of the browser is called before
calling :`func`.
"""
@wraps(func)
def inner(browser, *args, **kwargs):
if (not hasattr(browser, 'logged') or (hasattr(browser, 'logged') and not browser.logged)) and \
(not hasattr(browser, 'page') or browser.page is None or not browser.page.logged):
browser.do_login()
if browser.logger.settings.get('export_session'):
browser.logger.debug('logged in with session: %s', json.dumps(browser.export_session()))
return func(browser, *args, **kwargs)
return inner
class LoginBrowser(PagesBrowser):
"""
A browser which supports login.
"""
def __init__(self, username, password, *args, **kwargs):
super(LoginBrowser, self).__init__(*args, **kwargs)
self.username = username
self.password = password
def do_login(self):
"""
Abstract method to implement to login on website.
It is called when a login is needed.
"""
raise NotImplementedError()
def do_logout(self):
"""
Logout from website.
By default, simply clears the cookies.
"""
self.session.cookies.clear()
class StatesMixin(object):
"""
Mixin to store states of browser.
"""
__states__ = []
"""
Saved state variables.
"""
STATE_DURATION = None
"""
In minutes, used to set an expiration datetime object of the state.
"""
def locate_browser(self, state):
try:
self.location(state['url'])
except (requests.exceptions.HTTPError, requests.exceptions.TooManyRedirects):
pass
def load_state(self, state):
if 'expire' in state and parser.parse(state['expire']) < datetime.now():
return self.logger.info('State expired, not reloading it from storage')
if 'cookies' in state:
try:
self.session.cookies = pickle.loads(zlib.decompress(base64.b64decode(state['cookies'])))
except (TypeError, zlib.error, EOFError, ValueError):
self.logger.error('Unable to reload cookies from storage')
else:
self.logger.info('Reloaded cookies from storage')
for attrname in self.__states__:
if attrname in state:
setattr(self, attrname, state[attrname])
if 'url' in state:
self.locate_browser(state)
def dump_state(self):
state = {}
if hasattr(self, 'page') and self.page:
state['url'] = self.page.url
state['cookies'] = base64.b64encode(zlib.compress(pickle.dumps(self.session.cookies, -1)))
for attrname in self.__states__:
try:
state[attrname] = getattr(self, attrname)
except AttributeError:
pass
if self.STATE_DURATION is not None:
state['expire'] = unicode((datetime.now() + timedelta(minutes=self.STATE_DURATION)).replace(microsecond=0))
self.logger.info('Stored cookies into storage')
return state
class APIBrowser(DomainBrowser):
"""
A browser for API websites.
"""
def build_request(self, *args, **kwargs):
if 'data' in kwargs:
kwargs['data'] = json.dumps(kwargs['data'])
if 'headers' not in kwargs:
kwargs['headers'] = {}
kwargs['headers']['Content-Type'] = 'application/json'
return super(APIBrowser, self).build_request(*args, **kwargs)
def open(self, *args, **kwargs):
"""
Do a JSON request.
The "Content-Type" header is always set to "application/json".
:param data: if specified, format as JSON and send as request body
:type data: :class:`dict`
:param headers: if specified, add these headers to the request
:type headers: :class:`dict`
"""
return super(APIBrowser, self).open(*args, **kwargs)
def request(self, *args, **kwargs):
"""
Do a JSON request and parse the response.
:returns: a dict containing the parsed JSON server response
:rtype: :class:`dict`
"""
return self.open(*args, **kwargs).json()
class AbstractBrowserMissingParentError(Exception):
pass
class AbstractBrowser(Browser):
""" AbstractBrowser allow inheritance of a browser defined in another module.
Websites can share many pages and code base. This class allow to load a browser
provided by another module and to build our own browser on top of it (like standard
python inheritance. Weboob will install and download the PARENT module for you.
PARENT is a mandatory attribute, it's the name of the module providing the parent Browser
PARENT_ATTR is an optionnal attribute used when the parent module does not have only one
browser defined as BROWSER class attribute: you can customized the path of the object to load.
Note that you must pass a valid weboob instance as first argument of the constructor.
"""
PARENT = None
PARENT_ATTR = None
def __new__(cls, *args, **kwargs):
weboob = kwargs['weboob']
if cls.PARENT is None:
raise AbstractBrowserMissingParentError("PARENT is not defined for browser %s" % cls)
try:
module = weboob.load_or_install_module(cls.PARENT)
except ModuleInstallError as err:
raise ModuleInstallError('This module depends on %s module but %s\'s installation failed with: %s' % (cls.PARENT, cls.PARENT, err))
if cls.PARENT_ATTR is None:
parent = module.klass.BROWSER
else:
parent = reduce(getattr, cls.PARENT_ATTR.split('.'), module)
if parent is None:
raise AbstractBrowserMissingParentError("Failed to load parent class")
cls.__bases__ = (parent,)
return object.__new__(cls)
class OAuth2Mixin(StatesMixin):
AUTHORIZATION_URI = None
ACCESS_TOKEN_URI = None
SCOPE = ''
client_id = None
client_secret = None
redirect_uri = None
access_token = None
access_token_expire = None
auth_uri = None
token_type = None
refresh_token = None
def __init__(self, *args, **kwargs):
super(OAuth2Mixin, self).__init__(*args, **kwargs)
self.__states__ += ('access_token', 'access_token_expire', 'refresh_token', 'token_type')
def build_request(self, *args, **kwargs):
headers = kwargs.setdefault('headers', {})
if self.access_token:
headers['Authorization'] = '{} {}'.format(self.token_type, self.access_token)
return super(OAuth2Mixin, self).build_request(*args, **kwargs)
def dump_state(self):
self.access_token_expire = unicode(self.access_token_expire) if self.access_token_expire else None
return super(OAuth2Mixin, self).dump_state()
def load_state(self, state):
super(OAuth2Mixin, self).load_state(state)
self.access_token_expire = parser.parse(self.access_token_expire) if self.access_token_expire else None
@property
def logged(self):
return self.access_token is not None and self.access_token_expire > datetime.now()
def do_login(self):
if self.refresh_token:
self.use_refresh_token()
elif self.auth_uri:
self.request_access_token(self.auth_uri)
else:
self.request_authorization()
def build_authorization_parameters(self):
return {'redirect_uri': self.redirect_uri,
'scope': self.SCOPE,
'client_id': self.client_id,
'response_type': 'code',
}
def build_authorization_uri(self):
p = urlparse(self.AUTHORIZATION_URI)
q = dict(parse_qsl(p.query))
q.update(self.build_authorization_parameters())
return p._replace(query=urlencode(q)).geturl()
def request_authorization(self):
self.logger.info('request authorization')
raise BrowserRedirect(self.build_authorization_uri())
def build_access_token_parameters(self, values):
return {'code': values['code'],
'grant_type': 'authorization_code',
'redirect_uri': self.redirect_uri,
'client_id': self.client_id,
'client_secret': self.client_secret,
}
def do_token_request(self, data):
return self.open(self.ACCESS_TOKEN_URI, data=data)
def request_access_token(self, auth_uri):
self.logger.info('requesting access token')
if isinstance(auth_uri, dict):
values = auth_uri
else:
values = dict(parse_qsl(urlparse(auth_uri).query))
data = self.build_access_token_parameters(values)
try:
auth_response = self.do_token_request(data).json()
except ClientError:
raise BrowserIncorrectPassword()
self.update_token(auth_response)
def use_refresh_token(self):
self.logger.info('refreshing token')
data = {'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
}
try:
auth_response = self.do_token_request(data).json()
except ClientError:
raise BrowserIncorrectPassword()
self.update_token(auth_response)
def update_token(self, auth_response):
self.token_type = auth_response['token_type'].capitalize() # don't know yet if this is a good idea, but required by bnpstet
if 'refresh_token' in auth_response:
self.refresh_token = auth_response['refresh_token']
self.access_token = auth_response['access_token']
self.access_token_expire = datetime.now() + timedelta(seconds=int(auth_response['expires_in']))
class OAuth2PKCEMixin(OAuth2Mixin):
def __init__(self, *args, **kwargs):
super(OAuth2PKCEMixin, self).__init__(*args, **kwargs)
self.__states__ += ('pkce_verifier', 'pkce_challenge')
self.pkce_verifier = self.code_verifier()
self.pkce_challenge = self.code_challenge(self.pkce_verifier)
# PKCE (Proof Key for Code Exchange) standard protocol methods:
def code_verifier(self, bytes_number=64):
return base64.urlsafe_b64encode(os.urandom(bytes_number)).rstrip(b'=')
def code_challenge(self, verifier):
digest = sha256(verifier).digest()
return base64.urlsafe_b64encode(digest).rstrip(b'=')
def build_authorization_parameters(self):
return {'redirect_uri': self.redirect_uri,
'code_challenge_method': 'S256',
'code_challenge': self.pkce_challenge,
'client_id': self.client_id
}
def build_access_token_parameters(self, values):
return {'code': values['code'],
'grant_type': 'authorization_code',
'code_verifier': self.pkce_verifier,
'redirect_uri': self.redirect_uri,
'client_id': self.client_id,
'client_secret': self.client_secret,
}