#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright(C) 2012  Romain Bignon
#
# This file is part of woob.
#
# woob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# woob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with woob. If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function

import itertools
import logging
import os
import re
import sys
import urllib
import urlparse
from datetime import datetime, timedelta
from math import log
from random import choice, randint
from threading import Event, Thread

from dateutil.parser import parse as parse_date
from irc.bot import SingleServerIRCBot

from weboob.browser import Browser
from weboob.browser.exceptions import HTTPNotFound
from weboob.browser.pages import HTMLPage
from weboob.core import Weboob
from weboob.exceptions import BrowserHTTPError, BrowserUnavailable
from weboob.tools.application.base import ApplicationStorage
from weboob.tools.misc import get_backtrace, to_unicode
from weboob.tools.storage import StandardStorage

# Runtime configuration, overridable through BOOBOT_* environment variables.

# Comma-separated channel names; the first one is the bot's default channel.
IRC_CHANNELS = os.getenv('BOOBOT_CHANNELS', '#weboob').split(',')
IRC_NICKNAME = os.getenv('BOOBOT_NICKNAME', 'boobot')
IRC_SERVER = os.getenv('BOOBOT_SERVER', 'dickson.freenode.net')
# Comma-separated regular expressions; messages whose source matches one of
# them are ignored.  Empty entries are skipped: an empty pattern matches every
# source and would make the bot ignore everybody.
IRC_IGNORE = [re.compile(i) for i in os.getenv('BOOBOT_IGNORE', '!~?irker@').split(',') if i]
STORAGE_FILE = os.getenv('BOOBOT_STORAGE', 'boobot.storage')


def fixurl(url):
    """Re-encode every component of *url* and return the rebuilt URL.

    The ``/#!/`` marker is collapsed to ``/``, user name and password are
    percent-quoted, the host is IDNA-encoded, the remaining components are
    UTF-8 encoded, and doubled slashes in the path are merged.
    """
    # remove javascript crap
    text = to_unicode(url).replace('/#!/', '/')
    parts = urlparse.urlsplit(text)

    # The netloc has the shape [user[:password]@]host[:port]; split on the
    # *last* '@' so that an '@' inside the credentials stays with them.
    credentials, at_sign, location = parts.netloc.rpartition('@')
    username, cred_sep, password = credentials.partition(':')
    hostname, port_sep, port_number = location.partition(':')

    netloc = ''.join((
        urllib.quote(username.encode('utf8')),
        cred_sep.encode('utf8'),
        urllib.quote(password.encode('utf8')),
        at_sign.encode('utf8'),
        hostname.encode('idna'),
        port_sep.encode('utf8'),
        port_number.encode('utf8'),
    ))

    segments = [segment.encode('utf8') for segment in parts.path.split('/')]
    # while valid, a doubled slash is most likely an error
    path = '/'.join(segments).replace('//', '/')

    return urlparse.urlunsplit((
        parts.scheme.encode('utf8'),
        netloc,
        path,
        parts.query.encode('utf8'),
        parts.fragment.encode('utf8'),
    ))


