Newer
Older
# -*- coding: utf-8 -*-
# Copyright(C) 2018 Vincent Ardisson
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals, absolute_import
import codecs
from collections import OrderedDict
from contextlib import contextmanager
from copy import deepcopy
from glob import glob
from tempfile import NamedTemporaryFile
import time
try:
from selenium import webdriver
except ImportError:
raise ImportError('Please install python-selenium')
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.common.exceptions import (
NoSuchElementException, TimeoutException, NoSuchFrameException,
)
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
from selenium.webdriver.remote.command import Command
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from weboob.tools.log import getLogger
from weboob.tools.compat import urljoin
from .pages import HTMLPage as BaseHTMLPage
from .url import URL
# Names exported by a star-import of this module.
# NOTE(review): WithinFrame, StablePageCondition and SubSeleniumMixin are
# defined in this file but not listed here -- confirm that is intentional.
__all__ = (
'SeleniumBrowser', 'SeleniumPage', 'HTMLPage',
'CustomCondition', 'AnyCondition', 'AllCondition', 'NotCondition',
'IsHereCondition', 'VisibleXPath', 'ClickableXPath', 'ClickableLinkText',
'HasTextCondition', 'WrapException',
'xpath_locator', 'link_locator', 'ElementWrapper',
)
class CustomCondition(object):
    """Base class of all condition objects defined in this module.

    Selenium waits on callables named "conditions": predicates that take the
    driver and return a true value once the awaited state is reached.
    The builtin ones live in
    :any:`selenium.webdriver.support.expected_conditions`.

    Having a dedicated base class lets :any:`SeleniumPage.is_here` be told
    apart from a plain method: when it is an instance of this class it is
    called with the driver as argument.

    See https://seleniumhq.github.io/selenium/docs/api/py/webdriver_support/selenium.webdriver.support.expected_conditions.html

    Builtin `expected_conditions` are better used wrapped in
    :any:`WrapException`.
    """

    def __call__(self, driver):
        # Subclasses must provide the actual predicate.
        raise NotImplementedError()
class WrapException(CustomCondition):
    """Make a condition report "not met" instead of raising.

    Selenium's builtin `expected_conditions` may raise when the condition is
    not met rather than returning a false value. This wrapper calls the
    wrapped condition and turns a `NoSuchElementException` into `False`.
    Any other exception still propagates.
    """

    def __init__(self, condition):
        self.condition = condition

    def __call__(self, driver):
        try:
            outcome = self.condition(driver)
        except NoSuchElementException:
            outcome = False
        return outcome
class AnyCondition(CustomCondition):
    """Condition met as soon as one of the given conditions is met.

    Each sub-condition is wrapped in :any:`WrapException`, so a missing
    element counts as "not met". Evaluation stops at the first match.
    """

    def __init__(self, *conditions):
        wrapped = [WrapException(cond) for cond in conditions]
        self.conditions = tuple(wrapped)

    def __call__(self, driver):
        for cond in self.conditions:
            if cond(driver):
                return True
        return False
class AllCondition(CustomCondition):
    """Condition met only when every given condition is met.

    Each sub-condition is wrapped in :any:`WrapException`, so a missing
    element counts as "not met". Evaluation stops at the first failure.
    """

    def __init__(self, *conditions):
        wrapped = [WrapException(cond) for cond in conditions]
        self.conditions = tuple(wrapped)

    def __call__(self, driver):
        for cond in self.conditions:
            if not cond(driver):
                return False
        return True
class NotCondition(CustomCondition):
    """Condition met when the wrapped condition is *not* met.

    The wrapped condition goes through :any:`WrapException`, so a missing
    element makes this condition true.
    """

    def __init__(self, condition):
        self.condition = WrapException(condition)

    def __call__(self, driver):
        if self.condition(driver):
            return False
        return True
class IsHereCondition(CustomCondition):
    """Condition met when the page attached to a URL object "is here".

    Meant to be given to `SeleniumBrowser.wait_until`.
    It mustn't be used in a `SeleniumPage.is_here` definition.
    """

    def __init__(self, urlobj):
        assert isinstance(urlobj, URL)
        self.urlobj = urlobj

    def __call__(self, driver):
        # The driver is not needed: the URL object does the check itself.
        here = self.urlobj.is_here()
        return here
