Skip to content
GitLab
Menu
Why GitLab
Pricing
Contact Sales
Explore
Why GitLab
Pricing
Contact Sales
Explore
Sign in
Get free trial
woob
woob
Compare revisions
5e0e2e824b0aca3a36d69417a81bdc461cb3274d to 9d1ab5affb8f54064101c47a630567a8566de406
Commits on Source (2)
stable_backport_data: add more stuff
· e03ab338
hydrargyrum
authored
Jan 30, 2020
e03ab338
backport master modules fixes
· 9d1ab5af
hydrargyrum
authored
Jan 30, 2020
9d1ab5af
Hide whitespace changes
Inline
Side-by-side
modules/750g/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/adecco/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/afer/compat/weboob_capabilities_bank.py
View file @
9d1ab5af
import
weboob.capabilities.bank
as
OLD
from
weboob.capabilities.base
import
StringField
from
weboob.capabilities.base
import
StringField
,
DecimalField
from
weboob.capabilities.date
import
DateField
# can't import *, __all__ is incomplete...
...
...
@@ -17,7 +17,13 @@
# can't create a subclass because of CapBank.iter_resources reimplementations:
# modules will import our subclass, but boobank will call iter_resources with the OLD class
Account
.
_fields
[
'
ownership
'
]
=
StringField
(
'
Relationship between the credentials owner (PSU) and the account
'
)
Account
.
_fields
[
'
company_name
'
]
=
StringField
(
'
Name of the company of the stock - only for employee savings
'
)
Loan
.
_fields
[
'
ownership
'
]
=
StringField
(
'
Relationship between the credentials owner (PSU) and the account
'
)
Transaction
.
_fields
[
'
gross_amount
'
]
=
DecimalField
(
'
Amount of the transaction without the commission
'
)
Transaction
.
_fields
[
'
original_commission
'
]
=
DecimalField
(
'
Original commission (in another currency)
'
)
Transaction
.
_fields
[
'
original_commission_currency
'
]
=
StringField
(
'
Currency of the original commission
'
)
Transaction
.
_fields
[
'
original_gross_amount
'
]
=
DecimalField
(
'
Original gross amount (in another currency)
'
)
Transaction
.
_fields
[
'
bdate
'
]
=
DateField
(
'
Bank date, when the transaction appear on website (usually extracted from column date)
'
)
...
...
@@ -45,6 +51,17 @@ class AccountOwnership(object):
AccountOwnerType
.
ASSOCIATION
=
u
'
ASSO
'
AccountType
.
TYPE_PER
=
20
Account
.
TYPE_PER
=
20
class
BeneficiaryType
(
object
):
RECIPIENT
=
'
recipient
'
IBAN
=
'
iban
'
PHONE_NUMBER
=
'
phone_number
'
CapBankTransfer
.
accepted_beneficiary_types
=
(
BeneficiaryType
.
RECIPIENT
,)
try
:
__all__
+=
[
'
AccountOwnership
'
,
'
RecipientInvalidOTP
'
,
'
TransferInvalidOTP
'
]
...
...
modules/agendaculturel/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/agendadulibre/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/airparif/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/allocine/browser.py
View file @
9d1ab5af
...
...
@@ -22,7 +22,7 @@
import
time
from
datetime
import
date
,
datetime
,
timedelta
from
weboob
.
browser
.
browsers
import
APIBrowser
from
.compat.
weboob
_
browser
_
browsers
import
APIBrowser
from
weboob.browser.profiles
import
Android
from
weboob.capabilities.base
import
NotAvailable
,
NotLoaded
,
find_object
from
weboob.capabilities.calendar
import
CATEGORIES
,
STATUS
,
TRANSP
,
BaseCalendarEvent
...
...
modules/allocine/compat/__init__.py
0 → 100644
View file @
9d1ab5af
modules/allocine/compat/weboob_browser_browsers.py
0 → 100644
View file @
9d1ab5af
import
weboob.browser.browsers
as
OLD
# can't import *, __all__ is incomplete...
for
attr
in
dir
(
OLD
):
globals
()[
attr
]
=
getattr
(
OLD
,
attr
)
try
:
__all__
=
OLD
.
__all__
except
AttributeError
:
pass
class
OAuth2Mixin
(
StatesMixin
):
AUTHORIZATION_URI
=
None
ACCESS_TOKEN_URI
=
None
SCOPE
=
''
client_id
=
None
client_secret
=
None
redirect_uri
=
None
access_token
=
None
access_token_expire
=
None
auth_uri
=
None
token_type
=
None
refresh_token
=
None
oauth_state
=
None
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
(
OAuth2Mixin
,
self
).
__init__
(
*
args
,
**
kwargs
)
self
.
__states__
+=
(
'
access_token
'
,
'
access_token_expire
'
,
'
refresh_token
'
,
'
token_type
'
)
def
build_request
(
self
,
*
args
,
**
kwargs
):
headers
=
kwargs
.
setdefault
(
'
headers
'
,
{})
if
self
.
access_token
:
headers
[
'
Authorization
'
]
=
'
{} {}
'
.
format
(
self
.
token_type
,
self
.
access_token
)
return
super
(
OAuth2Mixin
,
self
).
build_request
(
*
args
,
**
kwargs
)
def
dump_state
(
self
):
self
.
access_token_expire
=
unicode
(
self
.
access_token_expire
)
if
self
.
access_token_expire
else
None
return
super
(
OAuth2Mixin
,
self
).
dump_state
()
def
load_state
(
self
,
state
):
super
(
OAuth2Mixin
,
self
).
load_state
(
state
)
self
.
access_token_expire
=
parser
.
parse
(
self
.
access_token_expire
)
if
self
.
access_token_expire
else
None
def
raise_for_status
(
self
,
response
):
if
response
.
status_code
==
401
:
self
.
access_token
=
None
return
super
(
OAuth2Mixin
,
self
).
raise_for_status
(
response
)
@property
def
logged
(
self
):
return
self
.
access_token
is
not
None
and
(
not
self
.
access_token_expire
or
self
.
access_token_expire
>
datetime
.
now
())
def
do_login
(
self
):
if
self
.
refresh_token
:
self
.
use_refresh_token
()
elif
self
.
auth_uri
:
self
.
request_access_token
(
self
.
auth_uri
)
else
:
self
.
request_authorization
()
def
build_authorization_parameters
(
self
):
params
=
{
'
redirect_uri
'
:
self
.
redirect_uri
,
'
scope
'
:
self
.
SCOPE
,
'
client_id
'
:
self
.
client_id
,
'
response_type
'
:
'
code
'
,
}
if
self
.
oauth_state
:
params
[
'
state
'
]
=
self
.
oauth_state
return
params
def
build_authorization_uri
(
self
):
p
=
urlparse
(
self
.
AUTHORIZATION_URI
)
q
=
dict
(
parse_qsl
(
p
.
query
))
q
.
update
(
self
.
build_authorization_parameters
())
return
p
.
_replace
(
query
=
urlencode
(
q
)).
geturl
()
def
request_authorization
(
self
):
self
.
logger
.
info
(
'
request authorization
'
)
raise
BrowserRedirect
(
self
.
build_authorization_uri
())
def
handle_callback_error
(
self
,
values
):
# Here we try to catch callback errors occurring during enrollment
# Ideally overload this method in each module to catch specific error
assert
values
.
get
(
'
code
'
),
"
No
'
code
'
was found into the callback url, please raise the right error: %s
"
%
values
def
build_access_token_parameters
(
self
,
values
):
return
{
'
code
'
:
values
[
'
code
'
],
'
grant_type
'
:
'
authorization_code
'
,
'
redirect_uri
'
:
self
.
redirect_uri
,
'
client_id
'
:
self
.
client_id
,
'
client_secret
'
:
self
.
client_secret
,
}
def
do_token_request
(
self
,
data
):
return
self
.
open
(
self
.
ACCESS_TOKEN_URI
,
data
=
data
)
def
request_access_token
(
self
,
auth_uri
):
self
.
logger
.
info
(
'
requesting access token
'
)
if
isinstance
(
auth_uri
,
dict
):
values
=
auth_uri
else
:
values
=
dict
(
parse_qsl
(
urlparse
(
auth_uri
).
query
))
self
.
handle_callback_error
(
values
)
data
=
self
.
build_access_token_parameters
(
values
)
try
:
auth_response
=
self
.
do_token_request
(
data
).
json
()
except
ClientError
:
raise
BrowserIncorrectPassword
()
self
.
update_token
(
auth_response
)
def
build_refresh_token_parameters
(
self
):
return
{
'
grant_type
'
:
'
refresh_token
'
,
'
refresh_token
'
:
self
.
refresh_token
,
'
client_id
'
:
self
.
client_id
,
'
client_secret
'
:
self
.
client_secret
,
'
redirect_uri
'
:
self
.
redirect_uri
,
}
def
use_refresh_token
(
self
):
self
.
logger
.
info
(
'
refreshing token
'
)
data
=
self
.
build_refresh_token_parameters
()
try
:
auth_response
=
self
.
do_token_request
(
data
).
json
()
except
ClientError
:
self
.
refresh_token
=
None
raise
BrowserIncorrectPassword
()
self
.
update_token
(
auth_response
)
def
update_token
(
self
,
auth_response
):
self
.
token_type
=
auth_response
.
get
(
'
token_type
'
,
'
Bearer
'
).
capitalize
()
# don't know yet if this is a good idea, but required by bnpstet
if
'
refresh_token
'
in
auth_response
:
self
.
refresh_token
=
auth_response
[
'
refresh_token
'
]
self
.
access_token
=
auth_response
[
'
access_token
'
]
self
.
access_token_expire
=
datetime
.
now
()
+
timedelta
(
seconds
=
int
(
auth_response
[
'
expires_in
'
]))
class
OAuth2PKCEMixin
(
OAuth2Mixin
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
(
OAuth2PKCEMixin
,
self
).
__init__
(
*
args
,
**
kwargs
)
self
.
__states__
+=
(
'
pkce_verifier
'
,
'
pkce_challenge
'
)
self
.
pkce_verifier
=
self
.
code_verifier
()
self
.
pkce_challenge
=
self
.
code_challenge
(
self
.
pkce_verifier
)
# PKCE (Proof Key for Code Exchange) standard protocol methods:
def
code_verifier
(
self
,
bytes_number
=
64
):
return
base64
.
urlsafe_b64encode
(
os
.
urandom
(
bytes_number
)).
rstrip
(
b
'
=
'
).
decode
(
'
ascii
'
)
def
code_challenge
(
self
,
verifier
):
digest
=
sha256
(
verifier
.
encode
(
'
utf8
'
)).
digest
()
return
base64
.
urlsafe_b64encode
(
digest
).
rstrip
(
b
'
=
'
).
decode
(
'
ascii
'
)
def
build_authorization_parameters
(
self
):
params
=
{
'
redirect_uri
'
:
self
.
redirect_uri
,
'
code_challenge_method
'
:
'
S256
'
,
'
code_challenge
'
:
self
.
pkce_challenge
,
'
client_id
'
:
self
.
client_id
,
}
if
self
.
oauth_state
:
params
[
'
state
'
]
=
self
.
oauth_state
return
params
def
build_access_token_parameters
(
self
,
values
):
return
{
'
code
'
:
values
[
'
code
'
],
'
grant_type
'
:
'
authorization_code
'
,
'
code_verifier
'
:
self
.
pkce_verifier
,
'
redirect_uri
'
:
self
.
redirect_uri
,
'
client_id
'
:
self
.
client_id
,
'
client_secret
'
:
self
.
client_secret
,
}
class
TwoFactorBrowser
(
LoginBrowser
,
StatesMixin
):
# period to keep the same state
# it is different from STATE_DURATION which updates the expire date at each dump
TWOFA_DURATION
=
None
INTERACTIVE_NAME
=
'
request_information
'
# dict of config keys and methods used for double authentication
# must be set up in the init to handle function pointers
AUTHENTICATION_METHODS
=
{}
# list of cookie keys to clear before dumping state
COOKIES_TO_CLEAR
=
()
# login can also be done with credentials without 2FA
HAS_CREDENTIALS_ONLY
=
False
def
__init__
(
self
,
config
,
*
args
,
**
kwargs
):
super
(
TwoFactorBrowser
,
self
).
__init__
(
*
args
,
**
kwargs
)
self
.
config
=
config
self
.
is_interactive
=
config
.
get
(
self
.
INTERACTIVE_NAME
,
Value
()).
get
()
is
not
None
self
.
twofa_logged_date
=
None
self
.
__states__
+=
(
'
twofa_logged_date
'
,)
def
get_expire
(
self
):
expires_dates
=
[
datetime
.
now
()
+
timedelta
(
minutes
=
self
.
STATE_DURATION
)]
if
getattr
(
self
,
'
twofa_logged_date
'
,
None
)
and
self
.
TWOFA_DURATION
is
not
None
:
expires_dates
.
append
(
self
.
twofa_logged_date
+
timedelta
(
minutes
=
self
.
TWOFA_DURATION
))
return
unicode
(
max
(
expires_dates
).
replace
(
microsecond
=
0
))
def
dump_state
(
self
):
self
.
clear_not_2fa_cookies
()
# so the date can be parsed in json
# because twofa_logged_date is in state
self
.
twofa_logged_date
=
str
(
self
.
twofa_logged_date
)
return
super
(
TwoFactorBrowser
,
self
).
dump_state
()
def
init_login
(
self
):
"""
Abstract method to implement initiation of login on website.
This method should raise an exception.
SCA exceptions :
- AppValidation for polling method
- BrowserQuestion for SMS method, token method etc.
Any other exceptions, default to BrowserIncorrectPassword.
"""
raise
NotImplementedError
()
def
clear_init_cookies
(
self
):
# clear cookies to avoid some errors
self
.
session
.
cookies
.
clear
()
def
clear_not_2fa_cookies
(
self
):
# clear cookies that we don't need for 2FA
for
cookie_key
in
self
.
COOKIES_TO_CLEAR
:
if
cookie_key
in
self
.
session
.
cookies
:
del
self
.
session
.
cookies
[
cookie_key
]
def
check_interactive
(
self
):
if
not
self
.
is_interactive
:
raise
NeedInteractiveFor2FA
()
def
do_double_authentication
(
self
):
"""
This method will check AUTHENTICATION_METHODS
to dispatch to the right handle_* method.
If no backend configuration could be found,
it will then call init_login method.
"""
assert
self
.
AUTHENTICATION_METHODS
,
'
There is no config for the double authentication.
'
self
.
twofa_logged_date
=
None
for
config_key
,
handle_method
in
self
.
AUTHENTICATION_METHODS
.
items
():
setattr
(
self
,
config_key
,
self
.
config
.
get
(
config_key
,
Value
()).
get
())
if
getattr
(
self
,
config_key
):
handle_method
()
self
.
twofa_logged_date
=
datetime
.
now
()
# cleaning authentication config keys
for
config_key
in
self
.
AUTHENTICATION_METHODS
.
keys
():
if
config_key
in
self
.
config
:
self
.
config
[
config_key
]
=
self
.
config
[
config_key
].
default
break
else
:
if
not
self
.
HAS_CREDENTIALS_ONLY
:
self
.
check_interactive
()
self
.
clear_init_cookies
()
self
.
init_login
()
do_login
=
do_double_authentication
modules/allrecipes/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/amazon/browser.py
View file @
9d1ab5af
...
...
@@ -26,7 +26,7 @@
WrongCaptchaResponse
)
from
weboob.tools.value
import
Value
from
weboob
.
browser
.
browsers
import
ClientError
from
.compat.
weboob
_
browser
_
browsers
import
ClientError
from
.pages
import
(
LoginPage
,
SubscriptionsPage
,
DocumentsPage
,
DownloadDocumentPage
,
HomePage
,
PanelPage
,
SecurityPage
,
...
...
modules/amazon/compat/weboob_browser_browsers.py
0 → 100644
View file @
9d1ab5af
import
weboob.browser.browsers
as
OLD
# can't import *, __all__ is incomplete...
for
attr
in
dir
(
OLD
):
globals
()[
attr
]
=
getattr
(
OLD
,
attr
)
try
:
__all__
=
OLD
.
__all__
except
AttributeError
:
pass
class
OAuth2Mixin
(
StatesMixin
):
AUTHORIZATION_URI
=
None
ACCESS_TOKEN_URI
=
None
SCOPE
=
''
client_id
=
None
client_secret
=
None
redirect_uri
=
None
access_token
=
None
access_token_expire
=
None
auth_uri
=
None
token_type
=
None
refresh_token
=
None
oauth_state
=
None
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
(
OAuth2Mixin
,
self
).
__init__
(
*
args
,
**
kwargs
)
self
.
__states__
+=
(
'
access_token
'
,
'
access_token_expire
'
,
'
refresh_token
'
,
'
token_type
'
)
def
build_request
(
self
,
*
args
,
**
kwargs
):
headers
=
kwargs
.
setdefault
(
'
headers
'
,
{})
if
self
.
access_token
:
headers
[
'
Authorization
'
]
=
'
{} {}
'
.
format
(
self
.
token_type
,
self
.
access_token
)
return
super
(
OAuth2Mixin
,
self
).
build_request
(
*
args
,
**
kwargs
)
def
dump_state
(
self
):
self
.
access_token_expire
=
unicode
(
self
.
access_token_expire
)
if
self
.
access_token_expire
else
None
return
super
(
OAuth2Mixin
,
self
).
dump_state
()
def
load_state
(
self
,
state
):
super
(
OAuth2Mixin
,
self
).
load_state
(
state
)
self
.
access_token_expire
=
parser
.
parse
(
self
.
access_token_expire
)
if
self
.
access_token_expire
else
None
def
raise_for_status
(
self
,
response
):
if
response
.
status_code
==
401
:
self
.
access_token
=
None
return
super
(
OAuth2Mixin
,
self
).
raise_for_status
(
response
)
@property
def
logged
(
self
):
return
self
.
access_token
is
not
None
and
(
not
self
.
access_token_expire
or
self
.
access_token_expire
>
datetime
.
now
())
def
do_login
(
self
):
if
self
.
refresh_token
:
self
.
use_refresh_token
()
elif
self
.
auth_uri
:
self
.
request_access_token
(
self
.
auth_uri
)
else
:
self
.
request_authorization
()
def
build_authorization_parameters
(
self
):
params
=
{
'
redirect_uri
'
:
self
.
redirect_uri
,
'
scope
'
:
self
.
SCOPE
,
'
client_id
'
:
self
.
client_id
,
'
response_type
'
:
'
code
'
,
}
if
self
.
oauth_state
:
params
[
'
state
'
]
=
self
.
oauth_state
return
params
def
build_authorization_uri
(
self
):
p
=
urlparse
(
self
.
AUTHORIZATION_URI
)
q
=
dict
(
parse_qsl
(
p
.
query
))
q
.
update
(
self
.
build_authorization_parameters
())
return
p
.
_replace
(
query
=
urlencode
(
q
)).
geturl
()
def
request_authorization
(
self
):
self
.
logger
.
info
(
'
request authorization
'
)
raise
BrowserRedirect
(
self
.
build_authorization_uri
())
def
handle_callback_error
(
self
,
values
):
# Here we try to catch callback errors occurring during enrollment
# Ideally overload this method in each module to catch specific error
assert
values
.
get
(
'
code
'
),
"
No
'
code
'
was found into the callback url, please raise the right error: %s
"
%
values
def
build_access_token_parameters
(
self
,
values
):
return
{
'
code
'
:
values
[
'
code
'
],
'
grant_type
'
:
'
authorization_code
'
,
'
redirect_uri
'
:
self
.
redirect_uri
,
'
client_id
'
:
self
.
client_id
,
'
client_secret
'
:
self
.
client_secret
,
}
def
do_token_request
(
self
,
data
):
return
self
.
open
(
self
.
ACCESS_TOKEN_URI
,
data
=
data
)
def
request_access_token
(
self
,
auth_uri
):
self
.
logger
.
info
(
'
requesting access token
'
)
if
isinstance
(
auth_uri
,
dict
):
values
=
auth_uri
else
:
values
=
dict
(
parse_qsl
(
urlparse
(
auth_uri
).
query
))
self
.
handle_callback_error
(
values
)
data
=
self
.
build_access_token_parameters
(
values
)
try
:
auth_response
=
self
.
do_token_request
(
data
).
json
()
except
ClientError
:
raise
BrowserIncorrectPassword
()
self
.
update_token
(
auth_response
)
def
build_refresh_token_parameters
(
self
):
return
{
'
grant_type
'
:
'
refresh_token
'
,
'
refresh_token
'
:
self
.
refresh_token
,
'
client_id
'
:
self
.
client_id
,
'
client_secret
'
:
self
.
client_secret
,
'
redirect_uri
'
:
self
.
redirect_uri
,
}
def
use_refresh_token
(
self
):
self
.
logger
.
info
(
'
refreshing token
'
)
data
=
self
.
build_refresh_token_parameters
()
try
:
auth_response
=
self
.
do_token_request
(
data
).
json
()
except
ClientError
:
self
.
refresh_token
=
None
raise
BrowserIncorrectPassword
()
self
.
update_token
(
auth_response
)
def
update_token
(
self
,
auth_response
):
self
.
token_type
=
auth_response
.
get
(
'
token_type
'
,
'
Bearer
'
).
capitalize
()
# don't know yet if this is a good idea, but required by bnpstet
if
'
refresh_token
'
in
auth_response
:
self
.
refresh_token
=
auth_response
[
'
refresh_token
'
]
self
.
access_token
=
auth_response
[
'
access_token
'
]
self
.
access_token_expire
=
datetime
.
now
()
+
timedelta
(
seconds
=
int
(
auth_response
[
'
expires_in
'
]))
class
OAuth2PKCEMixin
(
OAuth2Mixin
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
(
OAuth2PKCEMixin
,
self
).
__init__
(
*
args
,
**
kwargs
)
self
.
__states__
+=
(
'
pkce_verifier
'
,
'
pkce_challenge
'
)
self
.
pkce_verifier
=
self
.
code_verifier
()
self
.
pkce_challenge
=
self
.
code_challenge
(
self
.
pkce_verifier
)
# PKCE (Proof Key for Code Exchange) standard protocol methods:
def
code_verifier
(
self
,
bytes_number
=
64
):
return
base64
.
urlsafe_b64encode
(
os
.
urandom
(
bytes_number
)).
rstrip
(
b
'
=
'
).
decode
(
'
ascii
'
)
def
code_challenge
(
self
,
verifier
):
digest
=
sha256
(
verifier
.
encode
(
'
utf8
'
)).
digest
()
return
base64
.
urlsafe_b64encode
(
digest
).
rstrip
(
b
'
=
'
).
decode
(
'
ascii
'
)
def
build_authorization_parameters
(
self
):
params
=
{
'
redirect_uri
'
:
self
.
redirect_uri
,
'
code_challenge_method
'
:
'
S256
'
,
'
code_challenge
'
:
self
.
pkce_challenge
,
'
client_id
'
:
self
.
client_id
,
}
if
self
.
oauth_state
:
params
[
'
state
'
]
=
self
.
oauth_state
return
params
def
build_access_token_parameters
(
self
,
values
):
return
{
'
code
'
:
values
[
'
code
'
],
'
grant_type
'
:
'
authorization_code
'
,
'
code_verifier
'
:
self
.
pkce_verifier
,
'
redirect_uri
'
:
self
.
redirect_uri
,
'
client_id
'
:
self
.
client_id
,
'
client_secret
'
:
self
.
client_secret
,
}
class
TwoFactorBrowser
(
LoginBrowser
,
StatesMixin
):
# period to keep the same state
# it is different from STATE_DURATION which updates the expire date at each dump
TWOFA_DURATION
=
None
INTERACTIVE_NAME
=
'
request_information
'
# dict of config keys and methods used for double authentication
# must be set up in the init to handle function pointers
AUTHENTICATION_METHODS
=
{}
# list of cookie keys to clear before dumping state
COOKIES_TO_CLEAR
=
()
# login can also be done with credentials without 2FA
HAS_CREDENTIALS_ONLY
=
False
def
__init__
(
self
,
config
,
*
args
,
**
kwargs
):
super
(
TwoFactorBrowser
,
self
).
__init__
(
*
args
,
**
kwargs
)
self
.
config
=
config
self
.
is_interactive
=
config
.
get
(
self
.
INTERACTIVE_NAME
,
Value
()).
get
()
is
not
None
self
.
twofa_logged_date
=
None
self
.
__states__
+=
(
'
twofa_logged_date
'
,)
def
get_expire
(
self
):
expires_dates
=
[
datetime
.
now
()
+
timedelta
(
minutes
=
self
.
STATE_DURATION
)]
if
getattr
(
self
,
'
twofa_logged_date
'
,
None
)
and
self
.
TWOFA_DURATION
is
not
None
:
expires_dates
.
append
(
self
.
twofa_logged_date
+
timedelta
(
minutes
=
self
.
TWOFA_DURATION
))
return
unicode
(
max
(
expires_dates
).
replace
(
microsecond
=
0
))
def
dump_state
(
self
):
self
.
clear_not_2fa_cookies
()
# so the date can be parsed in json
# because twofa_logged_date is in state
self
.
twofa_logged_date
=
str
(
self
.
twofa_logged_date
)
return
super
(
TwoFactorBrowser
,
self
).
dump_state
()
def
init_login
(
self
):
"""
Abstract method to implement initiation of login on website.
This method should raise an exception.
SCA exceptions :
- AppValidation for polling method
- BrowserQuestion for SMS method, token method etc.
Any other exceptions, default to BrowserIncorrectPassword.
"""
raise
NotImplementedError
()
def
clear_init_cookies
(
self
):
# clear cookies to avoid some errors
self
.
session
.
cookies
.
clear
()
def
clear_not_2fa_cookies
(
self
):
# clear cookies that we don't need for 2FA
for
cookie_key
in
self
.
COOKIES_TO_CLEAR
:
if
cookie_key
in
self
.
session
.
cookies
:
del
self
.
session
.
cookies
[
cookie_key
]
def
check_interactive
(
self
):
if
not
self
.
is_interactive
:
raise
NeedInteractiveFor2FA
()
def
do_double_authentication
(
self
):
"""
This method will check AUTHENTICATION_METHODS
to dispatch to the right handle_* method.
If no backend configuration could be found,
it will then call init_login method.
"""
assert
self
.
AUTHENTICATION_METHODS
,
'
There is no config for the double authentication.
'
self
.
twofa_logged_date
=
None
for
config_key
,
handle_method
in
self
.
AUTHENTICATION_METHODS
.
items
():
setattr
(
self
,
config_key
,
self
.
config
.
get
(
config_key
,
Value
()).
get
())
if
getattr
(
self
,
config_key
):
handle_method
()
self
.
twofa_logged_date
=
datetime
.
now
()
# cleaning authentication config keys
for
config_key
in
self
.
AUTHENTICATION_METHODS
.
keys
():
if
config_key
in
self
.
config
:
self
.
config
[
config_key
]
=
self
.
config
[
config_key
].
default
break
else
:
if
not
self
.
HAS_CREDENTIALS_ONLY
:
self
.
check_interactive
()
self
.
clear_init_cookies
()
self
.
init_login
()
do_login
=
do_double_authentication
modules/amazon/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/amazon/compat/weboob_exceptions.py
View file @
9d1ab5af
...
...
@@ -38,6 +38,18 @@ class AppValidation(DecoupledValidation):
pass
class
AppValidationError
(
Exception
):
pass
class
AppValidationCancelled
(
AppValidationError
):
pass
class
AppValidationExpired
(
AppValidationError
):
pass
class
NeedInteractive
(
Exception
):
pass
...
...
@@ -55,3 +67,20 @@ class NeedInteractiveFor2FA(NeedInteractive):
"""
pass
class
WrongCaptchaResponse
(
Exception
):
"""
when website tell us captcha response is not good
"""
def
__init__
(
self
,
message
=
None
):
super
(
WrongCaptchaResponse
,
self
).
__init__
(
message
or
"
Captcha response is wrong
"
)
class
RecaptchaV3Question
(
CaptchaQuestion
):
type
=
'
g_recaptcha
'
website_key
=
None
website_url
=
None
action
=
None
def
__init__
(
self
,
website_key
,
website_url
,
action
=
None
):
super
(
RecaptchaV3Question
,
self
).
__init__
(
self
.
type
,
website_key
=
website_key
,
website_url
=
website_url
)
self
.
action
=
action
modules/amazonstorecard/compat/weboob_capabilities_bank.py
View file @
9d1ab5af
import
weboob.capabilities.bank
as
OLD
from
weboob.capabilities.base
import
StringField
from
weboob.capabilities.base
import
StringField
,
DecimalField
from
weboob.capabilities.date
import
DateField
# can't import *, __all__ is incomplete...
...
...
@@ -17,7 +17,13 @@
# can't create a subclass because of CapBank.iter_resources reimplementations:
# modules will import our subclass, but boobank will call iter_resources with the OLD class
Account
.
_fields
[
'
ownership
'
]
=
StringField
(
'
Relationship between the credentials owner (PSU) and the account
'
)
Account
.
_fields
[
'
company_name
'
]
=
StringField
(
'
Name of the company of the stock - only for employee savings
'
)
Loan
.
_fields
[
'
ownership
'
]
=
StringField
(
'
Relationship between the credentials owner (PSU) and the account
'
)
Transaction
.
_fields
[
'
gross_amount
'
]
=
DecimalField
(
'
Amount of the transaction without the commission
'
)
Transaction
.
_fields
[
'
original_commission
'
]
=
DecimalField
(
'
Original commission (in another currency)
'
)
Transaction
.
_fields
[
'
original_commission_currency
'
]
=
StringField
(
'
Currency of the original commission
'
)
Transaction
.
_fields
[
'
original_gross_amount
'
]
=
DecimalField
(
'
Original gross amount (in another currency)
'
)
Transaction
.
_fields
[
'
bdate
'
]
=
DateField
(
'
Bank date, when the transaction appear on website (usually extracted from column date)
'
)
...
...
@@ -45,6 +51,17 @@ class AccountOwnership(object):
AccountOwnerType
.
ASSOCIATION
=
u
'
ASSO
'
AccountType
.
TYPE_PER
=
20
Account
.
TYPE_PER
=
20
class
BeneficiaryType
(
object
):
RECIPIENT
=
'
recipient
'
IBAN
=
'
iban
'
PHONE_NUMBER
=
'
phone_number
'
CapBankTransfer
.
accepted_beneficiary_types
=
(
BeneficiaryType
.
RECIPIENT
,)
try
:
__all__
+=
[
'
AccountOwnership
'
,
'
RecipientInvalidOTP
'
,
'
TransferInvalidOTP
'
]
...
...
modules/amazonstorecard/compat/weboob_exceptions.py
View file @
9d1ab5af
...
...
@@ -38,6 +38,18 @@ class AppValidation(DecoupledValidation):
pass
class
AppValidationError
(
Exception
):
pass
class
AppValidationCancelled
(
AppValidationError
):
pass
class
AppValidationExpired
(
AppValidationError
):
pass
class
NeedInteractive
(
Exception
):
pass
...
...
@@ -55,3 +67,20 @@ class NeedInteractiveFor2FA(NeedInteractive):
"""
pass
class
WrongCaptchaResponse
(
Exception
):
"""
when website tell us captcha response is not good
"""
def
__init__
(
self
,
message
=
None
):
super
(
WrongCaptchaResponse
,
self
).
__init__
(
message
or
"
Captcha response is wrong
"
)
class
RecaptchaV3Question
(
CaptchaQuestion
):
type
=
'
g_recaptcha
'
website_key
=
None
website_url
=
None
action
=
None
def
__init__
(
self
,
website_key
,
website_url
,
action
=
None
):
super
(
RecaptchaV3Question
,
self
).
__init__
(
self
.
type
,
website_key
=
website_key
,
website_url
=
website_url
)
self
.
action
=
action
modules/ameli/compat/weboob_browser_filters_standard.py
View file @
9d1ab5af
...
...
@@ -47,3 +47,6 @@ def filter(self, txt):
return
self
.
map_dict
[
key
]
return
self
.
default_or_raise
(
ItemNotFound
(
'
Unable to handle %r on %r
'
%
(
txt
,
self
.
map_dict
)))
Title
=
Capitalize
modules/ameli/compat/weboob_exceptions.py
View file @
9d1ab5af
...
...
@@ -38,6 +38,18 @@ class AppValidation(DecoupledValidation):
pass
class
AppValidationError
(
Exception
):
pass
class
AppValidationCancelled
(
AppValidationError
):
pass
class
AppValidationExpired
(
AppValidationError
):
pass
class
NeedInteractive
(
Exception
):
pass
...
...
@@ -55,3 +67,20 @@ class NeedInteractiveFor2FA(NeedInteractive):
"""
pass
class
WrongCaptchaResponse
(
Exception
):
"""
when website tell us captcha response is not good
"""
def
__init__
(
self
,
message
=
None
):
super
(
WrongCaptchaResponse
,
self
).
__init__
(
message
or
"
Captcha response is wrong
"
)
class
RecaptchaV3Question
(
CaptchaQuestion
):
type
=
'
g_recaptcha
'
website_key
=
None
website_url
=
None
action
=
None
def
__init__
(
self
,
website_key
,
website_url
,
action
=
None
):
super
(
RecaptchaV3Question
,
self
).
__init__
(
self
.
type
,
website_key
=
website_key
,
website_url
=
website_url
)
self
.
action
=
action
modules/ameli/module.py
View file @
9d1ab5af
...
...
@@ -22,7 +22,7 @@
from
weboob.capabilities.base
import
find_object
from
weboob.tools.backend
import
Module
,
BackendConfig
from
weboob.capabilities.bill
import
CapDocument
,
Document
,
DocumentTypes
,
SubscriptionNotFound
,
DocumentNotFound
from
weboob.capabilities.bill
import
CapDocument
,
Document
,
DocumentTypes
,
SubscriptionNotFound
,
DocumentNotFound
,
Subscription
from
weboob.tools.value
import
ValueBackendPassword
from
.browser
import
AmeliBrowser
...
...
@@ -56,6 +56,9 @@ def get_subscription(self, _id):
return
find_object
(
self
.
iter_subscription
(),
id
=
_id
,
error
=
SubscriptionNotFound
)
def
iter_documents
(
self
,
subscription
):
if
not
isinstance
(
subscription
,
Subscription
):
subscription
=
self
.
get_subscription
(
subscription
)
return
self
.
browser
.
iter_documents
(
subscription
)
def
get_document
(
self
,
_id
):
...
...
modules/amelipro/compat/weboob_exceptions.py
View file @
9d1ab5af
...
...
@@ -38,6 +38,18 @@ class AppValidation(DecoupledValidation):
pass
class
AppValidationError
(
Exception
):
pass
class
AppValidationCancelled
(
AppValidationError
):
pass
class
AppValidationExpired
(
AppValidationError
):
pass
class
NeedInteractive
(
Exception
):
pass
...
...
@@ -55,3 +67,20 @@ class NeedInteractiveFor2FA(NeedInteractive):
"""
pass
class
WrongCaptchaResponse
(
Exception
):
"""
when website tell us captcha response is not good
"""
def
__init__
(
self
,
message
=
None
):
super
(
WrongCaptchaResponse
,
self
).
__init__
(
message
or
"
Captcha response is wrong
"
)
class
RecaptchaV3Question
(
CaptchaQuestion
):
type
=
'
g_recaptcha
'
website_key
=
None
website_url
=
None
action
=
None
def
__init__
(
self
,
website_key
,
website_url
,
action
=
None
):
super
(
RecaptchaV3Question
,
self
).
__init__
(
self
.
type
,
website_key
=
website_key
,
website_url
=
website_url
)
self
.
action
=
action
Prev
1
2
3
4
5
…
33
Next