# -*- coding: utf-8 -*-

# Copyright(C) 2010-2013 Christophe Benz, Romain Bignon, Julien Hebert
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.

import re
import sys
import warnings
from collections import OrderedDict
from copy import deepcopy, copy
from decimal import Decimal

from weboob.tools.compat import unicode, long, with_metaclass, StrConv
from weboob.tools.misc import to_unicode

# Names exported by a star-import of this module.
__all__ = [
    'UserError', 'FieldNotFound', 'NotAvailable', 'FetchError',
    'NotLoaded', 'Capability', 'Field', 'IntField', 'DecimalField',
    'FloatField', 'StringField', 'BytesField', 'BoolField',
    'empty', 'BaseObject',
]
class EnumMeta(type):
    """
    Metaclass recording the plain attributes of a class as enum members.

    Every attribute declared in the class body which is neither callable nor
    named with a leading double underscore is stored, by name, in the
    ``__members__`` ordered mapping of the class. Iterating on the class,
    ``len()``, ``in`` and ``[]`` all operate on that mapping.

    A class using this metaclass can not be instantiated.
    """

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        # in python3.6, default namespace keeps declaration order
        # in python>=3 but <3.6, force ordered namespace
        # doesn't work in python2
        return OrderedDict()

    def __init__(cls, name, bases, attrs, *args, **kwargs):
        super(EnumMeta, cls).__init__(name, bases, attrs, *args, **kwargs)

        # Keep only the "data" attributes: no methods, no dunder names.
        members = []
        for key, value in attrs.items():
            if callable(value) or key.startswith('__'):
                continue
            members.append((key, value))

        if sys.version_info.major < 3:
            # can't have original declaration order, at least sort by value
            members.sort(key=lambda pair: pair[1])

        cls.__members__ = OrderedDict(members)

    def __setattr__(cls, name, value):
        super(EnumMeta, cls).__setattr__(name, value)
        # Attributes added after class creation become members too,
        # under the same filter as in __init__.
        if callable(value) or name.startswith('__'):
            return
        cls.__members__[name] = value

    def __call__(cls, *args, **kwargs):
        raise ValueError("Enum type can't be instanciated")

    @property
    def _items(cls):
        """(name, value) pairs of the members."""
        return cls.__members__.items()

    @property
    def _keys(cls):
        """Names of the members."""
        return cls.__members__.keys()

    @property
    def _values(cls):
        """Values of the members."""
        return cls.__members__.values()

    @property
    def _types(cls):
        """Set of the types of the member values."""
        return set(type(value) for value in cls._values)

    def __iter__(cls):
        # Iterating on the class yields the member values, not their names.
        return iter(cls.__members__.values())

    def __len__(cls):
        return len(cls.__members__)

    def __contains__(cls, value):
        # Membership is tested against the values, not the names.
        return value in cls.__members__.values()

    def __getitem__(cls, k):
        # Lookup is done by member name.
        return cls.__members__[k]

class Enum(with_metaclass(EnumMeta, object)):
    """
    Base class for enumerations, built with the :class:`EnumMeta` metaclass.
    """
def empty(value):
    """
    Tell whether a value is empty, i.e. None or an instance of EmptyType
    (NotLoaded, NotAvailable, ...).

    :param value: value to check
    :rtype: :class:`bool`
    """
    if value is None:
        return True
    return isinstance(value, EmptyType)
def find_object(mylist, error=None, **kwargs):
    """
    Very simple tool to return the first object matching all the parameters
    given in kwargs.

    :param mylist: iterable of objects to search in
    :param error: if specified, exception class (or any callable without
                  argument returning an exception) raised when no object matches
    :param kwargs: attribute names and the values they have to be equal to
    :return: the first object whose attributes all match, or None when
             nothing matches and no error is specified
    """
    for a in mylist:
        for key, value in kwargs.items():
            if getattr(a, key) != value:
                break
        else:
            # The inner loop ended without break: every criterion matched.
            return a

    if error is not None:
        raise error()
    return None
def strict_find_object(mylist, error=None, **kwargs):
    """
    Same lookup as find_object, but criteria whose value is empty are ignored.

    When no criterion is left after filtering, None is returned and the list
    is not searched (``error`` is not raised in that case).
    """
    criteria = {}
    for key, wanted in kwargs.items():
        if not empty(wanted):
            criteria[key] = wanted

    if not criteria:
        return None
    return find_object(mylist, error=error, **criteria)