class WithinFrame(CustomCondition):
    """Evaluate a condition from inside a frame.

    Selenium keeps frames isolated from each other and from the main page,
    so the driver has to switch to the frame before the wrapped condition is
    called. The driver is switched back to the main content afterwards,
    whatever the outcome. A frame that cannot be found means "not met".
    """

    def __init__(self, selector, condition):
        self.selector = selector
        self.condition = condition

    def __call__(self, driver):
        try:
            driver.switch_to.frame(self.selector)
        except NoSuchFrameException:
            return False
        else:
            try:
                return self.condition(driver)
            finally:
                # Always leave the frame, even if the condition raised.
                driver.switch_to.default_content()
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
class StablePageCondition(CustomCondition):
    """Condition met once the page source stopped changing for a while.

    For each root element id, the time and MD5 of the page source are
    remembered. The condition is met when the same hash has been seen for at
    least `waiting` seconds.

    Warning: this condition will not work if a site has a carousel or something
    like this that constantly changes the DOM.
    """

    # Entries older than purge_times * waiting seconds are forgotten.
    purge_times = 10

    def __init__(self, waiting=3):
        self.elements = OrderedDict()
        self.waiting = waiting

    def _purge(self):
        """Drop remembered pages that were first seen too long ago."""
        max_age = self.purge_times * self.waiting
        now = time.time()
        stale = [
            key for key, entry in self.elements.items()
            if now - entry[0] > max_age
        ]
        for key in stale:
            del self.elements[key]

    def __call__(self, driver):
        self._purge()

        digest = hashlib.md5(driver.page_source.encode('utf-8')).hexdigest()
        now = time.time()
        page_id = driver.find_element_by_xpath('/*').id

        known = self.elements.get(page_id)
        if known is None or known[1] != digest:
            # New page or changed content: (re)start the stability timer.
            self.elements[page_id] = (now, digest)
            return False
        return now - known[0] >= self.waiting
def VisibleXPath(xpath):
    """Build a condition met when the element at `xpath` is visible.

    Wraps `visibility_of_element_located` in :any:`WrapException`.
    """
    locator = xpath_locator(xpath)
    condition = EC.visibility_of_element_located(locator)
    return WrapException(condition)
def ClickableXPath(xpath):
    """Build a condition met when the element at `xpath` is clickable.

    Wraps `element_to_be_clickable` in :any:`WrapException`.
    """
    locator = xpath_locator(xpath)
    condition = EC.element_to_be_clickable(locator)
    return WrapException(condition)
def ClickableLinkText(text, partial=False):
    """Build a condition met when the link with this text is clickable.

    Wraps `element_to_be_clickable` in :any:`WrapException`.

    :param text: link text to look for
    :param partial: match a part of the link text instead of the whole text
    """
    locator = link_locator(text, partial)
    condition = EC.element_to_be_clickable(locator)
    return WrapException(condition)
def HasTextCondition(xpath):
    """Condition to ensure some xpath is visible and contains non-empty text."""
    # Restrict the match to nodes having a non-blank direct text node.
    with_text = '(%s)[normalize-space(text())!=""]' % xpath
    return VisibleXPath(with_text)
def xpath_locator(xpath):
    """Turn an XPath string into a Selenium locator.

    Most Selenium functions don't take XPaths directly but "locators",
    which are (strategy, value) pairs. Locators can be XPath, CSS selectors.
    """
    locator = (By.XPATH, xpath)
    return locator
def link_locator(text, partial=False):
    """Turn a link text into a Selenium locator.

    Most Selenium functions don't take XPaths directly but "locators".

    Warning: if searched text is not directly in <a> but in one of its children,
    some webdrivers might not find the link.

    :param text: text of the link to find
    :param partial: if true, match links containing `text` instead of
                    links whose text is exactly `text`
    """
    strategy = By.PARTIAL_LINK_TEXT if partial else By.LINK_TEXT
    return (strategy, text)