class BoobotBrowser(Browser):
    """Browser used to describe a URL: content type, size and page title."""

    TIMEOUT = 3.0

    def urlinfo(self, url, maxback=2):
        """Return ``(content_type, human_size, title)`` for *url*.

        A HEAD request is tried first.  When the server answers 501 or 405 to
        it, a regular request is made instead.  On a 404, if *url* ends with a
        non-alphanumeric character, that character is dropped and the lookup
        is retried, at most *maxback* times.

        :param url: URL to inspect
        :param maxback: number of trailing characters that may still be
                        stripped after a 404
        :returns: tuple ``(content_type, hsize, title)``; ``hsize`` is None
                  when no usable Content-Length is available and the page is
                  not HTML, ``title`` is None when no title was extracted
        :raises HTTPNotFound: the URL (and its shortened variants) was not found
        :raises BrowserHTTPError: any other HTTP error
        """
        if urlparse.urlsplit(url).netloc == 'mobile.twitter.com':
            url = url.replace('mobile.twitter.com', 'twitter.com', 1)

        try:
            r = self.open(url, method='HEAD')
            body = False
        except HTTPNotFound:
            if maxback and not url[-1].isalnum():
                return self.urlinfo(url[:-1], maxback - 1)
            # bare raise keeps the original traceback
            raise
        except BrowserHTTPError as e:
            if e.response.status_code not in (501, 405):
                raise
            # HEAD is refused by this server: fetch the whole document
            r = self.open(url)
            body = True

        content_type = r.headers.get('Content-Type')
        try:
            hsize = self.human_size(int(r.headers.get('Content-Length')))
        except (TypeError, ValueError):
            # header missing (None) or not a number
            hsize = None

        if content_type:
            is_html = 'html' in content_type
        else:
            # No Content-Type: fall back on the extension.  search() is
            # required here, match() would anchor the pattern at the start
            # of the URL and never succeed.
            is_html = re.search(r'\.x?html?$', url)

        title = None
        if is_html:
            if not body:
                r = self.open(url)
            # update size as we might not have it from headers
            hsize = self.human_size(len(r.content))

            page = HTMLPage(self, r)

            for node in page.doc.xpath('//head/title'):
                title = ' '.join(to_unicode(node.text_content()).strip().split())
            if urlparse.urlsplit(url).netloc.endswith('twitter.com'):
                # on a tweet page, prefer the text of the tweet itself
                for node in page.doc.getroot().cssselect('.permalink-tweet .tweet-text'):
                    title = ' '.join(to_unicode(node.text_content()).strip().splitlines())

        return content_type, hsize, title

    def human_size(self, size):
        """Format a byte count with a binary unit, e.g. ``1536`` -> ``'1.5 KiB'``.

        :param size: number of bytes; a falsy value gives ``'0 B'``
        :returns: string made of the value with one decimal and its unit
        """
        if not size:
            return '0 B'

        units = ('B', 'KiB', 'MiB', 'GiB',
                 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
        value = float(size)
        exponent = 0
        # Divide step by step instead of using log(): no floating-point
        # rounding at exact powers of 1024 and no index past the last unit.
        while value >= 1024 and exponent < len(units) - 1:
            value /= 1024
            exponent += 1
        return "%.1f %s" % (value, units[exponent])


class Task(object):
    """A message to deliver at a given time, optionally to a given channel."""

    def __init__(self, datetime, message, channel=None):
        # datetime: when the message is due; channel: None when unspecified
        self.datetime, self.message, self.channel = datetime, message, channel


class MyThread(Thread):
    """Background thread running the weboob scheduler for the bot.

    It periodically delivers due tasks and relays DLFP and Twitter content
    mentioning one of the watched keywords.
    """

    daemon = True

    # Words looked for (case-insensitively) by find_keywords(), in priority order.
    KEYWORDS = (
        'weboob', 'videoob', 'havesex', 'havedate', 'monboob', 'boobmsg',
        'flatboob', 'boobill', 'pastoob', 'radioob', 'translaboob', 'traveloob', 'handjoob',
        'boobathon', 'boobank', 'boobtracker', 'comparoob', 'wetboobs',
        'webcontentedit', 'weboorrents', 'assnet',
        'budget insight', 'budget-insight', 'budgetinsight', 'budgea',
    )

    def __init__(self, bot):
        """Create the Weboob instance, load its backends and hand it to *bot*."""
        Thread.__init__(self)
        self.weboob = Weboob(storage=StandardStorage(STORAGE_FILE))
        self.weboob.load_backends()
        self.bot = bot
        self.bot.set_weboob(self.weboob)

    def run(self):
        """Wait until every channel is joined, then run the periodic checks."""
        # bot.joined maps each channel to an Event set by Boobot.on_join
        for ev in self.bot.joined.values():
            ev.wait()

        # NOTE(review): intervals are presumably in seconds -- confirm against Weboob.repeat
        self.weboob.repeat(5, self.check_tasks)
        self.weboob.repeat(300, self.check_board)
        self.weboob.repeat(600, self.check_dlfp)
        self.weboob.repeat(600, self.check_twitter)

        self.weboob.loop()

    def find_keywords(self, text):
        """Return the first watched keyword contained in *text*, or None.

        The comparison is case-insensitive and the returned keyword is the
        lower-case form listed in KEYWORDS.
        """
        lowered = text.lower()
        for word in self.KEYWORDS:
            if word in lowered:
                return word
        return None

    def check_twitter(self):
        """Announce unseen results of the 'weboob' search on twitter backends."""
        nb_tweets = 10

        for backend in self.weboob.iter_backends(module='twitter'):
            for thread in list(itertools.islice(backend.iter_resources(None, ['search', 'weboob']),
                                                0,
                                                nb_tweets)):

                # first run: only consider tweets from the last 60 days
                if not backend.storage.get('lastpurge'):
                    backend.storage.set('lastpurge', datetime.now() - timedelta(days=60))
                    backend.storage.save()

                if thread.id not in backend.storage.get('seen', default={}) and\
                   thread.date > backend.storage.get('lastpurge'):
                    # the thread id is split on '#'; the two parts are used as
                    # the account name and the status id of the tweet URL
                    _item = thread.id.split('#')
                    url = 'https://twitter.com/%s/status/%s' % (_item[0], _item[1])
                    for msg in self.bot.on_url(url):
                        self.bot.send_message('%s: %s' % (_item[0], url))
                        self.bot.send_message(msg)

                    backend.set_message_read(backend.fill_thread(thread, ['root']).root)

    def check_dlfp(self):
        """Announce unread DLFP messages containing a keyword, then mark them read."""
        for msg in self.weboob.do('iter_unread_messages', backends=['dlfp']):
            word = self.find_keywords(msg.content)
            if word is not None:
                # the link is taken from the signature, starting at 'https://linuxfr'
                url = msg.signature[msg.signature.find('https://linuxfr'):]
                self.bot.send_message('[DLFP] %s talks about %s: %s' % (
                    msg.sender, word, url))
            self.weboob[msg.backend].set_message_read(msg)

    def check_board(self):
        """Relay new DLFP board messages containing a keyword."""
        def iter_messages(backend):
            return backend.browser.iter_new_board_messages()

        for msg in self.weboob.do(iter_messages, backends=['dlfp']):
            word = self.find_keywords(msg.message)
            if word is not None and msg.login != 'moules':
                # wrap the keyword in \002 characters
                message = msg.message.replace(word, '\002%s\002' % word)
                self.bot.send_message('[DLFP] <%s> %s' % (msg.login, message))

    def check_tasks(self):
        """Send and drop every queued task whose time has passed."""
        # iterate over a copy: the queue is modified inside the loop
        for task in list(self.bot.tasks_queue):
            if task.datetime < datetime.now():
                self.bot.send_message(task.message, task.channel)
                self.bot.tasks_queue.remove(task)

    def stop(self):
        """Ask the weboob loop to stop and release the backends."""
        self.weboob.want_stop()
        self.weboob.deinit()


class Boobot(SingleServerIRCBot):
    """IRC bot describing URLs and weboob object ids, with quotes and reminders.

    Public messages are scanned for ``id@backend`` references, for URLs and
    for ``%command`` requests, which are dispatched to the ``cmd_*`` methods.
    """

    def __init__(self, channels, nickname, server, port=6667):
        """Prepare the connection to *server* and the per-channel join events.

        :param channels: channels to join; the first one is the default
                         target of send_message()
        :param nickname: nickname, also passed as the bot's second name argument
        :param server: IRC server host name
        :param port: IRC server port
        """
        SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
        # self.connection.add_global_handler('pubmsg', self.on_pubmsg)
        self.connection.add_global_handler('join', self.on_join)
        self.connection.add_global_handler('welcome', self.on_welcome)
        self.connection.buffer_class.errors = 'replace'

        self.mainchannel = channels[0]
        # one Event per channel, set by on_join
        self.joined = dict()
        for channel in channels:
            self.joined[channel] = Event()

        # both are provided later through set_weboob()
        self.weboob = None
        self.storage = None

        # Task objects waiting for their delivery time
        self.tasks_queue = []

    def set_weboob(self, weboob):
        """Attach the Weboob instance and load the bot's own storage section."""
        self.weboob = weboob
        self.storage = ApplicationStorage('boobot', weboob.storage)
        self.storage.load({})

    def on_welcome(self, c, event):
        """Join every configured channel."""
        for channel in self.joined:
            c.join(channel)

    def on_join(self, c, event):
        """Flag the joined channel as ready."""
        # irclib 5.0 compatibility
        if callable(event.target):
            channel = event.target()
        else:
            channel = event.target
        self.joined[channel].set()

    def send_message(self, msg, channel=None):
        """Send *msg* to *channel* (default: main channel), one message per line.

        Each line is cut to at most 450 UTF-8 bytes.
        """
        target = to_unicode(channel or self.mainchannel)
        for line in msg.splitlines():
            # The cut is made on bytes and may fall inside a multi-byte
            # character: 'ignore' drops the incomplete tail instead of
            # raising UnicodeDecodeError.
            line = to_unicode(line).encode('utf-8')[:450].decode('utf-8', 'ignore')
            self.connection.privmsg(target, line)

    def on_pubmsg(self, c, event):
        """Handle a public message: object ids, URLs, then ``%commands``."""
        # irclib 5.0 compatibility
        if callable(event.arguments):
            text = ' '.join(event.arguments())
            channel = event.target()
            nick = event.source()
        else:
            text = ' '.join(event.arguments)
            channel = event.target
            nick = event.source

        for ignore in IRC_IGNORE:
            if ignore.search(nick):
                return

        for m in re.findall(r'([\w\d_\-]+@\w+)', text):
            for msg in self.on_boobid(m):
                self.send_message(msg, channel)
        for m in re.findall(u'(https?://[^\\s\xa0+]+)', text):
            for msg in self.on_url(m):
                self.send_message(msg, channel)

        m = re.match(r'^%(?P<cmd>\w+)(?P<args>.*)$', text)
        if m:
            handler = getattr(self, 'cmd_%s' % m.group('cmd'), None)
            if handler is not None:
                handler(nick, channel, m.group('args').strip())

    def cmd_at(self, nick, channel, text):
        """``%at <date> <message>``: queue *message* for delivery at *date*."""
        try:
            when, message = text.split(' ', 1)
        except ValueError:
            self.send_message('Syntax: %at [YYYY-MM-DDT]HH:MM[:SS] message', channel)
            return

        try:
            when = parse_date(when)
        except (ValueError, OverflowError):
            # answer on the channel the request came from
            self.send_message('Unable to read date %r' % when, channel)
            return

        self.tasks_queue.append(Task(when, message, channel))

    def cmd_addquote(self, nick, channel, text):
        """``%addquote <text>``: store a quote for *channel* and report its number."""
        quotes = self.storage.get(channel, 'quotes', default=[])
        quotes.append({'author': nick, 'timestamp': datetime.now(), 'text': text})
        self.storage.set(channel, 'quotes', quotes)
        self.storage.save()
        self.send_message('Quote #%s added' % (len(quotes) - 1), channel)

    def cmd_delquote(self, nick, channel, text):
        """``%delquote <n>``: remove quote number *n* of *channel*."""
        quotes = self.storage.get(channel, 'quotes', default=[])

        try:
            n = int(text)
            quotes.pop(n)
        except (ValueError, IndexError):
            # not a number, or no quote with this number
            self.send_message("Quote #%s not found gros" % text, channel)
            return

        self.storage.set(channel, 'quotes', quotes)
        self.storage.save()
        self.send_message('Quote #%s removed' % n, channel)

    def cmd_searchquote(self, nick, channel, text):
        """``%searchquote <regexp>``: send a random quote matching *regexp*."""
        try:
            pattern = re.compile(to_unicode(text), re.IGNORECASE | re.UNICODE)
        except Exception as e:
            # invalid expression: report the error to the channel
            self.send_message(str(e), channel)
            return

        quotes = [quote for quote in self.storage.get(channel, 'quotes', default=[])
                  if pattern.search(to_unicode(quote['text']))]

        try:
            quote = choice(quotes)
        except IndexError:
            self.send_message('No match', channel)
        else:
            self.send_message('%s' % quote['text'], channel)

    def cmd_getquote(self, nick, channel, text):
        """``%getquote [n]``: send quote *n*, or a random one when *n* is not a number."""
        quotes = self.storage.get(channel, 'quotes', default=[])
        if not quotes:
            return

        try:
            n = int(text)
        except ValueError:
            n = randint(0, len(quotes) - 1)

        try:
            quote = quotes[n]
        except IndexError:
            self.send_message('Unable to find quote #%s' % n, channel)
        else:
            self.send_message('[%s] %s' % (n, quote['text']), channel)

    def on_boobid(self, boobid):
        """Yield description lines for an ``id@backend`` reference.

        Nothing is yielded when the backend is unknown or when none of its
        capabilities has a matching ``obj_info_*`` method.
        """
        _id, backend_name = boobid.split('@', 1)
        if backend_name in self.weboob.backend_instances:
            backend = self.weboob.backend_instances[backend_name]
            for cap in backend.iter_caps():
                # the first three characters of the class name are dropped
                # (presumably a 'Cap' prefix, e.g. CapVideo -> obj_info_video)
                func = 'obj_info_%s' % cap.__name__[3:].lower()
                if hasattr(self, func):
                    try:
                        for msg in getattr(self, func)(backend, _id):
                            yield msg
                    except Exception as e:
                        print(get_backtrace())
                        yield u'Oops: [%s] %s' % (type(e).__name__, e)
                    # only the first supported capability is used
                    break

    def on_url(self, url):
        """Yield one line describing *url*: its title, or its type and size."""
        url = fixurl(url)
        try:
            content_type, hsize, title = BoobotBrowser().urlinfo(url)
            if title:
                yield u'URL: %s' % title
            elif hsize:
                yield u'URL (file): %s, %s' % (content_type, hsize)
            else:
                yield u'URL (file): %s' % content_type
        except BrowserUnavailable as e:
            yield u'URL (error): %s' % e
        except Exception as e:
            print(get_backtrace())
            yield u'Oops: [%s] %s' % (type(e).__name__, e)

    def obj_info_video(self, backend, id):
        """Yield a one-line description of the video *id* of *backend*."""
        v = backend.get_video(id)
        if v:
            yield u'Video: %s (%s)' % (v.title, v.duration)

    def obj_info_housing(self, backend, id):
        """Yield a one-line description of the housing *id* of *backend*."""
        h = backend.get_housing(id)
        if h:
            yield u'Housing: %s (%sm² / %s%s)' % (h.title, h.area, h.cost, h.currency)


def main():
    """Start the worker thread and run the IRC bot until it is interrupted."""
    logging.basicConfig(level=logging.DEBUG)
    bot = Boobot(IRC_CHANNELS, IRC_NICKNAME, IRC_SERVER)

    thread = MyThread(bot)
    thread.start()

    try:
        bot.start()
    except KeyboardInterrupt:
        print("Stopped.")
    finally:
        # also shut the weboob loop down when the bot dies on an unexpected error
        thread.stop()


# Script entry point: run the bot and pass main()'s return value to sys.exit().
if __name__ == "__main__":
    sys.exit(main())