class UserError(Exception):
    """
    Exception carrying an error message intended for the user.
    """


class FieldNotFound(Exception):
    """
    A field isn't found.

    :param obj: object on which the field was looked up
    :type obj: :class:`BaseObject`
    :param field: field not found
    :type field: :class:`Field`
    """

    def __init__(self, obj, field):
        super(FieldNotFound, self).__init__(u'Field "%s" not found for object %s' % (field, obj))
class ConversionWarning(UserWarning):
    """
    Warning emitted when the value given to a field had to be converted
    to another type while setting it.

    Ideally, the module should give a value of the right type.
    """
class AttributeCreationWarning(UserWarning):
    """
    Warning emitted when a non-field attribute is created with a name
    which is not prefixed with an underscore.
    """

    Parent class for NotAvailableType, NotLoadedType and FetchErrorType.
Christophe Benz's avatar
Christophe Benz committed
    def __str__(self):
    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self

    def __nonzero__(self):
        return False

    __bool__ = __nonzero__


class NotAvailableType(EmptyType):
    """
    NotAvailable is a constant to use on non available fields.
    """

    def __repr__(self):
        # Short and stable representation, like the other empty constants
        # (NotLoaded, FetchError).
        return 'NotAvailable'

    def __unicode__(self):
        return u'Not available'


# Unique instance, to be compared with "is".
NotAvailable = NotAvailableType()
class NotLoadedType(EmptyType):
    """
    NotLoaded is a constant to use on not loaded fields.

    When you use :func:`weboob.tools.backend.Module.fillobj` on a object based on :class:`BaseObject`,
    it will request all fields with this value.
    """

    def __repr__(self):
        return 'NotLoaded'

    def __unicode__(self):
        return u'Not loaded'


# Unique instance, to be compared with "is".
NotLoaded = NotLoadedType()
class FetchErrorType(EmptyType):
    """
    Type of the FetchError constant, which is used when an exception is raised
    while parsing a non-mandatory field.
    """

    def __unicode__(self):
        return u'Not mandatory'

    def __repr__(self):
        return 'FetchError'


# Unique instance of FetchErrorType.
FetchError = FetchErrorType()




Florent's avatar
Florent committed
class Capability(object):
    """
    This is the base class for all capabilities.

    A capability may define abstract methods (which raise :class:`NotImplementedError`)
    with an explicit docstring to tell backends how to implement them.

    Also, it may define some *objects*, using :class:`BaseObject`.
    Field of a :class:`BaseObject` class.

    :param doc: docstring of the field
    :type doc: :class:`str`
    :param args: list of types accepted
    :param default: default value of this field. If not specified, :class:`NotLoaded` is used.
    """
    _creation_counter = 0
    def __init__(self, doc, *args, **kwargs):
        self.types = ()
        self.value = kwargs.get('default', NotLoaded)
        self.doc = doc
        self.mandatory = kwargs.get('mandatory', True)
            if isinstance(arg, type) or isinstance(arg, str):
                raise TypeError('Arguments must be types or strings of type name')

        self._creation_counter = Field._creation_counter
        Field._creation_counter += 1

    def convert(self, value):
        """
        Convert value to the wanted one.
        """
        return value

class IntField(Field):
    """
    A field which accepts only :class:`int` and :class:`long` types.
    """

    def __init__(self, doc, **kwargs):
        """
        :param doc: docstring of the field
        :param kwargs: keyword arguments forwarded to :class:`Field`
        """
        super(IntField, self).__init__(doc, int, long, **kwargs)
class BoolField(Field):
    """
    Field restricted to the :class:`bool` type.
    """

    def __init__(self, doc, **kwargs):
        super(BoolField, self).__init__(doc, bool, **kwargs)

    def convert(self, value):
        # Any value is reduced to its truthiness.
        as_bool = bool(value)
        return as_bool


class DecimalField(Field):
    """
    Field restricted to the :class:`decimal.Decimal` type.
    """

    def __init__(self, doc, **kwargs):
        super(DecimalField, self).__init__(doc, Decimal, **kwargs)

    def convert(self, value):
        # A Decimal is returned untouched (same object), anything else is
        # given to the Decimal constructor.
        return value if isinstance(value, Decimal) else Decimal(value)

class FloatField(Field):
    """
    A field which accepts only :class:`float` type.
    """

    def __init__(self, doc, **kwargs):
        """
        :param doc: docstring of the field
        :param kwargs: keyword arguments forwarded to :class:`Field`
        """
        super(FloatField, self).__init__(doc, float, **kwargs)