class ElementWrapper(object):
    """Wrapper making a Selenium element resemble an lxml element.

    Some differences:

    - only a subset of lxml's Element class is available
    - cannot access XPath "text()", only Elements

    Any attribute not defined here is looked up on the wrapped Selenium
    element.

    See https://seleniumhq.github.io/selenium/docs/api/py/webdriver_remote/selenium.webdriver.remote.webelement.html
    """

    def __init__(self, wrapped):
        self.wrapped = wrapped

    def xpath(self, xpath):
        """Returns a list of elements matching `xpath`.

        Since it uses `find_elements_by_xpath`, it does not raise
        `NoSuchElementException` or `TimeoutException`.
        """
        return [ElementWrapper(sel) for sel in self.wrapped.find_elements_by_xpath(xpath)]

    def text_content(self):
        """Return the text of the wrapped element."""
        return self.wrapped.text

    @property
    def text(self):
        # Selenium can only fetch text recursively.
        # Could be implemented by injecting JS though.
        raise NotImplementedError()

    def itertext(self):
        """Return a one-item list holding the text of the wrapped element."""
        return [self.wrapped.text]

    def __getattr__(self, attr):
        # __getattr__ is only called when normal lookup fails. If `wrapped`
        # itself is missing (instance built by copy/pickle or __new__, before
        # __init__ ran), delegating through self.wrapped would recurse forever.
        if attr == 'wrapped':
            raise AttributeError(attr)
        return getattr(self.wrapped, attr)

    # `attrib` is a class used as a property getter: reading `el.attrib`
    # instantiates it with the ElementWrapper, giving a dict-like view.
    @property
    class attrib(object):
        """Read-only, dict-like view on the attributes of an element."""

        def __init__(self, el):
            self.el = el

        def __getitem__(self, k):
            v = self.el.get_attribute(k)
            if v is None:
                raise KeyError('Attribute %r was not found' % k)
            return v

        def get(self, k, default=None):
            v = self.el.get_attribute(k)
            if v is None:
                return default
            return v
class SeleniumPage(object):
    """Page class to be used with a SeleniumBrowser.

    Differences with regular weboob Pages:

    - cannot access raw HTML text
    """

    logged = False

    def __init__(self, browser):
        super(SeleniumPage, self).__init__()
        self.params = {}
        self.browser = browser
        self.driver = browser.driver
        # One logger per page class, child of the browser's logger.
        logger_name = self.__class__.__name__.lower()
        self.logger = getLogger(logger_name, browser.logger)

    @property
    def doc(self):
        """Root element of the current document, as an `ElementWrapper`."""
        root = self.browser.driver.find_element_by_xpath('/*')
        return ElementWrapper(root)

    def is_here(self):
        """Tell whether the browser is on this page and the page is ready.

        Implementations should rely on XPath and page content to decide.
        The page must be "ready" for the intended usage: for example, if a
        splash screen covers the page and prevents clicks, it should return
        False.

        `is_here` can be a method or a :any:`CustomCondition` instance.
        """
        return True

    # TODO get_form
class HTMLPage(BaseHTMLPage):
    """HTML page built from a snapshot of the Selenium browser's source.

    A fake response carrying the current URL and page source is handed to
    the regular HTML page constructor.
    """

    ENCODING = 'utf-8'

    def __init__(self, browser):
        url = browser.url
        # Read the source once: the DOM is live, so two reads could yield
        # a `text` and a `content` that do not describe the same document.
        source = browser.page_source
        fake = FakeResponse(
            url=url,
            text=source,
            content=source.encode('utf-8'),
            encoding='utf-8',
        )

        super(HTMLPage, self).__init__(browser, fake, encoding='utf-8')
        self.driver = browser.driver
# Options class to instantiate for each supported driver class
# (looked up by SeleniumBrowser._build_options).
OPTIONS_CLASSES = {
webdriver.Firefox: webdriver.FirefoxOptions,
webdriver.Chrome: webdriver.ChromeOptions,
webdriver.PhantomJS: webdriver.ChromeOptions, # unused, put dummy thing
}
# Default capabilities dict for each supported driver class
# (copied by SeleniumBrowser._build_capabilities before use).
CAPA_CLASSES = {
webdriver.Firefox: DesiredCapabilities.FIREFOX,
webdriver.Chrome: DesiredCapabilities.CHROME,
webdriver.PhantomJS: DesiredCapabilities.PHANTOMJS,
}
class DirFirefoxProfile(FirefoxProfile):
    """Firefox profile stored in a caller-chosen directory.

    When `custom_dir` is empty or None, the parent class picks a temporary
    folder as usual.
    """

    def __init__(self, custom_dir):
        # Must be set first: the parent constructor is expected to call
        # _create_tempfolder() -- TODO confirm against the selenium version used.
        self._weboob_dir = custom_dir
        super(DirFirefoxProfile, self).__init__()

    def _create_tempfolder(self):
        chosen = self._weboob_dir
        if not chosen:
            chosen = super(DirFirefoxProfile, self)._create_tempfolder()
        return chosen
class FakeResponse(object):
    """Minimal stand-in for a response object.

    Every keyword argument becomes an attribute of the instance.
    `page` defaults to None when not given.
    """

    page = None

    def __init__(self, **kwargs):
        for name in kwargs:
            setattr(self, name, kwargs[name])
class SeleniumBrowser(object):
"""Browser similar to PagesBrowser, but using Selenium.
URLs instances can be used. The need_login decorator can be used too.
Differences:
- since JS code can be run anytime, the current `url` and `page` can change anytime
- it's not possible to use `open()`, only `location()` can be used
- many options are not implemented yet (like proxies) or cannot be implemented at all
"""
# Must be a key of OPTIONS_CLASSES and CAPA_CLASSES (see _build_options
# and _build_capabilities).
DRIVER = webdriver.Firefox
"""Selenium driver class"""
HEADLESS = True
"""Run without any display"""
# Used by wait_until() when no explicit timeout is given.
DEFAULT_WAIT = 10
"""Default wait time for `wait_*` methods"""
# When set, it is unpacked into driver.set_window_size(*WINDOW_SIZE).
WINDOW_SIZE = None
"""Rendering window size
It can be useful for responsive websites which show or hide elements depending
on the viewport size.
"""
# Total size in bytes of files in responses_dirname above which
# save_response() stops saving; None disables the check.
MAX_SAVED_RESPONSES = (1 << 30) # limit to 1GiB
def __init__(self, logger=None, proxy=None, responses_dirname=None, weboob=None):
    """Start the driver and bind the class-level URL objects to this browser.

    :param logger: parent logger
    :param proxy: dict of proxies, keyed by 'http' and/or 'https'
    :param responses_dirname: directory where page sources are saved
    :param weboob: stored as-is in `self.weboob`
    """
    super(SeleniumBrowser, self).__init__()
    self.responses_dirname = responses_dirname
    self.responses_count = 0
    self.weboob = weboob
    self.logger = getLogger('browser', logger)
    self.proxy = proxy or {}

    self.implicit_timeout = 0
    self.last_page_hash = None

    self._setup_driver()

    # Each URL declared on the class is copied so that the copy can point
    # back to this very browser instance.
    self._urls = []
    klass = type(self)
    for name in dir(klass):
        candidate = getattr(klass, name)
        if not isinstance(candidate, URL):
            continue
        bound = deepcopy(candidate)
        bound.browser = self
        setattr(self, name, bound)
        self._urls.append(bound)
    # Keep URLs in declaration order, which is the matching order in `page`.
    self._urls.sort(key=lambda u: u._creation_counter)
def _build_options(self):
    """Create a fresh options object for the configured `DRIVER`."""
    options_class = OPTIONS_CLASSES[self.DRIVER]
    return options_class()
def _build_capabilities(self):
    """Return a private copy of the default capabilities of `DRIVER`."""
    defaults = CAPA_CLASSES[self.DRIVER]
    return defaults.copy()