class StringField(Field):
    """
    A field which accepts only :class:`unicode` strings.
    """

    def __init__(self, doc, **kwargs):
        """
        :param doc: docstring of the field
        :param kwargs: keyword arguments forwarded to :class:`Field`
        """
        super(StringField, self).__init__(doc, unicode, **kwargs)

    def convert(self, value):
        return to_unicode(value)


class BytesField(Field):
    """
    A field which accepts only :class:`bytes` strings.
    """

    def __init__(self, doc, **kwargs):
        """
        :param doc: docstring of the field
        :param kwargs: keyword arguments forwarded to :class:`Field`
        """
        super(BytesField, self).__init__(doc, bytes, **kwargs)

    def convert(self, value):
        # Unicode strings are encoded as UTF-8 before being turned into bytes.
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        return bytes(value)
class EnumField(Field):
    """
    Field restricted to the values of an :class:`Enum` subclass.

    The accepted types are the types of the enum values.
    """

    def __init__(self, doc, enum, **kwargs):
        """
        :param doc: docstring of the field
        :param enum: :class:`Enum` subclass listing the accepted values
        :raises TypeError: if enum is not a subclass of :class:`Enum`
        """
        if not issubclass(enum, Enum):
            raise TypeError('invalid enum type: %r' % enum)
        accepted_types = enum._types
        super(EnumField, self).__init__(doc, *accepted_types, **kwargs)
        self.enum = enum

    def convert(self, value):
        # The value is never modified, only checked against the enum values.
        if value in self.enum._values:
            return value
        raise ValueError('value %r does not belong to enum %s' % (value, self.enum))
class _BaseObjectMeta(type):
    """
    Metaclass collecting the :class:`Field` attributes of a class.

    Fields are removed from the class namespace and stored, ordered by
    creation counter, in the ``_fields`` ordered mapping of the class, after a
    deep copy of the fields found on the parent classes. One ``:var`` line
    per field declared in the class is appended to the class docstring.
    """

    def __new__(cls, name, bases, attrs):
        field_names = [key for key, obj in list(attrs.items()) if isinstance(obj, Field)]
        fields = [(key, attrs.pop(key)) for key in field_names]
        fields.sort(key=lambda item: item[1]._creation_counter)

        new_class = super(_BaseObjectMeta, cls).__new__(cls, name, bases, attrs)
        inherited = new_class._fields
        if inherited is None:
            new_class._fields = OrderedDict()
        else:
            # Do not share Field instances with the parent classes.
            new_class._fields = deepcopy(inherited)
        new_class._fields.update(fields)

        if new_class.__doc__ is None:
            new_class.__doc__ = ''
        for field_name, field in fields:
            type_names = []
            for accepted in field.types:
                if isinstance(accepted, type):
                    type_names.append(':class:`%s`' % accepted.__name__)
                else:
                    # Accepted type given by name.
                    type_names.append(accepted)
            doc = '(%s) %s' % (', '.join(type_names), field.doc)
            if field.value is not NotLoaded:
                doc += ' (default: %s)' % field.value
            new_class.__doc__ += '\n:var %s: %s' % (field_name, doc)
        return new_class
class BaseObject(with_metaclass(_BaseObjectMeta, StrConv, object)):
    """
    This is the base class for a capability object.

    A capability interface may specify to return several kind of objects, to formalise
    retrieved information from websites.

    As python is a flexible language where variables are not typed, we use a system to
    force backends to set wanted values on all fields. To do that, we use the :class:`Field`
    class and all derived ones.

    For example::

        class Transfer(BaseObject):
            " Transfer from an account to a recipient.  "

            amount =    DecimalField('Amount to transfer')
            date =      Field('Date of transfer', basestring, date, datetime)
            origin =    Field('Origin of transfer', int, long, basestring)
            recipient = Field('Recipient', int, long, basestring)

    The docstring is mandatory.
    """

    url = StringField('url')
    def __init__(self, id=u'', url=NotLoaded, backend=None):
        """
        :param id: identifier of the object; None is stored as an empty string
        :param url: value of the ``url`` field
        :param backend: backend of the object, used to build :attr:`fullid`
        """
        self.id = to_unicode(id) if id is not None else u''
        # Store the backend: fullid, to_dict and the equality check read it.
        self.backend = backend
        # Give the instance its own copy of the fields, so values are not
        # stored on the Field objects of the class.
        self._fields = deepcopy(self._fields)
        self.__setattr__('url', url)
    @property
    def fullid(self):
        """
        Full ID of the object, in form '**ID@backend**'.
        """
        return '%s@%s' % (self.id, self.backend)
    def __iscomplete__(self):
        """
        Tell whether the object is completed.

        This is useful when the object is itself a field of another object
        which is going to be filled.

        By default, the object is complete when none of the values yielded
        by iter_fields is NotLoaded.
        """
        return not any(value is NotLoaded for _, value in self.iter_fields())