def _setup_driver(self):
    """Instantiate `self.driver` according to `DRIVER` and the settings.

    Firefox, Chrome and PhantomJS each need a different constructor call.
    When `responses_dirname` is set, it is created if needed and used as
    Firefox profile directory or as location of the PhantomJS log.
    """
    proxy = Proxy()
    proxy.proxy_type = ProxyType.DIRECT
    if 'http' in self.proxy:
        proxy.http_proxy = self.proxy['http']
    if 'https' in self.proxy:
        proxy.ssl_proxy = self.proxy['https']
    # NOTE(review): `proxy` is configured but nothing in this method attaches
    # it to the capabilities or options -- confirm whether a call such as
    # proxy.add_to_capabilities(capa) was lost.

    capa = self._build_capabilities()
    options = self._build_options()
    # TODO some browsers don't need headless
    # TODO handle different proxy setting?
    options.set_headless(self.HEADLESS)

    if self.DRIVER is webdriver.Firefox:
        if self.responses_dirname and not os.path.isdir(self.responses_dirname):
            os.makedirs(self.responses_dirname)

        options.profile = DirFirefoxProfile(self.responses_dirname)
        if self.responses_dirname:
            capa['profile'] = self.responses_dirname
        self.driver = self.DRIVER(options=options, capabilities=capa)
    elif self.DRIVER is webdriver.Chrome:
        self.driver = self.DRIVER(options=options, desired_capabilities=capa)
    elif self.DRIVER is webdriver.PhantomJS:
        if self.responses_dirname:
            if not os.path.isdir(self.responses_dirname):
                os.makedirs(self.responses_dirname)
            log_path = os.path.join(self.responses_dirname, 'selenium.log')
        else:
            # delete=False: only the name is used, the driver writes the log.
            log_path = NamedTemporaryFile(prefix='weboob_selenium_', suffix='.log', delete=False).name

        self.driver = self.DRIVER(desired_capabilities=capa, service_log_path=log_path)

    if self.WINDOW_SIZE:
        self.driver.set_window_size(*self.WINDOW_SIZE)
### Browser
def deinit(self):
    """Quit the Selenium driver, if there is one."""
    driver = self.driver
    if not driver:
        return
    driver.quit()
@property
def url(self):
    """URL currently loaded in the driver."""
    current = self.driver.current_url
    return current
@property
def page(self):
    """Page object handling the current URL, or None if there is none.

    URL objects are tried in declaration order. For each one matching the
    current URL, its page class is instantiated and `is_here` is evaluated
    with the implicit wait disabled: either as a :any:`CustomCondition`
    (called with the driver) or as a plain method. The first page that is
    here is returned, after its source was saved (if it changed) and its
    optional `on_load` hook was run.
    """
    def do_on_load(page):
        if hasattr(page, 'on_load'):
            page.on_load()

    for val in self._urls:
        if not val.match(self.url):
            continue

        page = val.klass(self)
        with self.implicit_wait(0):
            try:
                if isinstance(page.is_here, CustomCondition):
                    here = page.is_here(self.driver)
                else:
                    here = page.is_here()

                if here:
                    self.logger.debug('Handle %s with %s', self.url, type(page).__name__)
                    self.save_response_if_changed()
                    do_on_load(page)
                    return page
            except NoSuchElementException:
                # An element needed by the check is missing: not this page.
                pass

    self.logger.debug('Unable to handle %s', self.url)
def open(self, *args, **kwargs):
    """Not supported by this browser; always raises NotImplementedError."""
    # TODO maybe implement with a new window?
    raise NotImplementedError()
def location(self, url, data=None, headers=None, params=None, method=None, json=None):
    """Change current url of the browser.

    Warning: unlike other requests-based weboob browsers, this function does not block
    until the page is loaded, it's completely asynchronous.
    To use the new page content, it's necessary to wait, either implicitly (e.g. with
    context manager :any:`implicit_wait`) or explicitly (e.g. using method
    :any:`wait_until`)

    Only `url` is supported: the other parameters exist for signature
    compatibility and must be left unset.

    :return: a `FakeResponse` whose `page` is the current page
    """
    for unsupported in (method, params, data, json):
        assert unsupported is None
    assert not headers

    self.logger.debug('opening %r', url)
    self.driver.get(url)

    waiter = WebDriverWait(self.driver, 1)
    try:
        # Give the url at most one second to change; not changing is fine.
        waiter.until(EC.url_changes(self.url))
    except TimeoutException:
        pass

    current_page = self.page
    return FakeResponse(page=current_page)
def export_session(self):
    """Export the current URL and the cookies of the driver.

    Cookies are copies of the driver's ones in which key `expiry` is
    replaced by `expirationDate` (None when the cookie has no expiry).

    :return: dict with keys 'url' and 'cookies'
    """
    cookies = []
    for raw in self.driver.get_cookies():
        cookie = raw.copy()
        cookie['expirationDate'] = cookie.pop('expiry', None)
        cookies.append(cookie)

    return {
        'url': self.url,
        'cookies': cookies,
    }
def save_response_if_changed(self):
    """Save the page source unless it is identical to the last one seen.

    Sources are compared through their MD5 digest, remembered in
    `self.last_page_hash`.
    """
    digest = hashlib.md5(self.driver.page_source.encode('utf-8')).hexdigest()
    if digest == self.last_page_hash:
        return
    self.save_response()
    self.last_page_hash = digest
def save_response(self):
    """Write the current page source to a numbered file.

    Nothing is done when `responses_dirname` is not set. The directory is
    created if needed. When the files it contains already weigh
    `MAX_SAVED_RESPONSES` bytes or more, nothing is saved.
    """
    dirname = self.responses_dirname
    if not dirname:
        return

    if not os.path.isdir(dirname):
        os.makedirs(dirname)

    used = sum(os.path.getsize(f) for f in glob('%s/*' % dirname))
    quota = self.MAX_SAVED_RESPONSES
    if quota is not None and used >= quota:
        self.logger.info('quota reached, not saving responses')
        return

    self.responses_count += 1
    path = '%s/%02d.html' % (dirname, self.responses_count)
    with codecs.open(path, 'w', encoding='utf-8') as fd:
        fd.write(self.driver.page_source)
    self.logger.info('Response saved to %s', path)
def absurl(self, uri, base=None):
    """Make `uri` absolute.

    :param uri: URI to resolve, possibly relative
    :param base: base URL; when empty, the current url is used; when the
                 result is None, or when `base` is True, `BASEURL` is used
    """
    # FIXME this is copy-pasta from DomainBrowser
    effective = base if base else self.url
    if effective is None or effective is True:
        effective = self.BASEURL
    return urljoin(effective, uri)
### a few selenium wrappers
def wait_xpath(self, xpath, timeout=None):
    """Wait until an element matching `xpath` is present (see `wait_until`)."""
    condition = EC.presence_of_element_located(xpath_locator(xpath))
    self.wait_until(condition, timeout)
def wait_xpath_visible(self, xpath, timeout=None):
    """Wait until an element matching `xpath` is visible (see `wait_until`)."""
    condition = EC.visibility_of_element_located(xpath_locator(xpath))
    self.wait_until(condition, timeout)
def wait_xpath_clickable(self, xpath, timeout=None):
    """Wait until an element matching `xpath` is clickable (see `wait_until`)."""
    condition = EC.element_to_be_clickable(xpath_locator(xpath))
    self.wait_until(condition, timeout)
def wait_until_is_here(self, urlobj, timeout=None):
    """Wait until the page of URL object `urlobj` is here (see `wait_until`)."""
    condition = IsHereCondition(urlobj)
    self.wait_until(condition, timeout)
def wait_until(self, condition, timeout=None):
    """Wait until some condition object is met

    Wraps WebDriverWait.
    See https://seleniumhq.github.io/selenium/docs/api/py/webdriver_support/selenium.webdriver.support.wait.html

    See :any:`CustomCondition`.

    On failure, a screenshot (when `responses_dirname` is set) and the page
    source are saved before the exception is re-raised.

    :param timeout: wait time in seconds (else DEFAULT_WAIT if None)
    """
    effective = self.DEFAULT_WAIT if timeout is None else timeout
    waiter = WebDriverWait(self.driver, effective)

    try:
        waiter.until(condition)
    except (NoSuchElementException, TimeoutException):
        dirname = self.responses_dirname
        if dirname:
            screenshot = '%s/%02d.png' % (dirname, self.responses_count)
            self.driver.get_screenshot_as_file(screenshot)
        self.save_response()
        raise
def implicitly_wait(self, timeout):
    """Set implicit wait time

    When something is looked up in the DOM (e.g. an XPath is evaluated) and
    is not found, Selenium blocks and retries until it is found or until
    the implicit wait time is elapsed.

    By default, it is 0, so if an XPath is not found, it fails immediately.

    :param timeout: new implicit wait time in seconds
    """
    # Remembered so that `implicit_wait` can restore it later.
    self.implicit_timeout = timeout
    driver = self.driver
    driver.implicitly_wait(timeout)