Romain Bignon's avatar
Romain Bignon committed
    def copy(self):
        """
        Return a copy of this object.

        The object and its fields mapping are copied with :func:`copy.copy`,
        then each field is copied too, so the copy does not share its
        field objects with the original.
        """
        obj = copy(self)
        obj._fields = copy(self._fields)
        for k in obj._fields:
            obj._fields[k] = copy(obj._fields[k])
        return obj

    def __deepcopy__(self, memo):
        """
        Deep copy hook: delegates to copy(), the memo dictionary is not used.
        """
        duplicate = self.copy()
        return duplicate

    def set_empty_fields(self, value, excepts=()):
        """
        Give the same value to every field which is currently empty.

        :param value: value to set on all empty fields
        :param excepts: names of the fields which must be left untouched
        """
        for name, current in self.iter_fields():
            if not empty(current) or name in excepts:
                continue
            setattr(self, name, value)

        Iterate on the fields keys and values.

        Can be overloaded to iterate on other things.

        if hasattr(self, 'id') and self.id is not None:
Romain Bignon's avatar
Romain Bignon committed
            yield 'id', self.id
        for name, field in self._fields.items():
        if isinstance(obj, BaseObject):
            return self.backend == obj.backend and self.id == obj.id
        else:
            return False

    def __getattr__(self, name):
        """
        Fallback attribute lookup returning the value of the field ``name``.

        :raises AttributeError: if ``name`` is not a field of the object
        """
        if self._fields is not None and name in self._fields:
            return self._fields[name].value
        # Unknown names have to raise; returning None silently would break
        # hasattr() and getattr() with a default value.
        raise AttributeError("'%s' object has no attribute '%s'" % (
            self.__class__.__name__, name))

    def __setattr__(self, name, value):
        try:
        except KeyError:
            if name not in dir(self) and not name.startswith('_'):
                warnings.warn('Creating a non-field attribute %s. Please prefix it with _' % name,
                              AttributeCreationWarning, stacklevel=2)
            object.__setattr__(self, name, value)
        else:
            if not empty(value):
                try:
                    # Try to convert value to the wanted one.
                    nvalue = attr.convert(value)
                    # If the value was converted
                    if nvalue is not value:
                        warnings.warn('Value %s was converted from %s to %s' %
                                      (name, type(value), type(nvalue)),
                                      ConversionWarning, stacklevel=2)
                    value = nvalue
                except Exception:
                    # error during conversion, it will probably not
                    # match the wanted following types, so we'll
                    # raise ValueError.
                    pass
            from collections import deque
            actual_types = ()
                if isinstance(v, str):
                    # the following is a (almost) copy/paste from
                    # https://stackoverflow.com/questions/11775460/lexical-cast-from-string-to-type
                    q = deque([object])
                        t = q.popleft()
                            actual_types += (t,)
                        else:
                            try:
                                # keep looking!
                                q.extend(t.__subclasses__())
                            except TypeError:
                                # type.__subclasses__ needs an argument for
                                # whatever reason.
                                if t is type:
                                    continue
                                else:
                                    raise
                else:
                    actual_types += (v,)

            if not isinstance(value, actual_types) and not empty(value):
juke's avatar
juke committed
                raise ValueError(
                    'Value for "%s" needs to be of type %r, not %r' % (
                        name, actual_types, type(value)))
            attr.value = value