@contextmanager
def implicit_wait(self, timeout):
    """Context manager to change implicit wait time and restore it

    The new value is recorded in `self.implicit_timeout` for the duration
    of the block, so that nested uses restore the value of the enclosing
    block instead of the value that was set before it.

    Example::

        with browser.implicit_wait(10):
            # Within this block, the implicit wait will be set to 10 seconds
            # and be restored at the end of block.
            # If the link is not found immediately, it will be periodically
            # retried until found (for max 10 seconds).
            el = self.find_element_link_text("Show list")
            el.click()

    :param timeout: implicit wait time in seconds inside the block
    """
    previous = self.implicit_timeout
    self.implicit_timeout = timeout
    try:
        self.driver.implicitly_wait(timeout)
        yield
    finally:
        self.implicit_timeout = previous
        self.driver.implicitly_wait(previous)
@contextmanager
def in_frame(self, selector):
    """Context manager to run a block inside a frame, then go back to main page.

    In selenium, the driver has to switch to a frame before operating on its
    content, and to switch back to the main page afterwards. The switch back
    happens even if the block raises.

    :param selector: selector to match the frame

    Example::

        with self.in_frame(xpath_locator('//frame[@id="foo"]')):
            el = self.find_element_by_xpath('//a[@id="bar"]')
            el.click()
    """
    driver = self.driver
    driver.switch_to.frame(selector)
    try:
        yield
    finally:
        driver.switch_to.default_content()
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
def get_storage(self):
    """Get localStorage content for current domain.

    As for cookies, only data of the current domain can be reached: there is
    no way to get the whole localStorage at once. To collect localStorage of
    several domains, the browser must go to a url of each domain and
    get_storage must be called after each change.

    To do so, it's wise to choose a neutral URL (like an image file or JS file)
    to avoid the target page itself changing the cookies.

    :return: dict mapping each storage key to its value
    """
    keys = self.driver.execute(Command.GET_LOCAL_STORAGE_KEYS)['value']

    storage = {}
    for key in keys:
        item = self.driver.execute(Command.GET_LOCAL_STORAGE_ITEM, {'key': key})
        storage[key] = item['value']
    return storage
def update_storage(self, d):
    """Update local storage content for current domain.

    It has the same restrictions as `get_storage`.

    :param d: dict of keys and values to set
    """
    for key in d:
        payload = {'key': key, 'value': d[key]}
        self.driver.execute(Command.SET_LOCAL_STORAGE_ITEM, payload)
def clear_storage(self):
    """Clear local storage."""
    driver = self.driver
    driver.execute(Command.CLEAR_LOCAL_STORAGE)
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
class SubSeleniumMixin(object):
    """Mixin to have a Selenium browser for performing login."""

    SELENIUM_BROWSER = None
    """Class of Selenium browser to use for the login"""

    __states__ = ('selenium_state',)

    # State dumped by the sub-browser after the last login attempt.
    selenium_state = None

    def create_selenium_browser(self):
        """Instantiate `SELENIUM_BROWSER` with the settings of this browser.

        Its responses go to a `selenium` subdirectory of this browser's
        responses directory, when there is one.
        """
        base = self.responses_dirname
        dirname = base + '/selenium' if base else base

        return self.SELENIUM_BROWSER(self.config, logger=self.logger, responses_dirname=dirname, proxy=self.PROXIES)

    def do_login(self):
        """Log in with a Selenium sub-browser, then import its session.

        The sub-browser state is reloaded before login and dumped afterwards
        when the sub-browser supports it. The sub-browser is always closed.
        """
        sub_browser = self.create_selenium_browser()
        try:
            can_load = hasattr(sub_browser, 'load_state')
            if self.selenium_state and can_load:
                sub_browser.load_state(self.selenium_state)
            sub_browser.do_login()
            self.load_selenium_session(sub_browser)
        finally:
            # Dump the state even after a failed login, and close the
            # sub-browser even if dumping fails.
            try:
                if hasattr(sub_browser, 'dump_state'):
                    self.selenium_state = sub_browser.dump_state()
            finally:
                sub_browser.deinit()

    def load_selenium_session(self, selenium):
        """Copy cookies of the Selenium browser into this browser's session.

        If this browser has a `locate_browser` method, it is then called
        with the exported session.
        """
        exported = selenium.export_session()
        jar = self.session.cookies
        for cookie in exported['cookies']:
            jar.set(cookie['name'], cookie['value'], domain=cookie['domain'])

        if hasattr(self, 'locate_browser'):
            self.locate_browser(exported)