Romain Bignon's avatar
Romain Bignon committed

    def __delattr__(self, name):
        """
        Delete the field ``name`` if there is one, else the regular attribute.
        """
        try:
            del self._fields[name]
        except KeyError:
            # Not a field: regular attribute deletion.
            object.__delattr__(self, name)

    def to_dict(self):
        """
        Return the pairs yielded by iter_fields as an ordered dictionary.

        When the object has a backend, the value of the 'id' key is the
        full ID instead of the bare one.
        """
        result = OrderedDict()
        for key, value in self.iter_fields():
            if key == 'id' and self.backend is not None:
                value = self.fullid
            result[key] = value
        return result
    def __getstate__(self):
        """
        Build the state of the object: the content of to_dict, completed
        (and overridden) by the instance attributes, except ``_fields``.
        """
        state = self.to_dict()
        for key, value in self.__dict__.items():
            if key != '_fields':
                state[key] = value
        return state

    @classmethod
    def from_dict(cls, values, backend=None):
        """
        Create an object of this class and set every item of ``values`` on it.

        :param values: mapping of attribute names to values
        :param backend: accepted but not used by this implementation
        """
        obj = cls()
        for name in values:
            setattr(obj, name, values[name])
        return obj

    def __setstate__(self, state):
        """
        Restore the object from a state mapping, setting each item as attribute.
        """
        # because yaml does not call __init__
        self._fields = deepcopy(self._fields)
        for key in state:
            value = state[key]
            setattr(self, key, value)

    # __dir__ is only overridden on Python 3.
    if sys.version_info.major >= 3:
        def __dir__(self):
            # Default listing, followed by the names of the fields.
            return list(super(BaseObject, self).__dir__()) + list(self._fields.keys())


class Currency(object):
    CURRENCIES = OrderedDict([
        (u'EUR', (u'', u'EURO', u'EUROS')),
        (u'CHF', (u'CHF',)),
        (u'USD', (u'$',)),
        (u'GBP', (u'£',)),
        (u'LBP', (u'ل.ل',)),
        (u'AED', (u'AED',)),
        (u'XOF', (u'XOF',)),
        (u'RUB', (u'руб',)),
        (u'SGD', (u'SGD',)),
        (u'BRL', (u'R$',)),
        (u'MXN', (u'$',)),
        (u'JPY', (u'¥',)),
        (u'TRY', (u'', u'TRY')),
        (u'RON', (u'lei',)),
        (u'COP', (u'$',)),
        (u'NOK', (u'kr',)),
        (u'CNY', (u'¥',)),
        (u'RSD', (u'din',)),
        (u'ZAR', (u'rand',)),
        (u'MYR', (u'RM',)),
        (u'HUF', (u'Ft',)),
        (u'HKD', (u'HK$',)),
        (u'TWD', (u'NT$',)),
        (u'QAR', (u'QR',)),
        (u'MAD', (u'MAD',)),
        (u'ARS', (u'ARS',)),
        (u'AUD', (u'AUD',)),
        (u'CAD', (u'CAD',)),
        (u'NZD', (u'NZD',)),
        (u'BHD', (u'BHD',)),
        (u'SEK', (u'SEK',)),
        (u'DKK', (u'DKK',)),
        (u'LUF', (u'LUF',)),
        (u'KZT', (u'KZT',)),
        (u'PLN', (u'PLN',)),
        (u'ILS', (u'ILS',)),
        (u'THB', (u'THB',)),
        (u'INR', (u'', u'INR')),
        (u'PEN', (u'S/',)),
        (u'IDR', (u'Rp',)),
        (u'KWD', (u'KD',)),
        (u'KRW', (u'',)),
        (u'CZK', (u'',)),
        (u'EGP', (u'',)),
        (u'ISK', (u'Íkr', u'kr')),
        (u'XPF', (u'XPF',)),
    EXTRACTOR = re.compile(r'[()\d\s,\.\-]', re.UNICODE)

    @classmethod
    def get_currency(klass, text):
        u"""
        >>> Currency.get_currency(u'42')
        >>> Currency.get_currency(u'42 €')
        u'EUR'
        >>> Currency.get_currency(u'$42')
        u'USD'
        >>> Currency.get_currency(u'42.000,00€')
        u'EUR'
        >>> Currency.get_currency(u'$42 USD')
        u'USD'
        >>> Currency.get_currency(u'%42 USD')
        u'USD'
        >>> Currency.get_currency(u'US1D')
        """
        curtexts = klass.EXTRACTOR.sub(' ', text.upper()).split()

        for currency, symbols in klass.CURRENCIES.items():
            for curtext in curtexts:
                if curtext == currency:
Vincent Paredes's avatar
Vincent Paredes committed
                for symbol in symbols:
Vincent Paredes's avatar
Vincent Paredes committed
                        return currency

    @classmethod
    def currency2txt(klass, currency):
        _currency = klass.CURRENCIES.get(currency, (u'',))