pax_global_header 0000666 0000000 0000000 00000000064 11666415431 0014521 g ustar 00root root 0000000 0000000 52 comment=037c35f40908f1833bda001fd8cdb2c9338a53d4
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/ 0000775 0000000 0000000 00000000000 11666415431 0023334 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/ 0000775 0000000 0000000 00000000000 11666415431 0024611 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/ 0000775 0000000 0000000 00000000000 11666415431 0027277 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/__init__.py 0000664 0000000 0000000 00000000001 11666415431 0031377 0 ustar 00root root 0000000 0000000
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobank/ 0000775 0000000 0000000 00000000000 11666415431 0030712 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001426 11666415431 0032747 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobank # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .boobank import Boobank
__all__ = ['Boobank']
boobank.py 0000664 0000000 0000000 00000021111 11666415431 0032614 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobank # -*- coding: utf-8 -*-
# Copyright(C) 2009-2011 Romain Bignon, Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
from weboob.capabilities.bank import ICapBank
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.formatters.iformatter import IFormatter
__all__ = ['Boobank']
class QifFormatter(IFormatter):
    """Formatter writing each operation as a QIF-style bank record."""

    MANDATORY_FIELDS = ('id', 'date', 'label', 'amount', 'category')

    # Number of records written since the last flush().
    count = 0

    def flush(self):
        """Reset the record counter so the next record re-emits the header."""
        self.count = 0

    def format_dict(self, item):
        """Return the text of one record; the first one is preceded by the header."""
        pieces = []
        if self.count == 0:
            pieces.append(u'!type:Bank\n')
        pieces.append(u'D%s\n' % item['date'].strftime('%d/%m/%y'))
        pieces.append(u'T%s\n' % item['amount'])
        # The category line is optional.
        if item['category']:
            pieces.append(u'N%s\n' % item['category'])
        pieces.append(u'M%s\n' % item['label'])
        pieces.append(u'^\n')
        self.count += 1
        return u''.join(pieces)
class TransferFormatter(IFormatter):
    """Formatter printing a transfer as a short block of labelled lines."""

    MANDATORY_FIELDS = ('id', 'date', 'origin', 'recipient', 'amount')

    def flush(self):
        """Nothing is buffered, so there is nothing to flush."""
        pass

    def format_dict(self, item):
        """Return the multi-line description of one transfer."""
        rows = [
            u'------- Transfer %s -------\n' % item['id'],
            u'Date: %s\n' % item['date'],
            u'Origin: %s\n' % item['origin'],
            u'Recipient: %s\n' % item['recipient'],
            u'Amount: %.2f\n' % item['amount'],
        ]
        return u''.join(rows)
class RecipientListFormatter(IFormatter):
    """Formatter listing transfer recipients, one per line."""

    MANDATORY_FIELDS = ('id', 'label')

    # Number of recipients printed since the last flush().
    count = 0

    def flush(self):
        """Restart the numbering of recipients."""
        self.count = 0

    def format_dict(self, item):
        """Return one line made of the (bold) identifier and the label."""
        self.count += 1
        if not self.interactive:
            shown_id = item['id']
        else:
            # Interactive mode shows a number and the part of the ID after '@'.
            shown_id = '#%d (%s)' % (self.count, item['id'].split('@', 1)[1])
        return u'%s %-30s %s %s' % (self.BOLD, shown_id, self.NC, item['label'])
class AccountListFormatter(IFormatter):
    """Formatter printing accounts as table rows followed by a totals line."""

    MANDATORY_FIELDS = ('id', 'label', 'balance', 'coming')

    # Rows printed and running sums since the last flush().
    count = 0
    tot_balance = 0.0
    tot_coming = 0.0

    def flush(self):
        """Emit the totals footer (when at least one row was printed) and reset."""
        if self.count < 1:
            return

        # Outside interactive mode the first column is 15 characters wider.
        dashes = '' if self.interactive else ('-' * 15)
        footer = u'------------------------------------------%s+----------+----------\n' % dashes
        blanks = '' if self.interactive else (' ' * 15)
        footer += u'%s Total %8s %8s' % (blanks,
                                         '%.2f' % self.tot_balance,
                                         '%.2f' % self.tot_coming)
        self.after_format(footer)

        self.tot_balance = 0.0
        self.tot_coming = 0.0
        self.count = 0

    def format_dict(self, item):
        """Return the row for one account, preceded by the header on the first row."""
        self.count += 1
        if not self.interactive:
            shown_id = item['id']
        else:
            shown_id = '#%d (%s)' % (self.count, item['id'].split('@', 1)[1])

        row = u''
        if self.count == 1:
            blanks = '' if self.interactive else (' ' * 15)
            row += ' %s Account Balance Coming \n' % blanks
            dashes = '' if self.interactive else ('-' * 15)
            row += '------------------------------------------%s+----------+----------\n' % dashes

        row_format = u' %s%-' + (u'15' if self.interactive else '30') + u's%s %-25s %8s %8s'
        row += row_format % (self.BOLD, shown_id, self.NC,
                             item['label'],
                             '%.2f' % item['balance'],
                             '%.2f' % (item['coming'] or 0.0))

        # Accumulate the totals reported by flush().
        self.tot_balance += item['balance']
        if item['coming']:
            self.tot_coming += item['coming']
        return row
# REPL application built on ReplApplication; the attributes below configure it.
class Boobank(ReplApplication):
APPNAME = 'boobank'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon, Christophe Benz'
# Capability declared for this application.
CAPS = ICapBank
DESCRIPTION = "Console application allowing to list your bank accounts and get their balance, " \
"display accounts history and coming bank operations, and transfer money from an account to " \
"another (if available)."
# Formatter classes defined in this module, keyed by the name used to select them.
EXTRA_FORMATTERS = {'account_list': AccountListFormatter,
'recipient_list': RecipientListFormatter,
'transfer': TransferFormatter,
'qif': QifFormatter,
}
DEFAULT_FORMATTER = 'table'
# Maps a command name to the formatter name used for it.
COMMANDS_FORMATTERS = {'ls': 'account_list',
'list': 'account_list',
'transfer': 'transfer',
}
def _complete_account(self, exclude=None):
    """Return the completion candidates, leaving out *exclude* when given.

    *exclude* is normalised through parse_id() into ``id@backend`` form
    before being compared with the candidates.
    """
    if exclude:
        exclude = '%s@%s' % self.parse_id(exclude)
    candidates = []
    for candidate in self._complete_object():
        if candidate != exclude:
            candidates.append(candidate)
    return candidates
def do_list(self, line):
    """
    list
    List accounts.
    """
    # "list" is an alias of the "ls" command.
    outcome = self.do_ls(line)
    return outcome
def complete_history(self, text, line, *ignored):
    """Complete the account ID, which is the only argument of "history"."""
    if len(line.split(' ')) != 2:
        return None
    return self._complete_account()
def do_history(self, id):
    """
    history ID
    Display old operations.
    """
    id, backend_name = self.parse_id(id)
    if not id:
        print >>sys.stderr, 'Error: please give an account ID (hint: use list command)'
        return 2

    # Restrict the call to the backend named in the ID, when there is one.
    if backend_name is None:
        names = None
    else:
        names = (backend_name,)

    def fetch_history(backend):
        return backend.iter_history(backend.get_account(id))

    for backend, operation in self.do(fetch_history, backends=names):
        self.format(operation)
    self.flush()
def complete_coming(self, text, line, *ignored):
    """Complete the account ID, which is the only argument of "coming"."""
    if len(line.split(' ')) != 2:
        return None
    return self._complete_account()
def do_coming(self, id):
    """
    coming ID
    Display all future operations.
    """
    id, backend_name = self.parse_id(id)
    # Same guard as the "history" command: without an ID every backend would
    # be asked for an account that cannot exist.
    if not id:
        sys.stderr.write('Error: please give an account ID (hint: use list command)\n')
        return 2

    # Restrict the call to the backend named in the ID, when there is one.
    names = (backend_name,) if backend_name is not None else None

    def do(backend):
        account = backend.get_account(id)
        return backend.iter_operations(account)

    for backend, operation in self.do(do, backends=names):
        self.format(operation)
    self.flush()
def complete_transfer(self, text, line, *ignored):
    """Complete the source account, then the recipient (minus the source)."""
    words = line.split(' ')
    position = len(words)
    if position == 2:
        return self._complete_account()
    if position == 3:
        return self._complete_account(words[1])
    return None
def do_transfer(self, line):
    """
    transfer ACCOUNT [RECIPIENT AMOUNT [REASON]]
    Make a transfer beetwen two account
    - ACCOUNT the source account
    - RECIPIENT the recipient
    - AMOUNT amount to transfer
    - REASON reason of transfer
    If you give only the ACCOUNT parameter, it lists all the
    available recipients for this account.
    """
    id_from, id_to, amount, reason = self.parse_command_args(line, 4, 1)
    id_from, backend_name_from = self.parse_id(id_from)

    if not id_to:
        # Only the source account was given: list its recipients.
        self.objects = []
        self.set_formatter('recipient_list')
        self.set_formatter_header(u'Available recipients')
        if backend_name_from is None:
            names = None
        else:
            names = (backend_name_from,)
        for backend, recipient in self.do('iter_transfer_recipients', id_from, backends=names):
            self.format(recipient)
            self.add_object(recipient)
        self.flush()
        return 0

    id_to, backend_name_to = self.parse_id(id_to)

    try:
        amount = float(amount)
    except (TypeError, ValueError):
        print >>sys.stderr, 'Error: please give a decimal amount to transfer'
        return 2

    if backend_name_from != backend_name_to:
        print >>sys.stderr, "Transfer between different backends is not implemented"
        return 4

    # Both accounts live on the same backend from here on.
    if backend_name_from is None:
        names = None
    else:
        names = (backend_name_from,)
    for backend, transfer in self.do('transfer', id_from, id_to, amount, reason, backends=names):
        self.format(transfer)
    self.flush()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobathon/ 0000775 0000000 0000000 00000000000 11666415431 0031252 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001426 11666415431 0033307 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobathon # -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .boobathon import Boobathon
__all__ = ['Boobathon']
boobathon.py 0000664 0000000 0000000 00000065335 11666415431 0033534 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobathon # -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime, timedelta
import re
import sys
from urlparse import urlsplit
from random import choice
from weboob.capabilities.content import ICapContent
from weboob.tools.application.repl import ReplApplication
from weboob.tools.ordereddict import OrderedDict
__all__ = ['Boobathon']
class Task(object):
    """A (backend, capability) pair with a status, a completion date and a branch."""

    # Possible values of the ``status`` attribute.
    STATUS_NONE = 0
    STATUS_PROGRESS = 1
    STATUS_DONE = 2

    def __init__(self, backend, capability):
        """Create a task that is neither started nor done."""
        self.backend = backend
        self.capability = capability
        self.status = self.STATUS_NONE
        self.date = None
        self.branch = u''

    def __repr__(self):
        # The format string needs one placeholder per value; an empty string
        # makes the ``%`` operator raise TypeError.
        return '<Task (%s,%s)>' % (self.backend, self.capability)
class Member(object):
    """An attendee: identity, task list and free-form details."""

    def __init__(self, id, name):
        """Create a member with no task and empty details."""
        self.id = id
        self.name = name
        self.is_me = False
        self.tasks = []
        self.repository = None
        self.hardware = u''
        self.availabilities = u''

    def shortname(self):
        """Return the name, cut to 18 characters plus '..' when longer than 20."""
        if len(self.name) <= 20:
            return self.name
        return '%s..' % self.name[:18]
class Event(object):
def __init__(self, name, backend):
    """Bind the event to its wiki page ``wiki/weboob/<name>`` and load it.

    *backend* must provide ``browser.get_userid()``; it is kept for the
    content calls made by load() and save().
    """
    self.my_id = backend.browser.get_userid()
    self.name = 'wiki/weboob/%s' % name
    # load() only assigns the title when the page holds an "h1." line, so it
    # must exist beforehand: edit_event() and save() read it unconditionally.
    self.title = None
    self.description = None
    self.date = None
    self.begin = None
    self.end = None
    self.location = None
    self.winner = None
    self.backend = backend
    self.members = OrderedDict()
    self.load()
def get_me(self):
    """Return the member matching the current user id, or None."""
    current_user = self.backend.browser.get_userid()
    return self.members.get(current_user, None)
def currently_in_event(self):
    """Return True when the schedule is fully known and now lies strictly inside it."""
    if self.date and self.begin and self.end:
        return self.begin < datetime.now() < self.end
    return False
def is_closed(self):
    """Return True once the end of the event is in the past.

    An event whose end is unknown is reported as not closed: comparing
    None with a datetime would otherwise raise TypeError.
    """
    if self.end is None:
        return False
    return self.end < datetime.now()
def format_duration(self):
    """Return the length of the event as ``HH:MM``, or None when unknown."""
    if not self.begin or not self.end:
        return None
    delta = self.end - self.begin
    # The minutes are what is left of the hour, converted from seconds.
    hours = delta.seconds // 3600
    minutes = (delta.seconds % 3600) // 60
    return '%02d:%02d' % (hours, minutes)
def check_time_coherence(self):
    """
    Check if the end's day is before the begin's one, in
    case it stops at the next day (eg. 15h->1h).
    If it occurs, add a day to the end.

    Nothing is done while the begin or the end is still unknown.
    """
    if self.begin is None or self.end is None:
        # Comparing None with a datetime would raise TypeError.
        return
    if self.begin > self.end:
        self.end = self.end + timedelta(1)
# Fetch the wiki page through the backend and rebuild the event state from its
# lines. Only the members dict is cleared first; title, description, date,
# begin, end, location and winner keep their previous values unless a line of
# the page overwrites them.
def load(self):
self.content = self.backend.get_content(self.name)
self.members.clear()
# Member whose section is being read; None while still in the event header.
member = None
for line in self.content.content.split('\n'):
line = line.strip()
if line.startswith('h1. '):
self.title = line[4:]
elif line.startswith('h3=. '):
# Line announcing the winner, in the format written by save().
m = re.match('h3=. Event finished. Winner is "(.*)":/users/(\d+)\!', line)
if not m:
print >>sys.stderr, 'Unable to parse h3=: %s' % line
continue
self.winner = Member(int(m.group(2)), m.group(1))
elif line.startswith('h2. '):
continue
elif line.startswith('h3. '):
# Start of a member section.
m = re.match('h3. "(.*)":/users/(\d+)', line)
if not m:
print >>sys.stderr, 'Unable to parse user "%s"' % line
continue
member = Member(int(m.group(2)), m.group(1))
if member.id == self.my_id:
member.is_me = True
# Replace the placeholder winner by the real member object, so that
# identity tests against self.winner succeed.
if self.winner is not None and member.id == self.winner.id:
self.winner = member
self.members[member.id] = member
# The first non-empty line that is not matched above becomes the description.
elif self.description is None and len(line) > 0 and line != '{{TOC}}':
self.description = line
elif line.startswith('* '):
# "* *Key*: value" item, belonging to the event or to the current member.
m = re.match('\* \*(\w+)\*: (.*)', line)
if not m:
continue
key, value = m.group(1), m.group(2)
if member is None:
if key == 'Date':
self.date = self.parse_date(value)
elif key == 'Start' or key == 'Begin':
self.begin = self.parse_time(value)
elif key == 'End':
self.end = self.parse_time(value)
self.check_time_coherence()
elif key == 'Location':
self.location = value
else:
if key == 'Repository':
# Keep only the link label when the value is a '"x.git":url' link.
m = re.match('"(.*.git)":.*', value)
if m:
member.repository = m.group(1)
else:
member.repository = value
elif key == 'Hardware':
member.hardware = value
elif key == 'Availabilities':
member.availabilities = value
elif line.startswith('[['):
# Task row: [[backend]]|[[capability]]|status|
m = re.match('\[\[(\w+)\]\]\|\[\[(\w+)\]\]\|(.*)\|', line)
if not m:
print >>sys.stderr, 'Unable to parse task: "%s"' % line
continue
task = Task(m.group(1), m.group(2))
# NOTE(review): member is still None if a task row comes before any
# member section, which would raise AttributeError here.
member.tasks.append(task)
if m.group(3) == '!/img/weboob/_progress.png!':
task.status = task.STATUS_PROGRESS
continue
# A done task carries "HH:MM branch"; it needs the event date to build
# its completion datetime.
mm = re.match('!/img/weboob/_done.png! (\d+):(\d+) (\w+)', m.group(3))
if mm and self.date:
task.status = task.STATUS_DONE
task.date = datetime(self.date.year,
self.date.month,
self.date.day,
int(mm.group(1)),
int(mm.group(2)))
task.branch = mm.group(3)
def parse_date(self, value):
    """Parse a ``YYYY-MM-DD`` string into a datetime, or return None if invalid."""
    try:
        parsed = datetime.strptime(value, '%Y-%m-%d')
    except ValueError:
        parsed = None
    return parsed
def parse_time(self, value):
    """Return the event date set to the ``HH:MM`` time found in *value*.

    None is returned when the event has no date yet, when *value* does not
    start with a time, or when the time is out of range.
    """
    if self.date is None:
        # Without a date there is nothing to attach the time to.
        return None
    m = re.match(r'(\d+):(\d+)', value)
    if not m:
        return None
    try:
        return self.date.replace(hour=int(m.group(1)),
                                 minute=int(m.group(2)))
    except ValueError:
        return None
# Rebuild the whole wiki page from the in-memory state and push it through the
# backend, passing `message` along with the content. The layout written here
# is the one load() parses.
def save(self, message):
# Optional banner announcing the winner.
if self.winner:
finished = u'\nh3=. Event finished. Winner is "%s":/users/%d!\n' % (self.winner.name,
self.winner.id)
else:
finished = u''
s = u"""h1. %s
{{TOC}}
%s
h2. Event
%s
* *Date*: %s
* *Begin*: %s
* *End*: %s
* *Duration*: %s
* *Location*: %s
h2. Attendees
""" % (self.title,
finished,
self.description,
self.date.strftime('%Y-%m-%d') if self.date else '_Unknown_',
self.begin.strftime('%H:%M') if self.begin else '_Unknown_',
self.end.strftime('%H:%M') if self.end else '_Unknown_',
self.format_duration() or '_Unknown_',
self.location or '_Unknown_')
# One section per member, followed by the table of that member's tasks.
for member in self.members.itervalues():
# Availabilities are only written while the event has no date.
if self.date:
availabilities = ''
else:
availabilities = '* *Availabilities*: %s' % member.availabilities
if member.repository is None:
repository = '_Unknown_'
elif member.repository.endswith('.git'):
# Every '%s' of the template is replaced by the repository path.
repository = '"%s":git://git.symlink.me/pub/%s ("http":http://git.symlink.me/?p=%s;a=summary)'
repository = repository.replace('%s', member.repository)
else:
repository = member.repository
s += u"""h3. "%s":/users/%d
* *Repository*: %s
* *Hardware*: %s
%s
|_.Backend|_.Capabilities|_.Status|""" % (member.name,
member.id,
repository,
member.hardware,
availabilities)
for task in member.tasks:
# Status cell, in the formats recognised by load().
if task.status == task.STATUS_DONE:
status = '!/img/weboob/_done.png! %02d:%02d %s' % (task.date.hour,
task.date.minute,
task.branch)
elif task.status == task.STATUS_PROGRESS:
status = '!/img/weboob/_progress.png!'
else:
status = ' '
s += u"""
|=.!/img/weboob/%s.png!:/projects/weboob/wiki/%s
[[%s]]|[[%s]]|%s|""" % (task.backend.lower(), task.backend, task.backend, task.capability, status)
s += '\n\n'
self.content.content = s
self.backend.push_content(self.content, message)
# REPL application built on ReplApplication; the attributes below configure it.
class Boobathon(ReplApplication):
APPNAME = 'boobathon'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2011 Romain Bignon'
DESCRIPTION = 'Console application to participate to a Boobathon.'
# Capability declared for this application.
CAPS = ICapContent
# Usage text; the name of the boobathon is a required positional argument.
SYNOPSIS = 'Usage: %prog [-dqv] [-b backends] [-cnfs] boobathon\n'
SYNOPSIS += ' %prog [--help] [--version]'
# NOTE(review): not referenced anywhere in the visible part of this class.
radios = []
# Only forwards its arguments to the parent constructor.
def __init__(self, *args, **kwargs):
ReplApplication.__init__(self, *args, **kwargs)
def main(self, argv):
    """Load the event named by ``argv[1]``, offer to create it, then start the REPL.

    Returns 1 when the name is missing or when the creation is declined,
    otherwise whatever ReplApplication.main() returns.
    """
    if len(argv) < 2:
        print >>sys.stderr, 'Please give the name of the boobathon'
        return 1

    # Any of the loaded backends will do, so one is picked at random.
    backend = choice(self.weboob.backend_instances.values())
    self.event = Event(argv[1], backend)

    # No description was loaded: the event is treated as not existing yet.
    missing = self.event.description is None
    if missing and not self.ask("This event doesn't seem to exist. Do you want to create it?", default=True):
        return 1
    if missing:
        self.edit_event()
        self.save_event('Event created')

    return ReplApplication.main(self, [argv[0]])
def save_event(self, message):
    """Ask for confirmation, then save the event; return whether it was saved."""
    confirmed = self.ask("Do you confirm your changes?", default=True)
    if not confirmed:
        return False
    self.event.save(message)
    return True
def edit_event(self):
    """Interactively edit the title, description, schedule and location.

    A blank date is stored as None, which is what the rest of the class
    tests for. A time can only be recorded once a date is known, because
    begin and end are built from the date; otherwise it is ignored with a
    warning.
    """
    event = self.event
    event.title = self.ask('Enter a title', default=event.title)
    event.description = self.ask('Enter a description', default=event.description)

    answer = self.ask('Enter a date (yyyy-mm-dd)',
                      default=event.date.strftime('%Y-%m-%d') if event.date else '',
                      regexp=r'^(\d{4}-\d{2}-\d{2})?$')
    event.date = None
    if answer:
        try:
            event.date = datetime.strptime(answer, '%Y-%m-%d')
        except ValueError:
            # The regexp accepts digits only, not calendar validity.
            sys.stderr.write('Invalid date "%s", leaving it unset.\n' % answer)

    for question, attr in (('Begin at (HH:MM)', 'begin'), ('End at (HH:MM)', 'end')):
        current = getattr(event, attr)
        answer = self.ask(question,
                          default=current.strftime('%H:%M') if current else '',
                          regexp=r'^(\d{2}:\d{2})?$')
        if not answer:
            continue
        if event.date is None:
            sys.stderr.write('No date is set for this event, ignoring the time.\n')
            continue
        hour, minute = answer.split(':')
        try:
            setattr(event, attr, event.date.replace(hour=int(hour), minute=int(minute)))
        except ValueError:
            sys.stderr.write('Invalid time "%s", ignoring it.\n' % answer)

    # The overnight adjustment needs both ends of the schedule.
    if event.begin and event.end:
        event.check_time_coherence()

    event.location = self.ask('Enter a location', default=event.location)
def edit_member(self, member):
    """Interactively fill in the name, availabilities, repository and hardware."""
    if member.name is not None:
        member.name = self.ask('Enter your name', default=member.name)
    else:
        # New member: the name is built from two separate answers.
        firstname = self.ask('Enter your firstname')
        lastname = self.ask('Enter your lastname')
        member.name = '%s %s' % (firstname, lastname)

    # Availabilities are only asked while the event has no date.
    if self.event.date is None:
        member.availabilities = self.ask('Enter availabilities', default=member.availabilities)

    member.repository = self.ask('Enter your repository (ex. romain/weboob.git)',
                                 default=member.repository)
    member.hardware = self.ask('Enter your hardware', default=member.hardware)
def do_progress(self, line):
    """
    progress
    Display progress of members.
    """
    self.event.load()
    for member in self.event.members.itervalues():
        # Marker column: '->' flags the current user, '\o/' the winner.
        if member.is_me and member is self.event.winner:
            status = '\\o/ ->'
        elif member.is_me:
            status = ' ->'
        elif member is self.event.winner:
            status = ' \\o/'
        else:
            status = ' '
        s = u' %s%20s %s|' % (status, member.shortname(), self.BOLD)
        for task in member.tasks:
            if task.status == task.STATUS_DONE:
                s += '##'
            elif task.status == task.STATUS_PROGRESS:
                s += u'=>'
            else:
                # Two characters, like the other cells, to keep the bar aligned.
                s += '  '
        s += '|%s' % self.NC
        print(s)
    print('')

    begin, end = self.event.begin, self.event.end
    if begin is None or end is None:
        # Comparing None with a datetime would raise TypeError.
        print('The event is not scheduled yet.')
        return

    now = datetime.now()
    if begin > now:
        d = begin - now
        msg = 'The event will start in %d days, %02d:%02d:%02d'
    elif end < now:
        d = now - end
        msg = 'The event is finished since %d days, %02d:%02d:%02d'
    else:
        tot = (end - begin).seconds
        cur = (now - begin).seconds
        # A zero-length event is shown as complete instead of dividing by zero.
        pct = cur * 20 // tot if tot else 20
        progress = ''
        for i in range(20):
            if i < pct:
                progress += '='
            elif i == pct:
                progress += '>'
            else:
                progress += ' '
        print('Event started: %s |%s| %s' % (begin.strftime('%H:%M'),
                                             progress,
                                             end.strftime('%H:%M')))
        d = end - now
        msg = 'The event will be finished in %d days, %02d:%02d:%02d'

    print(msg % (d.days, d.seconds // 3600, d.seconds % 3600 // 60, d.seconds % 60))
def do_tasks(self, line):
"""
tasks
Display all tasks of members.
"""
self.event.load()
stop = False
# i counts the output rows: -2 is the row of member names, -1 the row of
# underlines, then each task takes two rows (even: backend, odd: capability).
i = -2
while not stop:
# Left margin: the task number on the first row of a task, blanks otherwise.
if i >= 0 and not i%2:
sys.stdout.write(' #%-2d' % (i/2))
else:
sys.stdout.write(' ')
if i >= 0 and i%2:
# second line of task, see if we'll stop
stop = True
# One column per member on every row.
for mem in self.event.members.itervalues():
if len(mem.tasks) > (i/2+1):
# there are more tasks, don't stop now
stop = False
if i == -2:
sys.stdout.write(' %s%-20s%s' % (self.BOLD, mem.shortname().encode('utf-8'), self.NC))
elif i == -1:
sys.stdout.write(' %s%-20s%s' % (self.BOLD, '-' * len(mem.shortname()), self.NC))
elif len(mem.tasks) <= (i/2):
# This member has fewer tasks than the current row: empty cell.
sys.stdout.write(' ' * (20+1))
else:
task = mem.tasks[i/2]
# One-character status marker shown in front of the cell text.
if task.status == task.STATUS_DONE:
status = u'#'
elif task.status == task.STATUS_PROGRESS:
if not i%2:
status = u'|' #1st line
else:
status = u'v' #2nd line
else:
status = u' '
if not i%2: #1st line
line = u'%s %s' % (status, task.backend)
else: #2nd line
# The first three characters of the capability name are dropped.
line = u'%s `-%s' % (status, task.capability[3:])
sys.stdout.write((u' %-20s' % line).encode('utf-8'))
sys.stdout.write('\n')
i += 1
def complete_close(self, text, line, *ignored):
    """Complete the winner argument with the names of the (reloaded) members."""
    if len(line.split(' ')) != 2:
        return None
    self.event.load()
    names = []
    for member in self.event.members.itervalues():
        names.append(member.name)
    return names
def do_close(self, name):
    """
    close WINNER
    Close the event and set the winner.
    """
    self.event.load()
    for member in self.event.members.itervalues():
        if member.name != name:
            continue
        # The winner has to be set on the event itself, since that is what
        # save() writes out.
        previous = self.event.winner
        self.event.winner = member
        if self.save_event('Close event'):
            print('Event is now closed. Winner is %s!' % member.name)
        else:
            # Not confirmed: do not keep an unsaved winner in memory.
            self.event.winner = previous
        return

    print >>sys.stderr, '"%s" not found' % name
    return 3
def complete_edit(self, text, line, *ignored):
    """Complete the single argument of "edit" with its two possible targets."""
    choices = None
    if len(line.split(' ')) == 2:
        choices = ['event', 'me']
    return choices
def do_edit(self, line):
    """
    edit [event | me]
    Edit information about you or about event.
    """
    if not line:
        print >>sys.stderr, 'Syntax: edit [event | me]'
        return 2

    self.event.load()

    if line == 'event':
        self.edit_event()
        self.save_event('Event edited')
        return None

    if line == 'me':
        mem = self.event.get_me()
        if not mem:
            print >>sys.stderr, 'You haven\'t joined the event.'
            return 1
        self.edit_member(mem)
        self.save_event('Member edited')
        return None

    # Anything else is not an editable target.
    print >>sys.stderr, 'Unable to edit "%s"' % line
    return 1
def do_info(self, line):
    """
    info
    Display information about this event.
    """
    self.event.load()
    event = self.event

    # Title underlined with dashes, then the description.
    print(event.title)
    print('-' * len(event.title))
    print(event.description)
    print('')

    # Every unknown field is shown as "Unknown".
    print('Date: %s' % (event.date.strftime('%Y-%m-%d') if event.date else 'Unknown'))
    print('Begin: %s' % (event.begin.strftime('%H:%M') if event.begin else 'Unknown'))
    print('End: %s' % (event.end.strftime('%H:%M') if event.end else 'Unknown'))
    print('Duration: %s' % (event.format_duration() or 'Unknown'))
    print('Location: %s' % (event.location or 'Unknown'))
    print('')

    print('There are %d members, use the "members" command to list them' % len(event.members))
    if event.get_me() is None:
        print('To join this event, use the command "join".')
def do_members(self, line):
    """
    members
    Display members informations.
    """
    self.event.load()
    for member in self.event.members.itervalues():
        # Name underlined with dashes, then the details.
        print(member.name)
        print('-' * len(member.name))
        print('Repository: %s' % member.repository)
        # Availabilities are only relevant while the event has no date.
        if self.event.date is None:
            print('Availabilities: %s' % member.availabilities)
        print('Hardware: %s' % member.hardware)

        accompl = sum(1 for task in member.tasks if task.status == task.STATUS_DONE)
        print('%d tasks (%d accomplished)' % (len(member.tasks), accompl))
        if member is self.event.winner:
            print('=== %s is the winner!' % member.name)
        print('')

    print('Use the "tasks" command to display all tasks')
def do_join(self, line):
    """
    join
    Join this event.
    """
    self.event.load()
    event = self.event

    already_member = event.backend.browser.get_userid() in event.members
    if already_member:
        print >>sys.stderr, 'You have already joined this event.'
        return 1

    if event.is_closed():
        print >>sys.stderr, "Boobathon is closed."
        return 1

    # The name is unknown at this point; edit_member() asks for it.
    newcomer = Member(event.backend.browser.get_userid(), None)
    self.edit_member(newcomer)
    event.members[newcomer.id] = newcomer
    self.save_event('Joined the event')
def do_leave(self, line):
    """
    leave
    Leave this event.
    """
    self.event.load()

    if self.event.currently_in_event():
        print >>sys.stderr, 'Unable to leave during the event, loser!'
        return 1

    if self.event.is_closed():
        print >>sys.stderr, "Boobathon is closed."
        return 1

    # Removing an unknown id means the user never joined.
    try:
        self.event.members.pop(self.event.backend.browser.get_userid())
    except KeyError:
        print >>sys.stderr, "You have not joined this event."
        return 1

    self.save_event('Left the event')
def do_remtask(self, line):
    """
    remtask TASK_ID
    Remove a task.
    """
    self.event.load()
    mem = self.event.get_me()
    if not mem:
        sys.stderr.write("You have not joined this event.\n")
        return 1
    if self.event.is_closed():
        sys.stderr.write("Boobathon is closed.\n")
        return 1

    try:
        task_id = int(line)
    except ValueError:
        sys.stderr.write('The task ID should be a number\n')
        return 2

    # list.pop() accepts negative indexes, which would silently remove a
    # task counted from the end: only accept IDs that really exist.
    if task_id < 0 or task_id >= len(mem.tasks):
        sys.stderr.write('Unable to find task #%d\n' % task_id)
        return 1

    task = mem.tasks.pop(task_id)
    print('Removing task #%d (%s,%s).' % (task_id, task.backend, task.capability))
    self.save_event('Remove task')
def do_addtask(self, line):
    """
    addtask BACKEND CAPABILITY
    Add a new task.
    """
    self.event.load()
    mem = self.event.get_me()
    if not mem:
        sys.stderr.write("You have not joined this event.\n")
        return 1
    if self.event.is_closed():
        sys.stderr.write("Boobathon is closed.\n")
        return 1

    backend, capability = self.parse_command_args(line, 2, 2)

    # Slices instead of indexes: an empty backend name or the bare name
    # "Cap" must be rejected, not raise IndexError.
    if not backend[:1].isupper():
        sys.stderr.write('The backend name "%s" needs to start with a capital.\n' % backend)
        return 2
    if not capability.startswith('Cap') or not capability[3:4].isupper():
        sys.stderr.write('"%s" is not a proper capability name (must start with Cap).\n' % capability)
        return 2

    for task in mem.tasks:
        if (task.backend, task.capability) == (backend, capability):
            sys.stderr.write("A task already exists for that.\n")
            return 1

    task = Task(backend, capability)
    mem.tasks.append(task)
    self.save_event('New task')
def do_start(self, line):
"""
start [TASK_ID]
Start a task. If you don't give a task ID, the first available
task will be taken.
"""
self.event.load()
mem = self.event.get_me()
if not mem:
print >>sys.stderr, "You have not joined this event."
return 1
if len(mem.tasks) == 0:
print >>sys.stderr, "You don't have any task to do."
return 1
if not self.event.currently_in_event():
print >>sys.stderr, "You can't start a task, we are not in event."
return 1
# A non-numeric (or empty) argument means "take the first available task".
if line.isdigit():
task_id = int(line)
else:
task_id = -1
# Index of the last done task met before the selected one.
last_done = -1
for i, task in enumerate(mem.tasks):
if task.status == task.STATUS_DONE:
last_done = i
elif task.status == task.STATUS_PROGRESS:
# A task in progress met during the scan is canceled.
task.status = task.STATUS_NONE
print 'Task #%s (%s,%s) canceled.' % (i, task.backend, task.capability)
# Stop on the requested task, or on the first one not started when no ID
# was given. NOTE(review): a task canceled just above also matches here.
if (i == task_id or task_id < 0) and task.status == task.STATUS_NONE:
break
# NOTE(review): presumably the else clause of the for loop (no break happened);
# indentation is lost in this copy, confirm against upstream.
else:
print >>sys.stderr, 'Task not found.'
return 3
if task.status == task.STATUS_DONE:
print >>sys.stderr, 'Task is already done.'
return 1
task.status = task.STATUS_PROGRESS
# Move the started task right after the last done one found.
mem.tasks.remove(task)
mem.tasks.insert(last_done + 1, task)
self.save_event('Started a task')
def do_done(self, line):
    """
    done
    Set the current task as done.
    """
    self.event.load()
    mem = self.event.get_me()
    if not mem:
        print >>sys.stderr, "You have not joined this event."
        return 1
    if self.event.is_closed():
        print >>sys.stderr, "Boobathon is closed."
        return 1

    for position, task in enumerate(mem.tasks):
        if task.status != task.STATUS_PROGRESS:
            continue

        # First task in progress: report it, then record or cancel it.
        print('Task (%s,%s) done! (%d%%)' % (task.backend, task.capability,
                                             (position+1)*100/len(mem.tasks)))
        if not self.event.currently_in_event():
            task.status = task.STATUS_NONE
            print >>sys.stderr, 'Oops, you are out of event. Canceling the task...'
            self.save_event('Cancel task')
            return 1

        task.status = task.STATUS_DONE
        task.date = datetime.now()
        task.branch = self.ask('Enter name of branch')
        self.save_event('Task accomplished')
        return

    print >>sys.stderr, "There isn't any task in progress."
    return 1
def do_cancel(self, line):
    """
    cancel
    Cancel the current task.
    """
    self.event.load()
    mem = self.event.get_me()
    if not mem:
        print >>sys.stderr, "You have not joined this event."
        return 1
    if self.event.is_closed():
        print >>sys.stderr, "Boobathon is closed."
        return 1

    # Look for the first task in progress.
    current = None
    for candidate in mem.tasks:
        if candidate.status == candidate.STATUS_PROGRESS:
            current = candidate
            break

    if current is None:
        print >>sys.stderr, "There isn't any task in progress."
        return 1

    print('Task (%s,%s) canceled.' % (current.backend, current.capability))
    current.status = current.STATUS_NONE
    self.save_event('Cancel task')
    return
def load_default_backends(self):
    """
    Overload a BaseApplication method.

    Load the first configured 'redmine' backend whose URL points to
    symlink.me. When there is none, check_loaded_backends() is called
    with the expected URL, and the application exits if it returns a
    false value.
    """
    for instance_name, backend_name, params in self.weboob.backends_config.iter_backends():
        if backend_name != 'redmine':
            continue
        # A backend configured without a URL cannot be the expected one;
        # it is skipped instead of raising KeyError.
        # NOTE(review): assumes params is a dict -- confirm with the config API.
        v = urlsplit(params.get('url', ''))
        if v.netloc == 'symlink.me':
            self.load_backends(names=[instance_name])
            return

    if not self.check_loaded_backends({'url': 'https://symlink.me'}):
        print('A redmine backend for https://symlink.me is required, leaving.')
        sys.exit(0)
def is_backend_loadable(self, backend):
    """
    Overload a ConsoleApplication method.

    Only backends named 'redmine' are loadable.
    """
    loadable = (backend.name == 'redmine')
    return loadable
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobmsg/ 0000775 0000000 0000000 00000000000 11666415431 0030727 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001426 11666415431 0032764 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobmsg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .boobmsg import Boobmsg
__all__ = ['Boobmsg']
boobmsg.py 0000664 0000000 0000000 00000031151 11666415431 0032653 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobmsg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
from weboob.core import CallErrors
from weboob.capabilities.messages import ICapMessages, Message, Thread
from weboob.capabilities.account import ICapAccount
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.formatters.iformatter import IFormatter
from weboob.tools.misc import html2text
__all__ = ['Boobmsg']
# Formatter rendering one message as a markup fragment; Boobmsg registers it
# under the name 'xhtml' in its EXTRA_FORMATTERS.
class XHtmlFormatter(IFormatter):
# Nothing is buffered by this formatter, so flushing is a no-op.
def flush(self):
pass
# Build and return the text for one message from its 'title', 'date',
# 'sender', 'signature' and 'content' fields, in that order.
# NOTE(review): the string literals below are split across several lines and
# contain no tags at all; the markup they held was presumably stripped when
# this file was extracted. Restore the literals from the upstream repository
# before relying on this method -- as written they are not valid Python.
def format_dict(self, item):
result = "
\n"
result += "
%s
" % (item['title'])
result += "
"
result += "
Date
%s
" % (item['date'])
result += "
Sender
%s
" % (item['sender'])
result += "
Signature
%s
" % (item['signature'])
result += "
"
result += "
%s
" % (item['content'])
result += "
\n"
return result
class MessageFormatter(IFormatter):
    """
    Formatter displaying one complete message: headers, body and signature.
    """

    def flush(self):
        """Nothing is buffered by this formatter, so there is nothing to do."""
        return None

    def format_dict(self, item):
        """
        Render the message described by *item* and return it as text.

        The title, date and sender headers are always written; the receivers
        header and the signature only when they are set. Body and signature
        go through html2text() when the message carries the IS_HTML flag.
        """
        bold = self.BOLD
        normal = self.NC

        def as_text(value):
            # HTML payloads are converted, anything else is shown verbatim.
            if item['flags'] & Message.IS_HTML:
                return html2text(value)
            return value

        text = u'%sTitle:%s %s\n' % (bold, normal, item['title'])
        text += u'%sDate:%s %s\n' % (bold, normal, item['date'])
        text += u'%sFrom:%s %s\n' % (bold, normal, item['sender'])

        receivers = item['receivers']
        if receivers:
            text += u'%sTo:%s %s\n' % (bold, normal, ', '.join(receivers))

        text += '\n%s' % as_text(item['content'])

        signature = item['signature']
        if signature:
            # "-- " on its own line is the usual signature delimiter.
            text += '\n-- \n%s' % as_text(signature)

        return text
class MessagesListFormatter(IFormatter):
    """
    Formatter used to list threads, or the messages of a single thread.

    The application sets ``_list_messages`` before formatting: when it is
    false each item is rendered as a thread summary, when it is true each
    item is a thread whose message tree is printed.
    """
    MANDATORY_FIELDS = ()
    # Number of entries printed since the last flush(); displayed as the
    # entry number in interactive mode.
    count = 0
    # Display mode, see the class docstring.
    _list_messages = False

    def flush(self):
        """Reset the entry counter so that the next listing starts at 1."""
        self.count = 0

    def format_dict(self, item):
        """Render *item* with the renderer matching the current mode."""
        if self._list_messages:
            return self.format_dict_messages(item)
        return self.format_dict_thread(item)

    def format_dict_thread(self, item):
        """
        Return a one or two line summary of the thread *item*.

        The summary carries an unread marker and the title; the date is
        appended on a second line when it is set.
        """
        self.count += 1
        if item['nb_unread'] and item['nb_unread'] > 0:
            unread = '[N]'
        else:
            unread = ' '
        if self.interactive:
            # Interactive mode shows a short number instead of the full ID;
            # the backend is the part of the ID following the first '@'.
            backend = item['id'].split('@', 1)[1]
            result = u'%s* (%d) %s %s (%s)%s' % (self.BOLD,
                                                 self.count, unread,
                                                 item['title'], backend,
                                                 self.NC)
        else:
            result = u'%s* (%s) %s %s%s' % (self.BOLD, item['id'],
                                            unread, item['title'],
                                            self.NC)
        if item['date']:
            result += u'\n %s' % item['date']
        return result

    def format_dict_messages(self, item):
        """
        Return the listing of every message of the thread *item*.

        Replies are indented only when the thread has the IS_THREADS flag;
        otherwise a negative depth keeps the whole listing flat.
        """
        backend = item['id'].split('@', 1)[1]
        if item['flags'] == Thread.IS_THREADS:
            depth = 0
        else:
            depth = -1
        return self.format_message(backend, item['root'], depth)

    def format_message(self, backend, message, depth=0):
        """
        Return the listing line of *message* followed by its children's.

        :param backend: name of the backend the message comes from
        :param message: message to display; a false value yields u''
        :param depth: indentation level; a negative value disables the
                      indentation of the replies
        """
        if not message:
            return u''
        self.count += 1

        # Two status characters: unread marker, then acknowledgement state.
        flags = '['
        if message.flags & message.IS_UNREAD:
            flags += 'N'
        else:
            flags += '-'
        if message.flags & message.IS_NOT_ACCUSED:
            flags += 'U'
        elif message.flags & message.IS_ACCUSED:
            flags += 'R'
        else:
            flags += '-'
        flags += ']'

        if self.interactive:
            result = u'%s%s* (%d)%s %s <%s> %s (%s)\n' % (depth * ' ',
                                                          self.BOLD,
                                                          self.count,
                                                          self.NC,
                                                          flags,
                                                          message.sender,
                                                          message.title,
                                                          backend)
        else:
            # The colour reset comes right after the ID and the flags follow
            # the space, exactly as in the interactive branch above (the two
            # arguments used to be passed in the opposite order).
            result = u'%s%s* (%s.%s@%s)%s %s <%s> %s\n' % (depth * ' ',
                                                           self.BOLD,
                                                           message.thread.id,
                                                           message.id,
                                                           backend,
                                                           self.NC,
                                                           flags,
                                                           message.sender,
                                                           message.title)
        if message.children:
            if depth >= 0:
                depth += 1
            for m in message.children:
                result += self.format_message(backend, m, depth)
        return result
class Boobmsg(ReplApplication):
APPNAME = 'boobmsg'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Christophe Benz'
DESCRIPTION = "Console application allowing to send messages on various websites and " \
"to display message threads and contents."
CAPS = ICapMessages
EXTRA_FORMATTERS = {'msglist': MessagesListFormatter,
'msg': MessageFormatter,
'xhtml': XHtmlFormatter,
}
COMMANDS_FORMATTERS = {'list': 'msglist',
'show': 'msg',
'export_thread': 'msg',
'export_all': 'msg',
'ls': 'msglist',
}
def add_application_options(self, group):
group.add_option('-e', '--skip-empty', action='store_true',
help='Don\'t send messages with an empty body.')
group.add_option('-t', '--title', action='store',
help='For the "post" command, set a title to message',
type='string', dest='title')
def load_default_backends(self):
self.load_backends(ICapMessages, storage=self.create_storage())
def do_status(self, line):
"""
status
Display status information about a backend.
"""
if len(line) > 0:
backend_name = line
else:
backend_name = None
results = {}
for backend, field in self.do('get_account_status',
backends=backend_name,
caps=ICapAccount):
if backend.name in results:
results[backend.name].append(field)
else:
results[backend.name] = [field]
for name, fields in results.iteritems():
print ':: %s ::' % name
for f in fields:
if f.flags & f.FIELD_HTML:
value = html2text(f.value)
else:
value = f.value
print '%s: %s' % (f.label, value)
print ''
def do_post(self, line):
"""
post RECEIVER@BACKEND[,RECEIVER@BACKEND[...]] [TEXT]
Post a message to the specified receivers.
Multiple receivers are separated by a comma.
If no text is supplied on command line, the content of message is read on stdin.
"""
receivers, text = self.parse_command_args(line, 2, 1)
if text is None:
text = self.acquire_input()
if self.options.skip_empty and not text.strip():
return
for receiver in receivers.strip().split(','):
receiver, backend_name = self.parse_id(receiver.strip(),
unique_backend=True)
if not backend_name and len(self.enabled_backends) > 1:
self.logger.warning(u'No backend specified for receiver "%s": message will be sent with all the '
'enabled backends (%s)' % (receiver,
','.join(backend.name for backend in self.enabled_backends)))
if '.' in receiver:
# It's a reply
thread_id, parent_id = receiver.rsplit('.', 1)
else:
# It's an original message
thread_id = receiver
parent_id = None
thread = Thread(thread_id)
message = Message(thread,
0,
title=self.options.title,
parent=Message(thread, parent_id) if parent_id else None,
content=text)
try:
self.do('post_message', message, backends=backend_name).wait()
except CallErrors, errors:
self.bcall_errors_handler(errors)
else:
if self.interactive:
print 'Message sent sucessfully to %s' % receiver
threads = []
messages = []
def do_list(self, arg):
"""
list
Display all threads.
"""
if len(arg) > 0:
try:
thread = self.threads[int(arg) - 1]
except (IndexError, ValueError):
id, backend_name = self.parse_id(arg)
else:
id = thread.id
backend_name = thread.backend
self.messages = []
cmd = self.do('get_thread', id, backends=backend_name)
self.formatter._list_messages = True
else:
self.threads = []
cmd = self.do('iter_threads')
self.formatter._list_messages = False
for backend, thread in cmd:
if not thread:
continue
if len(arg) > 0:
for m in thread.iter_all_messages():
if not m.backend:
m.backend = thread.backend
self.messages.append(m)
else:
self.threads.append(thread)
self.format(thread)
self.flush()
def do_export_all(self, arg):
"""
export_all
Export All threads
"""
def func(backend):
for thread in backend.iter_threads():
if not thread:
continue
t = backend.fillobj(thread, None)
for msg in t.iter_all_messages():
yield msg
for backend, msg in self.do(func):
self.format(msg)
def do_export_thread(self, arg):
"""
export_thread
Export a thread
"""
_id, backend_name = self.parse_id(arg)
cmd = self.do('get_thread', _id, backends=backend_name)
for backend, thread in cmd:
if thread is not None :
for msg in thread.iter_all_messages():
self.format(msg)
def do_show(self, arg):
"""
show MESSAGE
Read a message
"""
if len(arg) == 0:
print >>sys.stderr, 'Please give a message ID.'
return 2
try:
message = self.messages[int(arg) - 1]
except (IndexError, ValueError):
id, backend_name = self.parse_id(arg)
else:
self.format(message)
self.weboob.do('set_message_read', message, backends=message.backend)
return
if not self.interactive:
print >>sys.stderr, 'Oops, you need to be in interactive mode to read messages'
return 1
else:
print >>sys.stderr, 'Message not found'
return 3
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobtracker/ 0000775 0000000 0000000 00000000000 11666415431 0031574 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001433 11666415431 0033627 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobtracker # -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .boobtracker import BoobTracker
__all__ = ['BoobTracker']
boobtracker.py 0000664 0000000 0000000 00000031232 11666415431 0034365 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/boobtracker # -*- coding: utf-8 -*-
# Copyright(C) 2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from datetime import timedelta
import sys
from weboob.capabilities.bugtracker import ICapBugTracker, Query, Update
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.formatters.iformatter import IFormatter
from weboob.tools.misc import html2text
__all__ = ['BoobTracker']
class IssueFormatter(IFormatter):
    """
    Formatter displaying every detail of one issue.
    """
    MANDATORY_FIELDS = ('id', 'project', 'title', 'body', 'author')

    def flush(self):
        """Nothing is buffered by this formatter, so there is nothing to do."""
        return None

    def format_dict(self, item):
        """
        Render the issue described by *item* and return it as text.

        The heading, body and author are always written. Status, version,
        category, assignee, attachments and history are written only when
        they are set.
        """
        bold = self.BOLD
        normal = self.NC

        chunks = [u'%s%s - #%s - %s%s\n' % (bold, item['project'].name, item['id'], item['title'], normal)]
        chunks.append('\n%s\n\n' % item['body'])
        chunks.append('Author: %s (%s)\n' % (item['author'].name, item['creation']))

        # (template, field, whether the value is an object displayed by name)
        optional_fields = (('Status: %s\n', 'status', True),
                           ('Version: %s\n', 'version', True),
                           ('Category: %s\n', 'category', False),
                           ('Assignee: %s\n', 'assignee', True))
        for template, field, has_name in optional_fields:
            value = item[field]
            if value:
                chunks.append(template % (value.name if has_name else value))

        attachments = item['attachments']
        if attachments:
            chunks.append('\nAttachments:\n')
            for attachment in attachments:
                chunks.append('* %s%s%s <%s>\n' % (bold, attachment.filename, normal, attachment.url))

        history = item['history']
        if history:
            chunks.append('\nHistory:\n')
            for update in history:
                chunks.append('* %s%s - %s%s\n' % (bold, update.date, update.author.name, normal))
                for change in update.changes:
                    chunks.append(' - %s%s%s: %s -> %s\n' % (bold, change.field, normal, change.last, change.new))
                if update.message:
                    chunks.append(html2text(update.message))

        return u''.join(chunks)
class IssuesListFormatter(IFormatter):
    """
    Formatter displaying each issue of a listing on two short lines.
    """
    MANDATORY_FIELDS = ('id', 'project', 'status', 'title', 'category')
    # Number of issues formatted since the last flush().
    count = 0

    def flush(self):
        """Reset the counter of formatted issues."""
        self.count = 0

    def format_dict(self, item):
        """
        Return the summary of the issue *item*.

        The first line holds the ID, project, status and title; the second
        one holds the category.
        """
        self.count += 1
        headline = u'%s* (%s) %s - [%s] %s%s\n' % (self.BOLD,
                                                   item['id'],
                                                   item['project'].name,
                                                   item['status'].name,
                                                   item['title'],
                                                   self.NC)
        return headline + ' %s' % (item['category'])
class BoobTracker(ReplApplication):
APPNAME = 'boobtracker'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2011 Romain Bignon'
DESCRIPTION = "Console application allowing to send messages on various websites and " \
"to display message threads and contents."
CAPS = ICapBugTracker
EXTRA_FORMATTERS = {'issue_info': IssueFormatter,
'issues_list': IssuesListFormatter,
}
COMMANDS_FORMATTERS = {'get': 'issue_info',
'post': 'issue_info',
'edit': 'issue_info',
'search': 'issues_list',
'ls': 'issues_list',
}
def add_application_options(self, group):
group.add_option('--author')
group.add_option('--title')
group.add_option('--assignee')
group.add_option('--target-version', dest='version')
group.add_option('--category')
group.add_option('--status')
def do_search(self, line):
"""
search PROJECT
List issues for a project.
You can use these filters from command line:
--author AUTHOR
--title TITLE_PATTERN
--assignee ASSIGNEE
--target-version VERSION
--category CATEGORY
--status STATUS
"""
query = Query()
path = self.working_path.get()
backends = []
if line.strip():
query.project, backends = self.parse_id(line, unique_backend=True)
elif len(path) > 0:
query.project = path[0]
else:
print >>sys.stderr, 'Please enter a project name'
return 1
query.author = self.options.author
query.title = self.options.title
query.assignee = self.options.assignee
query.version = self.options.version
query.category = self.options.category
query.status = self.options.status
self.change_path('/%s/search' % query.project)
for backend, issue in self.do('iter_issues', query, backends=backends):
self.add_object(issue)
self.format(issue)
self.flush()
def complete_get(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_get(self, line):
"""
get ISSUE
Get an issue and display it.
"""
if not line:
print >>sys.stderr, 'This command takes an argument: %s' % self.get_command_help('get', short=True)
return 2
issue = self.get_object(line, 'get_issue')
if not issue:
print >>sys.stderr, 'Issue not found: %s' % line
return 3
self.format(issue)
self.flush()
def complete_comment(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_comment(self, line):
"""
comment ISSUE [TEXT]
Comment an issue. If no text is given, enter it in standard input.
"""
id, text = self.parse_command_args(line, 2, 1)
if text is None:
text = self.acquire_input()
id, backend_name = self.parse_id(id, unique_backend=True)
update = Update(0)
update.message = text
self.do('update_issue', id, update, backends=backend_name).wait()
def do_logtime(self, line):
"""
logtime ISSUE HOURS [TEXT]
Log spent time on an issue.
"""
id, hours, text = self.parse_command_args(line, 3, 2)
if text is None:
text = self.acquire_input()
try:
hours = float(hours)
except ValueError:
print >>sys.stderr, 'Error: HOURS parameter may be a float'
return 1
id, backend_name = self.parse_id(id, unique_backend=True)
update = Update(0)
update.message = text
update.hours = timedelta(hours=hours)
self.do('update_issue', id, update, backends=backend_name).wait()
def complete_remove(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_remove(self, line):
"""
remove ISSUE
Remove an issue.
"""
id, backend_name = self.parse_id(line, unique_backend=True)
self.do('remove_issue', id, backends=backend_name).wait()
ISSUE_FIELDS = (('title', (None, False)),
('assignee', ('members', True)),
('version', ('versions', True)),
('category', ('categories', False)),
('status', ('statuses', True)),
)
def get_list_item(self, objects_list, name):
if name is None:
return None
for obj in objects_list:
if obj.name.lower() == name.lower():
return obj
print 'Error: "%s" is not found' % name
return None
def prompt_issue(self, issue, requested_key=None, requested_value=None):
for key, (list_name, is_list_object) in self.ISSUE_FIELDS:
if requested_key and requested_key != key:
continue
if requested_value:
value = requested_value
elif not self.interactive:
value = getattr(self.options, key)
else:
value = None
if sys.stdin.isatty():
default = getattr(issue, key)
if not default:
default = None
elif 'name' in dir(default):
default = default.name
if list_name is None:
if value is not None:
setattr(issue, key, value)
print '%s: %s' % (key.capitalize(), value)
continue
setattr(issue, key, self.ask(key.capitalize(), default=default))
else:
objects_list = getattr(issue.project, list_name)
if len(objects_list) == 0:
continue
print '----------'
if value is not None:
if is_list_object:
value = self.get_list_item(objects_list, value)
if value is not None:
setattr(issue, key, value)
print '%s: %s' % (key.capitalize(), value.name)
continue
while value is None:
print 'Availables:', ', '.join([(o if isinstance(o, basestring) else o.name) for o in objects_list])
if is_list_object and getattr(issue, key):
default = getattr(issue, key).name
else:
default = getattr(issue, key) or ''
text = self.ask(key.capitalize(), default=default)
if not text:
break
if is_list_object:
value = self.get_list_item(objects_list, text)
else:
value = text
if value is not None:
setattr(issue, key, value)
def do_post(self, line):
"""
post PROJECT
Post a new issue.
If you are not in interactive mode, you can use these parameters:
--title TITLE
--assignee ASSIGNEE
--target-version VERSION
--category CATEGORY
--status STATUS
"""
if not line.strip():
print 'Please give the project name'
return 1
project, backend_name = self.parse_id(line, unique_backend=True)
backend = self.weboob.get_backend(backend_name)
issue = backend.create_issue(project)
self.prompt_issue(issue)
if sys.stdin.isatty():
print '----------'
print 'Please enter the content of this new issue.'
issue.body = self.acquire_input()
for backend, issue in self.weboob.do('post_issue', issue, backends=backend):
if issue:
print 'Issue %s%s@%s%s created' % (self.BOLD, issue.id, issue.backend, self.NC)
def complete_remove(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
if len(args) == 3:
return dict(self.ISSUE_FIELDS).keys()
def do_edit(self, line):
"""
edit ISSUE [KEY [VALUE]]
Edit an issue.
If you are not in interactive mode, you can use these parameters:
--title TITLE
--assignee ASSIGNEE
--target-version VERSION
--category CATEGORY
--status STATUS
"""
_id, key, value = self.parse_command_args(line, 3, 1)
issue = self.get_object(_id, 'get_issue')
if not issue:
print >>sys.stderr, 'Issue not found: %s' % _id
return 3
self.prompt_issue(issue, key, value)
for backend, i in self.weboob.do('post_issue', issue, backends=issue.backend):
if i:
print 'Issue %s%s@%s%s updated' % (self.BOLD, issue.id, issue.backend, self.NC)
self.format(i)
self.flush()
def complete_attach(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
elif len(args) >= 3:
return self.path_completer(args[2])
def do_attach(self, line):
"""
attach ISSUE FILENAME
Attach a file to an issue (Not implemented yet).
"""
print >>sys.stderr, 'Not implemented yet.'
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/chatoob/ 0000775 0000000 0000000 00000000000 11666415431 0030716 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001425 11666415431 0032752 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/chatoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .chatoob import Chatoob
__all__ = ['Chatoob']
chatoob.py 0000664 0000000 0000000 00000004165 11666415431 0032636 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/chatoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import logging
from weboob.tools.application.repl import ReplApplication
from weboob.capabilities.chat import ICapChat
from weboob.capabilities.contact import ICapContact, Contact
__all__ = ['Chatoob']
class Chatoob(ReplApplication):
APPNAME = 'chatoob'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Christophe Benz'
DESCRIPTION = 'Console application allowing to chat with contacts on various websites.'
CAPS = ICapChat
def on_new_chat_message(self, message):
print 'on_new_chat_message: %s' % message
def do_list(self, line):
"""
list
List all contacts.
"""
for backend, contact in self.do('iter_contacts', status=Contact.STATUS_ONLINE, caps=ICapContact):
self.format(contact)
self.flush()
def do_messages(self, line):
"""
messages
Get messages.
"""
for backend, message in self.do('iter_chat_messages'):
self.format(message)
self.flush()
def do_send(self, line):
"""
send CONTACT MESSAGE
Send a message to the specified contact.
"""
_id, message = self.parse_command_args(line, 2, 2)
for backend, result in self.do('send_chat_message', _id, message):
if not result:
logging.error(u'Failed to send message to contact id="%s" on backend "%s"' % (_id, backend.name))
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/galleroob/ 0000775 0000000 0000000 00000000000 11666415431 0031245 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001434 11666415431 0033301 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/galleroob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Noé Rubinstein
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .galleroob import Galleroob
__all__ = ['Galleroob']
galleroob.py 0000664 0000000 0000000 00000011253 11666415431 0033510 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/galleroob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Noé Rubinstein
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
import os
from re import search, sub
from weboob.tools.application.repl import ReplApplication
from weboob.capabilities.base import NotLoaded
from weboob.capabilities.gallery import ICapGallery
from weboob.tools.application.formatters.iformatter import IFormatter
__all__ = ['Galleroob']
class GalleryListFormatter(IFormatter):
    """
    Formatter displaying each gallery of a listing on one or two lines.
    """
    MANDATORY_FIELDS = ('id', 'title')
    # Reset by flush(); format_dict() does not update it.
    count = 0

    def flush(self):
        """Reset the counter."""
        self.count = 0

    def format_dict(self, item):
        """
        Return the summary of the gallery *item*.

        The ID and title are always written; the number of pages and the
        description only when these fields have been loaded.
        """
        pieces = [u'%s* (%s) %s%s' % (ReplApplication.BOLD,
                                      item['id'],
                                      item['title'],
                                      ReplApplication.NC)]

        cardinality = item['cardinality']
        if cardinality is not NotLoaded:
            pieces.append(u' (%d pages)' % cardinality)

        description = item['description']
        if description is not NotLoaded:
            pieces.append(u'\n %-70s' % description)

        return u''.join(pieces)
class Galleroob(ReplApplication):
APPNAME = 'galleroob'
VERSION = '0.9.1'
COPYRIGHT = u'Copyright(C) 2011 Noé Rubinstein'
DESCRIPTION = 'galleroob browses and downloads web image galleries'
CAPS = ICapGallery
EXTRA_FORMATTERS = {'gallery_list': GalleryListFormatter}
COMMANDS_FORMATTERS = {'search': 'gallery_list'}
def __init__(self, *args, **kwargs):
ReplApplication.__init__(self, *args, **kwargs)
def do_search(self, pattern=None):
"""
search PATTERN
List galleries matching a PATTERN.
If PATTERN is not given, the command will list all the galleries
"""
self.set_formatter_header(u'Search pattern: %s' %
pattern if pattern else u'Latest galleries')
for backend, gallery in self.do('iter_search_results',
pattern=pattern, max_results=self.options.count):
self.add_object(gallery)
self.format(gallery)
def do_download(self, line):
"""
download ID [FIRST [FOLDER]]
Download a gallery.
Begins at page FIRST (default: 0) and saves to FOLDER (default: title)
"""
_id, first, dest = self.parse_command_args(line, 3, 1)
if first is None:
first = 0
else:
first = int(first)
gallery = None
_id, backend = self.parse_id(_id)
for _backend, result in self.do('get_gallery', _id, backends=backend):
if result:
backend = _backend
gallery = result
if not gallery:
print >>sys.stderr, 'Gallery not found: %s' % _id
return 3
backend.fillobj(gallery, ('title',))
if dest is None:
dest = sub('/', ' ', gallery.title)
print "Downloading to %s" % dest
try:
os.mkdir(dest)
except OSError:
pass # ignore error on existing directory
os.chdir(dest) # fail here if dest couldn't be created
i = 0
for img in backend.iter_gallery_images(gallery):
i += 1
if i < first:
continue
backend.fillobj(img, ('url','data'))
if img.data is None:
backend.fillobj(img, ('url','data'))
if img.data is None:
print >>sys.stderr, "Couldn't get page %d, exiting" % i
break
ext = search(r"\.([^\.]{1,5})$", img.url)
if ext:
ext = ext.group(1)
else:
ext = "jpg"
name = '%03d.%s' % (i, ext)
print 'Writing file %s' % name
with open(name, 'w') as f:
f.write(img.data)
os.chdir(os.path.pardir)
def do_info(self, line):
"""
info ID
Get information about a gallery.
"""
_id, = self.parse_command_args(line, 1, 1)
gallery = self.get_object(_id, 'get_gallery')
if not gallery:
print >>sys.stderr, 'Gallery not found: %s' % _id
return 3
self.format(gallery)
self.flush()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/geolooc/ 0000775 0000000 0000000 00000000000 11666415431 0030726 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001530 11666415431 0032757 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/geolooc # -*- coding: utf-8 -*-
# vim: ft=python et softtabstop=4 cinoptions=4 shiftwidth=4 ts=4 ai
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .geolooc import Geolooc
__all__ = ['Geolooc']
geolooc.py 0000664 0000000 0000000 00000002517 11666415431 0032655 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/geolooc # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
from weboob.capabilities.geolocip import ICapGeolocIp
from weboob.tools.application.repl import ReplApplication
__all__ = ['Geolooc']
class Geolooc(ReplApplication):
    APPNAME = 'geolooc'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = 'Console application allowing to geolocalize IP addresses.'
    CAPS = ICapGeolocIp

    def main(self, argv):
        """
        Format the location reported by the backends for the address argv[1].

        Returns 2 after printing the syntax when the address is missing,
        None otherwise.
        """
        try:
            ipaddr = argv[1]
        except IndexError:
            print >>sys.stderr, 'Syntax: %s ipaddr' % argv[0]
            return 2

        for backend, location in self.do('get_location', ipaddr):
            self.format(location)
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/havesex/ 0000775 0000000 0000000 00000000000 11666415431 0030742 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001424 11666415431 0032775 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/havesex # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .havesex import HaveSex
__all__ = ['HaveSex']
havesex.py 0000664 0000000 0000000 00000026301 11666415431 0032702 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/havesex # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
import weboob
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.formatters.iformatter import IFormatter
from weboob.capabilities.dating import ICapDating, OptimizationNotFound
from weboob.capabilities.contact import Contact
__all__ = ['HaveSex']
class ProfileFormatter(IFormatter):
    """
    Formatter displaying the whole profile of a contact.
    """

    def flush(self):
        """Nothing is buffered by this formatter, so there is nothing to do."""
        return None

    def print_node(self, node, level=1):
        """
        Return the text of the profile *node*, indented with *level* tabs.

        A section is rendered as its label followed by its sub-nodes one
        level deeper; any other node as a "label: value" line, the items of
        a list or tuple value being joined with commas.
        """
        indent = u'\t' * level

        if node.flags & node.SECTION:
            text = u''
            text += indent + node.label + '\n'
            for child in node.value.itervalues():
                text += self.print_node(child, level+1)
            return text

        if isinstance(node.value, (tuple,list)):
            shown = ', '.join(unicode(v) for v in node.value)
        else:
            shown = node.value
        return u'' + (indent + u'%-20s %s\n' % (node.label + ':', shown))

    def format_dict(self, item):
        """
        Render the contact described by *item* and return it as text.

        The output holds the nickname, the status, the photos, the profile
        tree and the description.
        """
        lines = [u'Nickname: %s\n' % item['name']]

        # The first matching flag wins, in this order.
        if item['status'] & Contact.STATUS_ONLINE:
            status = 'online'
        elif item['status'] & Contact.STATUS_OFFLINE:
            status = 'offline'
        elif item['status'] & Contact.STATUS_AWAY:
            status = 'away'
        else:
            status = 'unknown'
        lines.append(u'Status: %s (%s)\n' % (status, item['status_msg']))

        lines.append(u'Photos:\n')
        for photo in item['photos'].itervalues():
            lines.append(u'\t%s%s\n' % (photo, ' (hidden)' if photo.hidden else ''))

        lines.append(u'Profile:\n')
        for head in item['profile'].itervalues():
            lines.append(self.print_node(head))

        lines.append(u'Description:\n')
        for summary_line in item['summary'].split('\n'):
            lines.append(u'\t%s\n' % summary_line)

        return u''.join(lines)
class HaveSex(ReplApplication):
    # Console (REPL) application built on the ICapDating backends.
    APPNAME = 'havesex'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = ('Console application allowing to interact with various dating websites '
                   'and to optimize seduction algorithmically.')
    # The storage maps each optimization name to the backends it runs on
    # (see optims(), which maintains the 'optims' entry).
    STORAGE_FILENAME = 'dating.storage'
    STORAGE = {'optims': {}}
    CAPS = ICapDating
    # The "profile" command is rendered by ProfileFormatter, "optim" as a table.
    EXTRA_FORMATTERS = {'profile': ProfileFormatter}
    COMMANDS_FORMATTERS = {
        'optim': 'table',
        'profile': 'profile',
    }
def load_default_backends(self):
    """Load the ICapDating backends, backed by the STORAGE_FILENAME storage."""
    storage = self.create_storage(self.STORAGE_FILENAME)
    self.load_backends(ICapDating, storage=storage)
def main(self, argv):
self.load_config()
try:
self.do('init_optimizations').wait()
except weboob.core.CallErrors, e:
self.bcall_errors_handler(e)
optimizations = self.storage.get('optims')
for optim, backends in optimizations.iteritems():
self.optims('start', backends, optim, store=False)
return ReplApplication.main(self, argv)
def do_profile(self, id):
    """
    profile ID
    Display a profile
    """
    contact_id, backend_name = self.parse_id(id, unique_backend=True)

    found = False
    for _backend, contact in self.do('get_contact', contact_id, backends=backend_name):
        if not contact:
            continue
        self.format(contact)
        found = True

    # Only flush when at least one profile was formatted.
    if found:
        self.flush()
    else:
        self.logger.error(u'Profile not found')
def do_query(self, id):
"""
query ID
Send a query to someone.
"""
_id, backend_name = self.parse_id(id, unique_backend=True)
for backend, query in self.do('send_query', _id, backends=backend_name):
print '%s' % query.message
def edit_optims(self, backend_names, optims_names, stop=False):
if optims_names is None:
print >>sys.stderr, 'Error: missing parameters.'
return 2
for optim_name in optims_names.split():
backends_optims = {}
for backend, optim in self.do('get_optimization', optim_name, backends=backend_names):
if optim:
backends_optims[backend.name] = optim
for backend_name, optim in backends_optims.iteritems():
if len(optim.CONFIG) == 0:
print '%s.%s does not require configuration.' % (backend_name, optim_name)
continue
was_running = optim.is_running()
if stop and was_running:
print 'Stopping %s: %s' % (optim_name, backend_name)
optim.stop()
params = optim.get_config()
if params is None:
params = {}
print 'Configuration of %s.%s' % (backend_name, optim_name)
print '-----------------%s-%s' % ('-' * len(backend_name), '-' * len(optim_name))
for key, value in optim.CONFIG.iteritems():
params[key] = self.ask(value, default=params[key] if (key in params) else value.default)
optim.set_config(params)
if stop and was_running:
print 'Starting %s: %s' % (optim_name, backend_name)
optim.start()
def optims(self, function, backend_names, optims, store=True):
if optims is None:
print >>sys.stderr, 'Error: missing parameters.'
return 2
for optim_name in optims.split():
try:
if store:
storage_optim = set(self.storage.get('optims', optim_name, default=[]))
sys.stdout.write('%sing %s:' % (function.capitalize(), optim_name))
for backend, optim in self.do('get_optimization', optim_name, backends=backend_names):
if optim:
# It's useless to start a started optim, or to stop a stopped one.
if (function == 'start' and optim.is_running()) or \
(function == 'stop' and not optim.is_running()):
continue
# Optim is not configured and would be, ask user to do it.
if function == 'start' and len(optim.CONFIG) > 0 and optim.get_config() is None:
self.edit_optims(backend.name, optim_name)
ret = getattr(optim, function)()
sys.stdout.write(' ' + backend.name)
if not ret:
sys.stdout.write('(failed)')
sys.stdout.flush()
if store:
if function == 'start' and ret:
storage_optim.add(backend.name)
elif function == 'stop':
try:
storage_optim.remove(backend.name)
except KeyError:
pass
sys.stdout.write('.\n')
except weboob.core.CallErrors, errors:
for backend, error, backtrace in errors:
if isinstance(error, OptimizationNotFound):
self.logger.error(u'Error(%s): Optimization "%s" not found' % (backend.name, optim_name))
else:
self.bcall_error_handler(backend, error, backtrace)
if store:
if len(storage_optim) > 0:
self.storage.set('optims', optim_name, list(storage_optim))
else:
self.storage.delete('optims', optim_name)
if store:
self.storage.save()
def complete_optim(self, text, line, *ignored):
    """
    Completion of the "optim" command.

    The proposals depend on how many space separated words ``line`` holds:
    the sub-commands for the second word, the names of the enabled backends
    for the third one, then the optimization names not typed yet.
    """
    words = line.split(' ')
    count = len(words)
    if count < 2:
        return None
    if count == 2:
        return ['list', 'start', 'stop', 'edit']
    if count == 3:
        return [enabled.name for enabled in self.enabled_backends]

    # '*' stands for "every backend", which self.do() expects as None.
    target = None if words[2] == '*' else words[2]
    available = set(name for _backend, (name, _optim)
                    in self.do('iter_optimizations', backends=target))
    return sorted(available - set(words[3:]))
def do_optim(self, line):
    """
    optim [list | start | edit | stop] BACKEND [OPTIM1 [OPTIM2 ...]]
    All dating backends offer optimization services. This command can be
    used to manage them.
    Use * as BACKEND value to apply command to all backends.
    Commands:
    * list       list all available optimizations of a backend
    * start      start optimization services on a backend
    * edit       configure an optimization service for a backend
    * stop       stop optimization services on a backend
    """
    cmd, backend_name, optims_names = self.parse_command_args(line, 3)

    if backend_name == '*':
        # self.do() is called with backends=None for "all backends".
        backend_name = None
    elif backend_name is not None and \
            backend_name not in [b.name for b in self.enabled_backends]:
        print >>sys.stderr, 'Error: No such backend "%s"' % backend_name
        return 1

    if cmd == 'start':
        return self.optims('start', backend_name, optims_names)
    if cmd == 'stop':
        return self.optims('stop', backend_name, optims_names)
    if cmd == 'edit':
        # Propagate the error code (2 on missing parameters), as the
        # start/stop sub-commands do.
        return self.edit_optims(backend_name, optims_names, stop=True)

    if cmd == 'list' or cmd is None:
        wanted = None
        if optims_names is not None:
            wanted = optims_names.split()

        # optimization name -> {backend name: status}
        statuses = {}
        backends = set()
        for backend, (name, optim) in self.do('iter_optimizations', backends=backend_name):
            if wanted is not None and name not in wanted:
                continue
            if optim.is_running():
                status = 'RUNNING'
            else:
                status = '-------'
            statuses.setdefault(name, {})[backend.name] = status
            backends.add(backend.name)

        # One row per optimization, one column per backend (sorted by name);
        # the cell is empty when the backend lacks that optimization.
        backends = sorted(backends)
        for name in statuses:
            row = [('name', name)]
            for b in backends:
                row.append((b, statuses[name].get(b, '')))
            self.format(tuple(row))
        self.flush()
        return

    print >>sys.stderr, "No such command '%s'" % cmd
    return 1
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/masstransit/ 0000775 0000000 0000000 00000000000 11666415431 0031647 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001544 11666415431 0033705 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/masstransit # -*- coding: utf-8 -*-
# vim: ft=python et softtabstop=4 cinoptions=4 shiftwidth=4 ts=4 ai
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .masstransit import Masstransit
__all__ = ['Masstransit']
masstransit.py 0000664 0000000 0000000 00000024163 11666415431 0034520 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/masstransit # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Julien Hébert
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.travel import ICapTravel
from weboob.tools.application.base import BaseApplication
from logging import warning
import gtk
class FakeConic(object):
    """
    Stand-in used when the real ``conic`` module cannot be imported.

    It exposes the constants the application reads (all None) and a
    Connection() factory that always raises NotImplementedError.
    """
    STATUS_CONNECTED = STATUS_DISCONNECTED = CONNECT_FLAG_NONE = None

    def Connection(self):
        """Always raise NotImplementedError: no connection is available."""
        raise NotImplementedError()
try:
import hildon
except ImportError:
toolkit = gtk
else:
toolkit = hildon
try :
import conic
except ImportError:
warning("conic is not found")
conic = FakeConic()
from logging import debug
__all__ = ['Masstransit']
class MasstransitHildon:
    "Main window of the application, built with hildon or plain gtk widgets"
def connect_event(self, connection, event=None, c=None, d=None):
    """
    Handler of the "connection-event" signal of the conic connection.

    Once connected, the station lists are filled if they were not yet and a
    pending refresh is carried out. On disconnection only the ``connected``
    flag is updated.
    """
    debug("DBUS-DEBUG a: %s, b:%s, c:%s,d: %s" % (connection, event, c, d))
    status = event.get_status()

    if status == conic.STATUS_CONNECTED:
        self.connected = True
        if not self.touch_selector_entry_filled:
            debug("connected, now fill")
            self.fill_touch_selector_entry()
        if self.refresh_in_progress:
            self.refresh()
        return

    if status == conic.STATUS_DISCONNECTED:
        self.connected = False
def __init__(self, weboob):
    """
    Build and show the main window, then fill the station selectors.

    ``weboob`` is the object whose iter_backends() provides the backends
    used for station searches and departures.
    """
    self.touch_selector_entry_filled = False
    self.refresh_in_progress = False
    self.connected = False
    self.weboob = weboob

    # FakeConic.Connection() raises NotImplementedError: in that case the
    # window simply has no ``connection`` attribute.
    try:
        self.connection = conic.Connection()
        self.connection.connect("connection-event", self.connect_event)
        self.connection.set_property("automatic-connection-events", True)
        self.connection.request_connection(conic.CONNECT_FLAG_NONE)
    except NotImplementedError:
        pass

    horizontal_box = gtk.HBox()
    self.main_window = toolkit.Window()

    try:
        # Hildon flavour. Reading a gtk.HILDON_* constant that does not
        # exist raises AttributeError, which selects the plain gtk widgets.
        button_size = gtk.HILDON_SIZE_AUTO_WIDTH | gtk.HILDON_SIZE_FINGER_HEIGHT
        self.refresh_button = toolkit.Button(
            button_size, hildon.BUTTON_ARRANGEMENT_HORIZONTAL, "Actualiser")
        self.retour_button = hildon.Button(
            button_size, hildon.BUTTON_ARRANGEMENT_HORIZONTAL, "Retour")

        self.combo_source = hildon.TouchSelectorEntry(text=True)
        self.combo_dest = hildon.TouchSelectorEntry(text=True)

        self.picker_button_source = hildon.PickerButton(
            gtk.HILDON_SIZE_AUTO, hildon.BUTTON_ARRANGEMENT_VERTICAL)
        self.picker_button_dest = hildon.PickerButton(
            gtk.HILDON_SIZE_AUTO, hildon.BUTTON_ARRANGEMENT_VERTICAL)

        pickers = (
            (self.picker_button_source, "Gare de Depart", self.combo_source),
            (self.picker_button_dest, "Gare d'arrivee", self.combo_dest),
        )
        for picker, title, selector in pickers:
            picker.set_sensitive(False)
            picker.set_title(title)
            picker.set_selector(selector)
            horizontal_box.pack_start(picker)
    except AttributeError:
        self.refresh_button = gtk.Button("Actualiser")
        self.retour_button = gtk.Button("Retour")
        self.combo_source = gtk.combo_box_entry_new_text()
        self.combo_dest = gtk.combo_box_entry_new_text()
        for combo in (self.combo_source, self.combo_dest):
            horizontal_box.pack_start(combo)

    self.main_window.set_title("Horaires des Prochains Trains")
    self.main_window.connect("destroy", self.on_main_window_destroy)
    self.refresh_button.connect("clicked", self.on_refresh_button_clicked)
    self.retour_button.set_sensitive(False)
    self.retour_button.connect("clicked", self.on_retour_button_clicked)

    # One text column per field appended by refresh().
    self.treestore = gtk.TreeStore(str, str, str, str, str)
    treeview = gtk.TreeView(self.treestore)
    column_titles = ('Train', 'Horaire', 'Destination', 'Voie', 'Information')
    for index, title in enumerate(column_titles):
        treeview.append_column(
            gtk.TreeViewColumn(title, gtk.CellRendererText(), text=index))

    vertical_box = gtk.VBox()
    vertical_box.pack_start(horizontal_box)
    horizontal_box.pack_start(self.retour_button)
    vertical_box.pack_start(treeview)
    vertical_box.pack_start(self.refresh_button)

    self.main_window.add(vertical_box)
    self.main_window.show_all()
    self.fill_touch_selector_entry()

    # The picker buttons only exist in the hildon flavour.
    if toolkit != gtk:
        for picker in (self.picker_button_source, self.picker_button_dest):
            picker.connect("value-changed", self.check_station_input, picker)
def fill_touch_selector_entry(self):
    """Append the sorted, capitalized station names of every backend to both selectors."""
    names = sorted(station.name.capitalize()
                   for backend in self.weboob.iter_backends()
                   for station in backend.iter_station_search(""))

    for name in names:
        self.combo_source.append_text(name)
        self.combo_dest.append_text(name)

    self.touch_selector_entry_filled = True
    # The source picker only exists in the hildon flavour.
    if toolkit != gtk:
        self.picker_button_source.set_sensitive(True)
def on_main_window_destroy(self, widget):
    "leave the gtk main loop once the main window is destroyed"
    gtk.main_quit()
def on_main_window_show(self, param):
    "fill the station selectors; ``param`` is not used"
    self.fill_touch_selector_entry()
def on_retour_button_clicked(self, widget):
    "swap the departure and arrival selections, then refresh"
    debug("on_retour_button_clicked")
    self.refresh_in_progress = True

    # Read both selections before writing either of them.
    selections = (self.combo_source.get_active(0), self.combo_dest.get_active(0))
    previous_source, previous_dest = selections
    self.combo_source.set_active(0, previous_dest)
    self.combo_dest.set_active(0, previous_source)

    self.refresh()
def on_refresh_button_clicked(self, widget):
    "ask for a connection, or refresh right away when conic is faked"
    debug("on_refresh_button_clicked")
    self.refresh_in_progress = True
    try:
        self.connection.request_connection(conic.CONNECT_FLAG_NONE)
    except AttributeError:
        # With FakeConic no ``connection`` attribute was ever set, so the
        # refresh is done directly; any other AttributeError is re-raised.
        if not isinstance(conic, FakeConic):
            raise
        self.refresh()
def check_station_input(self, widget, user_data):
    """
    Update the sensitivity of the widgets after a station selection changed.

    The arrival picker is usable once a departure station is chosen; the
    refresh and return buttons once both stations are chosen.
    """
    has_source = self.combo_source.get_current_text() is not None
    self.picker_button_dest.set_sensitive(has_source)

    # The arrival selector is only consulted when a departure is set.
    ready = has_source and self.combo_dest.get_current_text() is not None
    self.refresh_button.set_sensitive(ready)
    self.retour_button.set_sensitive(ready)
def refresh(self):
    """
    Reload the departures between the two selected stations.

    The tree store is emptied, then one row is appended for each departure
    returned by the backends for every (departure station, arrival station)
    pair matching the typed texts. ``refresh_in_progress`` is reset when
    the refresh ends, even if a backend raised.
    """
    # ``hildon`` is only bound when its import succeeded, in which case the
    # toolkit is not plain gtk: the banner and the progress indicator must
    # not be touched otherwise.
    use_hildon = toolkit != gtk
    banner = None
    if use_hildon:
        banner = hildon.hildon_banner_show_information(self.main_window, "", "Chargement en cours")
        banner.set_timeout(10000)
        hildon.hildon_gtk_window_set_progress_indicator(self.main_window, 1)

    try:
        self.treestore.clear()

        try:
            source_text = self.combo_source.get_current_text()
            dest_text = self.combo_dest.get_current_text()
        except AttributeError:
            # Plain gtk combo boxes: read the text of the embedded entry.
            source_text = self.combo_source.child.get_text()
            dest_text = self.combo_dest.child.get_text()

        for backend in self.weboob.iter_backends():
            for station in backend.iter_station_search(source_text):
                for arrival in backend.iter_station_search(dest_text):
                    for departure in backend.iter_station_departures(station.id, arrival.id):
                        self.treestore.append(None,
                                              [departure.type,
                                               departure.time,
                                               departure.arrival_station,
                                               departure.plateform,
                                               departure.information])
    finally:
        # Never leave the window stuck in its "loading" state.
        self.refresh_in_progress = False
        if use_hildon:
            banner.set_timeout(1)
            hildon.hildon_gtk_window_set_progress_indicator(self.main_window, 0)
class Masstransit(BaseApplication):
    "Graphical application listing the next departures between two stations"
    APPNAME = 'masstransit'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Julien Hébert'

    def main(self, argv):
        "load the ICapTravel backends, build the window and run the gtk loop"
        self.load_backends(ICapTravel)
        # The window is built (and shown) by its constructor.
        MasstransitHildon(self.weboob)
        gtk.main()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/monboob/ 0000775 0000000 0000000 00000000000 11666415431 0030732 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001424 11666415431 0032765 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/monboob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .monboob import Monboob
__all__ = ['Monboob']
monboob.py 0000664 0000000 0000000 00000031423 11666415431 0032663 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/monboob # -*- coding: utf-8 -*-
# Copyright(C) 2009-2011 Romain Bignon, Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from email.mime.text import MIMEText
from smtplib import SMTP
from email.Header import Header, decode_header
from email.Utils import parseaddr, formataddr, formatdate
from email import message_from_file, message_from_string
from smtpd import SMTPServer
import time
import re
import sys
import logging
import asyncore
import subprocess
import socket
from weboob.core import Weboob, CallErrors
from weboob.core.scheduler import Scheduler
from weboob.capabilities.messages import ICapMessages, ICapMessagesPost, Thread, Message
from weboob.tools.application.repl import ReplApplication
from weboob.tools.misc import html2text, get_backtrace, utc2local, to_unicode
__all__ = ['Monboob']
class FakeSMTPD(SMTPServer):
    """SMTP server handing every received mail over to the application."""

    def __init__(self, app, bindaddr, port):
        """Listen on (bindaddr, port); ``app`` receives the incoming mails."""
        local_address = (bindaddr, port)
        SMTPServer.__init__(self, local_address, None)
        self.app = app

    def process_message(self, peer, mailfrom, rcpttos, data):
        """Parse the raw mail and give it to app.process_incoming_mail()."""
        self.app.process_incoming_mail(message_from_string(data))
class MonboobScheduler(Scheduler):
def __init__(self, app):
Scheduler.__init__(self)
self.app = app
def run(self):
if self.app.options.smtpd:
if ':' in self.app.options.smtpd:
host, port = self.app.options.smtpd.split(':', 1)
else:
host = '127.0.0.1'
port = self.app.options.smtpd
try:
FakeSMTPD(self.app, host, int(port))
except socket.error, e:
self.logger.error('Unable to start the SMTP daemon: %s' % e)
return False
# XXX Fuck, we shouldn't copy this piece of code from
# weboob.scheduler.Scheduler.run().
try:
while 1:
self.stop_event.wait(0.1)
if self.app.options.smtpd:
asyncore.loop(timeout=0.1, count=1)
except KeyboardInterrupt:
self._wait_to_stop()
raise
else:
self._wait_to_stop()
return True
class Monboob(ReplApplication):
    # Daemon bridging the ICapMessages backends and e-mail.
    APPNAME = 'monboob'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = ('Daemon allowing to regularly check for new messages on various websites, '
                   'and send an email for each message, and post a reply to a message on a website.')
    CONFIG = {
        # value given to weboob.repeat() by do_run(); main() requires an integer > 0
        'interval': 300,
        # domain of the generated sender addresses and message ids
        'domain': 'weboob.example.org',
        # address the mails are sent to
        'recipient': 'weboob@example.org',
        # SMTP server used when 'pipe' is empty
        'smtp': 'localhost',
        # shell command fed with each mail on its standard input, if not empty
        'pipe': '',
        # 1 to keep HTML messages as HTML, 0 to convert them to text
        'html': 0,
    }
    CAPS = ICapMessages
    DISABLE_REPL = True
def add_application_options(self, group):
    """Register the -S/--smtpd option on the given option group."""
    smtpd_help = 'run a fake smtpd server and set the port'
    group.add_option('-S', '--smtpd', help=smtpd_help)
def create_weboob(self):
    """Return a Weboob instance driven by a MonboobScheduler bound to this application."""
    scheduler = MonboobScheduler(self)
    return Weboob(scheduler=scheduler)
def load_default_backends(self):
    """Load the ICapMessages backends, giving them the application storage."""
    storage = self.create_storage()
    self.load_backends(ICapMessages, storage=storage)
def main(self, argv):
    """
    Validate the configuration, then run the application.

    Returns 1 if 'interval' is not an integer > 0, 2 if 'html' is neither 0
    nor 1, else whatever ReplApplication.main() returns.
    """
    self.load_config()

    # (option, validity test, error message, exit code), checked in order.
    checks = (
        ('interval', lambda value: not value < 1,
         'Configuration error: interval must be an integer >0.', 1),
        ('html', lambda value: value in (0, 1),
         'Configuration error: html must be 0 or 1.', 2),
    )
    for option, is_valid, message, code in checks:
        try:
            # Store the value back as an integer before checking it.
            self.config.set(option, int(self.config.get(option)))
            if not is_valid(self.config.get(option)):
                raise ValueError()
        except ValueError:
            print >>sys.stderr, message
            return code

    return ReplApplication.main(self, argv)
def get_email_address_ident(self, msg, header):
    """
    Return the local part (what precedes '@') of the address in a header.

    ``msg`` only needs a get() method returning the raw header value. Both
    "Name <local@domain>" and bare "local@domain" forms are understood; a
    value without any '@' is returned unchanged. None is returned when the
    header is missing or empty.
    """
    value = msg.get(header)
    if not value:
        return None

    match = re.match(r'.*<([^@]*)@(.*)>', value)
    if match:
        return match.group(1)
    # str.split() always returns at least one element, so no IndexError
    # can happen here.
    return value.split('@')[0]
def do_post(self, line):
    """
    post
    Pipe with a mail to post message.
    """
    # The mail is read from the standard input.
    incoming = message_from_file(sys.stdin)
    return self.process_incoming_mail(incoming)
def process_incoming_mail(self, msg):
to = self.get_email_address_ident(msg, 'To')
sender = msg.get('From')
reply_to = self.get_email_address_ident(msg, 'In-Reply-To')
title = msg.get('Subject')
if title:
new_title = u''
for part in decode_header(title):
if part[1]:
new_title += unicode(part[0], part[1])
else:
new_title += unicode(part[0])
title = new_title
content = u''
for part in msg.walk():
if part.get_content_type() == 'text/plain':
s = part.get_payload(decode=True)
charsets = part.get_charsets() + msg.get_charsets()
for charset in charsets:
try:
if charset is not None:
content += unicode(s, charset)
else:
content += unicode(s)
except UnicodeError, e:
self.logger.warning('Unicode error: %s' % e)
continue
except Exception, e:
self.logger.exception(e)
continue
else:
break
if len(content) == 0:
print >>sys.stderr, 'Unable to send an empty message'
return 1
# remove signature
content = content.split(u'\n-- \n')[0]
parent_id = None
if reply_to is None:
# This is a new message
if '.' in to:
bname, thread_id = to.split('.', 1)
else:
bname = to
thread_id = None
else:
# This is a reply
try:
bname, id = reply_to.split('.', 1)
thread_id, parent_id = id.rsplit('.', 1)
except ValueError:
print >>sys.stderr, 'In-Reply-To header might be in form '
return 1
# Default use the To header field to know the backend to use.
if to and bname != to:
bname = to
try:
backend = self.weboob.backend_instances[bname]
except KeyError:
print >>sys.stderr, 'Backend %s not found' % bname
return 1
if not backend.has_caps(ICapMessagesPost):
print >>sys.stderr, 'The backend %s does not implement ICapMessagesPost' % bname
return 1
thread = Thread(thread_id)
message = Message(thread,
0,
title=title,
sender=sender,
receivers=[to],
parent=Message(thread, parent_id) if parent_id else None,
content=content)
try:
backend.post_message(message)
except Exception, e:
content = u'Unable to send message to %s:\n' % thread_id
content += u'\n\t%s\n' % to_unicode(e)
if logging.root.level == logging.DEBUG:
content += u'\n%s\n' % to_unicode(get_backtrace(e))
self.send_email(backend, Message(thread,
0,
title='Unable to send message',
sender='Monboob',
parent=Message(thread, parent_id) if parent_id else None,
content=content))
def do_run(self, line):
    """
    run
    Run the fetching daemon.
    """
    # Schedule process() with the configured interval, then enter the loop.
    interval = self.config.get('interval')
    self.weboob.repeat(interval, self.process)
    self.weboob.loop()
def process(self):
try:
for backend, message in self.weboob.do('iter_unread_messages'):
if self.send_email(backend, message):
backend.set_message_read(message)
except CallErrors, e:
self.bcall_errors_handler(e)
def send_email(self, backend, mail):
domain = self.config.get('domain')
recipient = self.config.get('recipient')
reply_id = ''
if mail.parent:
reply_id = u'<%s.%s@%s>' % (backend.name, mail.parent.full_id, domain)
subject = mail.title
sender = u'"%s" <%s@%s>' % (mail.sender.replace('"', '""') if mail.sender else '',
backend.name, domain)
# assume that .date is an UTC datetime
date = formatdate(time.mktime(utc2local(mail.date).timetuple()), localtime=True)
msg_id = u'<%s.%s@%s>' % (backend.name, mail.full_id, domain)
if self.config.get('html') and mail.flags & mail.IS_HTML:
body = mail.content
content_type = 'html'
else:
if mail.flags & mail.IS_HTML:
body = html2text(mail.content)
else:
body = mail.content
content_type = 'plain'
if body is None:
body = ''
if mail.signature:
if self.config.get('html') and mail.flags & mail.IS_HTML:
body += u'
-- %s
' % mail.signature
else:
body += u'\n\n-- \n'
if mail.flags & mail.IS_HTML:
body += html2text(mail.signature)
else:
body += mail.signature
# Header class is smart enough to try US-ASCII, then the charset we
# provide, then fall back to UTF-8.
header_charset = 'ISO-8859-1'
# We must choose the body charset manually
for body_charset in 'US-ASCII', 'ISO-8859-1', 'UTF-8':
try:
body.encode(body_charset)
except UnicodeError:
pass
else:
break
# Split real name (which is optional) and email address parts
sender_name, sender_addr = parseaddr(sender)
recipient_name, recipient_addr = parseaddr(recipient)
# We must always pass Unicode strings to Header, otherwise it will
# use RFC 2047 encoding even on plain ASCII strings.
sender_name = str(Header(unicode(sender_name), header_charset))
recipient_name = str(Header(unicode(recipient_name), header_charset))
# Make sure email addresses do not contain non-ASCII characters
sender_addr = sender_addr.encode('ascii')
recipient_addr = recipient_addr.encode('ascii')
# Create the message ('plain' stands for Content-Type: text/plain)
msg = MIMEText(body.encode(body_charset), content_type, body_charset)
msg['From'] = formataddr((sender_name, sender_addr))
msg['To'] = formataddr((recipient_name, recipient_addr))
msg['Subject'] = Header(unicode(subject), header_charset)
msg['Message-Id'] = msg_id
msg['Date'] = date
if reply_id:
msg['In-Reply-To'] = reply_id
self.logger.info('Send mail from <%s> to <%s>' % (sender, recipient))
if len(self.config.get('pipe')) > 0:
p = subprocess.Popen(self.config.get('pipe'),
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
p.stdin.write(msg.as_string())
p.stdin.close()
if p.wait() != 0:
self.logger.error('Unable to deliver mail: %s' % p.stdout.read().strip())
return False
else:
# Send the message via SMTP to localhost:25
try:
smtp = SMTP(self.config.get('smtp'))
smtp.sendmail(sender, recipient, msg.as_string())
except Exception, e:
self.logger.error('Unable to deliver mail: %s' % e)
return False
else:
smtp.quit()
return True
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/pastoob/ 0000775 0000000 0000000 00000000000 11666415431 0030746 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001527 11666415431 0033005 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/pastoob # -*- coding: utf-8 -*-
# vim: ft=python et softtabstop=4 cinoptions=4 shiftwidth=4 ts=4 ai
# Copyright(C) 2011 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .pastoob import Pastoob
__all__ = ['Pastoob']
pastoob.py 0000664 0000000 0000000 00000007133 11666415431 0032714 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/pastoob # -*- coding: utf-8 -*-
# Copyright(C) 2011 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import with_statement
import os
import sys
import codecs
import locale
from random import choice
from weboob.capabilities.paste import ICapPaste, PasteNotFound
from weboob.tools.application.repl import ReplApplication
__all__ = ['Pastoob']
class Pastoob(ReplApplication):
    # Console (REPL) application built on the ICapPaste backends.
    APPNAME = 'pastoob'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2011 Laurent Bachelier'
    DESCRIPTION = ('Console application allowing to post and get pastes '
                   'from pastebins.')
    CAPS = ICapPaste
def main(self, argv):
    """Load the configuration, then hand over to ReplApplication.main()."""
    self.load_config()
    exit_code = ReplApplication.main(self, argv)
    return exit_code
def do_get(self, _id):
    """
    get ID
    Get a paste contents.
    """
    if not _id:
        print >>sys.stderr, 'This command takes an argument: %s' % self.get_command_help('get', short=True)
        return 2

    try:
        paste = self.get_object(_id, 'get_paste', ['contents'])
    except PasteNotFound:
        print >>sys.stderr, 'Paste not found: %s' % _id
        return 3
    if not paste:
        print >>sys.stderr, 'Unable to handle paste: %s' % _id
        return 1

    # Encode with the encoding of the terminal, else with the locale's one.
    encoding = sys.stdout.encoding or locale.getpreferredencoding()
    writer_class = codecs.getwriter(encoding)
    output = writer_class(sys.stdout)
    output.write(paste.contents)
    # add a newline unless we are writing
    # in a file or in a pipe
    if os.isatty(output.fileno()):
        output.write('\n')
def do_post(self, filename):
"""
post [FILENAME]
Submit a new paste.
The filename can be '-' for reading standard input (pipe).
"""
if not filename or filename == '-':
contents = self.acquire_input()
else:
try:
with codecs.open(filename, encoding=locale.getpreferredencoding()) as fp:
contents = fp.read()
except IOError, e:
print >>sys.stderr, 'Unable to open file "%s": %s' % (filename, e.strerror)
return 1
# get and sort the backends able to satisfy our requirements
params = self._get_params()
backends = {}
for backend in self.weboob.iter_backends():
score = backend.can_post(contents, **params)
if score:
backends.setdefault(score, []).append(backend)
# select a random backend from the best scores
if len(backends):
backend = choice(backends[max(backends.keys())])
else:
print >>sys.stderr, 'No suitable backend found.'
return 1
p = backend.new_paste(_id=None)
p.public = params.get('public')
p.title = os.path.basename(filename)
p.contents = contents
backend.post_paste(p, max_age=params.get('max_age'))
print 'Successfuly posted paste: %s' % p.page_url
def _get_params(self):
return {'public': True, 'max_age': 3600*24*3}
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg/ 0000775 0000000 0000000 00000000000 11666415431 0031110 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000000067 11666415431 0033145 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg from .qboobmsg import QBoobMsg
__all__ = ['QBoobMsg']
main_window.py 0000664 0000000 0000000 00000003600 11666415431 0033715 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from PyQt4.QtCore import SIGNAL
from weboob.tools.application.qt import QtMainWindow
from weboob.tools.application.qt.backendcfg import BackendCfg
from weboob.capabilities.messages import ICapMessages
from .ui.main_window_ui import Ui_MainWindow
from .messages_manager import MessagesManager
class MainWindow(QtMainWindow):
    """Top-level QBoobMsg window; its central widget is a MessagesManager."""

    def __init__(self, config, weboob, parent=None):
        QtMainWindow.__init__(self, parent)
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        self.config = config
        self.weboob = weboob

        self.manager = MessagesManager(weboob, self)
        self.setCentralWidget(self.manager)

        triggered = SIGNAL("triggered()")
        self.connect(self.ui.actionBackends, triggered, self.backendsConfig)
        self.connect(self.ui.actionRefresh, triggered, self.refresh)

        # With no backend configured there is nothing to load yet: open the
        # configuration dialog instead.
        if self.weboob.count_backends() != 0:
            self.centralWidget().load()
        else:
            self.backendsConfig()

    def backendsConfig(self):
        """Run the backend configuration dialog; reload if it reports success."""
        dialog = BackendCfg(self.weboob, (ICapMessages,), self)
        if not dialog.run():
            return
        self.centralWidget().load()

    def refresh(self):
        """Ask the central widget to refresh its thread list."""
        self.centralWidget().refreshThreads()
messages_manager.py 0000664 0000000 0000000 00000023312 11666415431 0034705 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
from PyQt4.QtGui import QWidget, QTreeWidgetItem, QListWidgetItem, QMessageBox, QBrush
from PyQt4.QtCore import SIGNAL, Qt
from weboob.capabilities.messages import ICapMessages, ICapMessagesPost, Message
from weboob.tools.application.qt import QtDo
from weboob.tools.misc import to_unicode
from .ui.messages_manager_ui import Ui_MessagesManager
class MessagesManager(QWidget):
def __init__(self, weboob, parent=None):
    """Build the UI, reset the selection state and wire the widget signals."""
    QWidget.__init__(self, parent)
    self.ui = Ui_MessagesManager()
    self.ui.setupUi(self)

    self.weboob = weboob
    self.ui.backendsList.setCurrentRow(0)
    # Nothing is selected yet.
    self.backend = None
    self.thread = None
    self.message = None

    # Replying is only possible once a message is displayed.
    self.ui.replyButton.setEnabled(False)
    self.ui.replyWidget.hide()

    ui = self.ui
    wiring = (
        (ui.backendsList, 'itemSelectionChanged()', self._backendChanged),
        (ui.threadsList, 'itemSelectionChanged()', self._threadChanged),
        (ui.messagesTree, 'itemClicked(QTreeWidgetItem *, int)', self._messageSelected),
        (ui.messagesTree, 'itemActivated(QTreeWidgetItem *, int)', self._messageSelected),
        (ui.replyButton, 'clicked()', self._replyPressed),
        (ui.sendButton, 'clicked()', self._sendPressed),
    )
    for sender, signature, slot in wiring:
        self.connect(sender, SIGNAL(signature), slot)
def load(self):
    """Repopulate the backend list, then refresh the thread list."""
    backends_list = self.ui.backendsList
    backends_list.clear()
    # First row carries no backend object in its user data.
    backends_list.addItem('(All)')
    for backend in self.weboob.iter_backends():
        if backend.has_caps(ICapMessages):
            entry = QListWidgetItem(backend.name.capitalize())
            entry.setData(Qt.UserRole, backend)
            backends_list.addItem(entry)
    self.refreshThreads()
def _backendChanged(self):
    """Track the backend selected in the list and reload its threads."""
    selected = self.ui.backendsList.selectedItems()
    if selected:
        self.backend = selected[0].data(Qt.UserRole).toPyObject()
        self.refreshThreads()
    else:
        # Empty selection: forget the backend, keep the current display.
        self.backend = None
def refreshThreads(self):
    """Clear the views and start fetching the threads of self.backend."""
    ui = self.ui
    ui.messagesTree.clear()
    ui.threadsList.clear()
    self.hideReply()
    # Lock the controls while loading; _gotThread re-enables the lists.
    for widget in (ui.replyButton, ui.backendsList, ui.threadsList):
        widget.setEnabled(False)

    # The QtDo object is kept on self for the duration of the call.
    self.process_threads = QtDo(self.weboob, self._gotThread)
    self.process_threads.do('iter_threads', backends=self.backend, caps=ICapMessages)
def _gotThread(self, backend, thread):
    """QtDo callback: add one thread to the list, or finish on a falsy backend."""
    if backend:
        entry = QListWidgetItem(thread.title)
        # (backend, id) is what refreshThreadMessages expects later on.
        entry.setData(Qt.UserRole, (thread.backend, thread.id))
        self.ui.threadsList.addItem(entry)
        return

    # Falsy backend: the iteration is over, unlock the lists.
    self.process_threads = None
    self.ui.backendsList.setEnabled(True)
    self.ui.threadsList.setEnabled(True)
def _threadChanged(self):
    """Load the messages of the thread newly selected in the list."""
    self.ui.messagesTree.clear()
    selected = self.ui.threadsList.selectedItems()
    if selected:
        # The item's user data is the tuple stored by _gotThread.
        self.refreshThreadMessages(*selected[0].data(Qt.UserRole).toPyObject())
def refreshThreadMessages(self, backend, id):
    """Clear the message views and fetch thread `id` from `backend`."""
    ui = self.ui
    ui.messagesTree.clear()
    ui.messageBody.clear()
    # Lock the controls until _gotThreadMessages reports completion.
    for widget in (ui.backendsList, ui.threadsList, ui.replyButton):
        widget.setEnabled(False)
    self.hideReply()

    self.process = QtDo(self.weboob, self._gotThreadMessages)
    self.process.do('get_thread', id, backends=backend)
def _gotThreadMessages(self, backend, thread):
    """QtDo callback: display a fetched thread, or finish when thread is None."""
    if thread is not None:
        self.thread = thread
        # Show the root message and build the whole tree below it.
        self.showMessage(thread.root)
        self._insert_message(thread.root, self.ui.messagesTree.invisibleRootItem())
        self.ui.messagesTree.expandAll()
        return

    # End of the call: unlock the lists and drop the QtDo object.
    self.ui.backendsList.setEnabled(True)
    self.ui.threadsList.setEnabled(True)
    self.process = None
def _insert_message(self, message, top):
    """Recursively add `message` and its children under tree item `top`."""
    columns = [
        message.title or '',
        message.sender or 'Unknown',
        time.strftime('%Y-%m-%d %H:%M:%S', message.date.timetuple()),
    ]
    node = QTreeWidgetItem(None, columns)
    node.setData(0, Qt.UserRole, message)

    # Unread messages are highlighted on all three columns.
    if message.flags & message.IS_UNREAD:
        for column in (0, 1, 2):
            node.setForeground(column, QBrush(Qt.darkYellow))

    top.addChild(node)

    replies = message.children
    if replies is not None:
        for reply in replies:
            self._insert_message(reply, node)
def _messageSelected(self, item, column):
    """Slot for tree clicks/activations: show the message stored on `item`."""
    # `column` is part of the signal signature but is not used.
    self.showMessage(item.data(0, Qt.UserRole).toPyObject(), item)
# Display `message` in the body pane and pre-fill the reply title.
# `item` is the tree item the message was selected from, if any; when given
# and the message is unread, the message is marked read on the backend and
# the item's highlight is reset.
def showMessage(self, message, item=None):
backend = self.weboob.get_backend(message.thread.backend)
# Replying is only offered when the backend can post messages.
if backend.has_caps(ICapMessagesPost):
self.ui.replyButton.setEnabled(True)
self.message = message
# Pre-fill the reply title, adding 'Re: ' unless it is already there.
# NOTE(review): title is used as a string here although _insert_message
# tolerates a falsy title -- confirm it can never be None.
if message.title.startswith('Re:'):
self.ui.titleEdit.setText(message.title)
else:
self.ui.titleEdit.setText('Re: %s' % message.title)
if message.flags & message.IS_HTML:
content = message.content
else:
# NOTE(review): as written, the first three replace() calls map each
# character to itself, so nothing is escaped; this looks like HTML entities
# (and probably a line-break tag for '\n') lost in extraction -- confirm
# against the upstream file.
content = message.content.replace('&', '&').replace('<', '<').replace('>', '>').replace('\n', ' ')
# Optional status line; only the first matching flag is reported.
extra = u''
if message.flags & message.IS_NOT_ACCUSED:
extra += u'Status: Unread '
elif message.flags & message.IS_ACCUSED:
extra += u'Status: Read '
elif message.flags & message.IS_UNREAD:
extra += u'Status: New '
# NOTE(review): the literal below is split across lines inside the quotes
# and is not valid Python as it stands; its markup was presumably stripped
# during extraction -- restore from the upstream file.
self.ui.messageBody.setText("
%s
"
"Date: %s "
"From: %s "
"%s"
"
%s
"
% (message.title, str(message.date), message.sender, extra, content))
# Mark as read only when the user selected the message from the tree.
if item and message.flags & message.IS_UNREAD:
backend.set_message_read(message)
message.flags &= ~message.IS_UNREAD
# A default QBrush resets the foreground set by _insert_message.
item.setForeground(0, QBrush())
item.setForeground(1, QBrush())
item.setForeground(2, QBrush())
def displayReply(self):
    """Show the reply editor; the reply button then acts as 'Cancel'."""
    ui = self.ui
    ui.replyButton.setText(self.tr('Cancel'))
    ui.replyWidget.show()
def hideReply(self):
    """Hide the reply editor and discard whatever was typed in it."""
    ui = self.ui
    ui.replyButton.setText(self.tr('Reply'))
    ui.replyWidget.hide()
    for field in (ui.replyEdit, ui.titleEdit):
        field.clear()
def _replyPressed(self):
if self.ui.replyWidget.isVisible():
self.hideReply()
else:
self.displayReply()
def _sendPressed(self):
    """Post the content of the reply editor as an answer to self.message."""
    ui = self.ui
    if not ui.replyWidget.isVisible():
        return

    body = unicode(ui.replyEdit.toPlainText())
    subject = unicode(ui.titleEdit.text())

    # Freeze the whole UI while posting; the QtDo callbacks unfreeze it.
    for widget in (ui.backendsList, ui.threadsList, ui.messagesTree,
                   ui.replyButton, ui.replyWidget):
        widget.setEnabled(False)
    ui.sendButton.setText(self.tr('Sending...'))

    # Entry 0 of the format box means the reply is HTML.
    flags = Message.IS_HTML if ui.htmlBox.currentIndex() == 0 else 0
    reply = Message(thread=self.thread,
                    id=0,
                    title=subject,
                    sender=None,
                    receivers=None,
                    content=body,
                    parent=self.message,
                    flags=flags)
    self.process_reply = QtDo(self.weboob, self._postReply_cb, self._postReply_eb)
    self.process_reply.do('post_message', reply, backends=self.thread.backend)
def _postReply_cb(self, backend, ignored):
    """QtDo success callback: unfreeze the UI and reload the current thread."""
    if not backend:
        return

    ui = self.ui
    for widget in (ui.backendsList, ui.threadsList, ui.messagesTree,
                   ui.replyButton, ui.replyWidget, ui.sendButton):
        widget.setEnabled(True)
    ui.sendButton.setText(self.tr('Send'))
    self.hideReply()
    self.process_reply = None
    # Reload the thread so that the new reply shows up.
    self.refreshThreadMessages(backend.name, self.thread.id)
def _postReply_eb(self, backend, error, backtrace):
    """QtDo error callback: report the failure and unfreeze the UI."""
    details = unicode(self.tr('Unable to send message:\n%s\n')) % to_unicode(error)
    # The backtrace is only shown when the root logger is at DEBUG level.
    if logging.root.level == logging.DEBUG:
        details += '\n%s\n' % to_unicode(backtrace)
    QMessageBox.critical(self, self.tr('Error while posting reply'),
                         details, QMessageBox.Ok)

    # The reply editor stays open so the text is not lost.
    ui = self.ui
    for widget in (ui.backendsList, ui.threadsList, ui.messagesTree,
                   ui.replyButton, ui.replyWidget):
        widget.setEnabled(True)
    ui.sendButton.setText(self.tr('Send'))
    self.process_reply = None
qboobmsg.py 0000664 0000000 0000000 00000002531 11666415431 0033215 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.messages import ICapMessages
from weboob.tools.application.qt import QtApplication
from .main_window import MainWindow
class QBoobMsg(QtApplication):
    """Qt application entry point for reading and answering messages."""

    APPNAME = 'qboobmsg'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = 'Qt application allowing to read messages on various websites and reply to them.'
    CAPS = ICapMessages

    def main(self, argv):
        """Load the message backends, show the main window and run the loop."""
        storage = self.create_storage()
        self.load_backends(ICapMessages, storage=storage)

        # Keep a reference on self so the window outlives this call.
        self.main_window = MainWindow(self.config, self.weboob)
        self.main_window.show()
        return self.weboob.loop()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg/ui/ 0000775 0000000 0000000 00000000000 11666415431 0031525 5 ustar 00root root 0000000 0000000 Makefile 0000664 0000000 0000000 00000000265 11666415431 0033111 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg/ui UI_FILES = $(wildcard *.ui)
UI_PY_FILES = $(UI_FILES:%.ui=%_ui.py)
PYUIC = pyuic4
all: $(UI_PY_FILES)
%_ui.py: %.ui
$(PYUIC) -o $@ $^
clean:
rm -f *.pyc
rm -f $(UI_PY_FILES)
__init__.py 0000664 0000000 0000000 00000000000 11666415431 0033545 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg/ui main_window.ui 0000664 0000000 0000000 00000004306 11666415431 0034323 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg/ui
MainWindow00763580QBoobMsg0076320FiletoolBarTopToolBarAreafalseBackendsQuitQuitCtrl+QRefreshactionQuittriggered()MainWindowclose()-1-1381289
messages_manager.ui 0000664 0000000 0000000 00000021005 11666415431 0035304 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qboobmsg/ui
MessagesManager00696591Qt::Horizontal0015016777215Qt::HorizontalQt::Vertical0050QFrame::StyledPanelQFrame::Raised929200+00−Qt::Horizontal4020QAbstractItemView::NoEditTriggerstruetruetruetrue150true150trueTitleFromDateQt::Vertical05trueQt::LinksAccessibleByKeyboard|Qt::LinksAccessibleByMouse|Qt::TextBrowserInteraction|Qt::TextSelectableByKeyboard|Qt::TextSelectableByMouseQFrame::StyledPanelQFrame::RaisedReply0With HTMLWithout HTMLSendexpandButtonclicked()messagesTreeexpandAll()73331527150collapseButtonclicked()messagesTreecollapseAll()73360527150
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ 0000775 0000000 0000000 00000000000 11666415431 0031123 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000000067 11666415431 0033160 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex from .qhavesex import QHaveSex
__all__ = ['QHaveSex']
contacts.py 0000664 0000000 0000000 00000046044 11666415431 0033244 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
from PyQt4.QtGui import QWidget, QListWidgetItem, QImage, QIcon, QPixmap, \
QFrame, QMessageBox, QTabWidget, QVBoxLayout, \
QFormLayout, QLabel, QPushButton
from PyQt4.QtCore import SIGNAL, Qt
from weboob.tools.application.qt import QtDo, HTMLDelegate
from weboob.tools.misc import to_unicode
from weboob.capabilities.contact import ICapContact, Contact
from weboob.capabilities.chat import ICapChat
from weboob.capabilities.messages import ICapMessages, ICapMessagesPost, Message
from weboob.capabilities.base import NotLoaded
from .ui.contacts_ui import Ui_Contacts
from .ui.contact_thread_ui import Ui_ContactThread
from .ui.thread_message_ui import Ui_ThreadMessage
from .ui.profile_ui import Ui_Profile
class ThreadMessage(QFrame):
    """
    This class represents a message in the thread tab.
    """

    def __init__(self, message, parent=None):
        QFrame.__init__(self, parent)
        self.ui = Ui_ThreadMessage()
        self.ui.setupUi(self)

        self.set_message(message)

    def set_message(self, message):
        """Display `message` (sender, dated header with read status, content)."""
        self.message = message

        self.ui.nameLabel.setText(message.sender)
        header = time.strftime('%Y-%m-%d %H:%M:%S', message.date.timetuple())
        if message.flags & message.IS_NOT_ACCUSED:
            header += u' — Unread'
        elif message.flags & message.IS_ACCUSED:
            header += u' — Read'
        self.ui.headerLabel.setText(header)
        if message.flags & message.IS_HTML:
            content = message.content
        else:
            # Plain text: escape the markup characters so they are shown
            # literally, and turn newlines into line breaks.  '&' must be
            # escaped first so the entities added afterwards stay intact.
            content = (message.content.replace('&', '&amp;')
                                      .replace('<', '&lt;')
                                      .replace('>', '&gt;')
                                      .replace('\n', '<br />'))
        self.ui.contentLabel.setText(content)

    def __eq__(self, m):
        # Two widgets are equal when they wrap equal messages; this is what
        # makes `widget in list_of_widgets` work for callers.
        return self.message == m.message
class ContactThread(QWidget):
    """
    The thread of the selected contact.
    """

    def __init__(self, weboob, contact, support_reply, parent=None):
        """
        Build the thread view for `contact` and start loading its messages.

        When `support_reply` is false the reply frame is hidden.
        """
        QWidget.__init__(self, parent)
        self.ui = Ui_ContactThread()
        self.ui.setupUi(self)

        self.weboob = weboob
        self.contact = contact
        self.thread = None
        # ThreadMessage widgets, in the same order as in the layout.
        self.messages = []
        self.process_msg = None
        self.connect(self.ui.refreshButton, SIGNAL('clicked()'), self.refreshMessages)

        if support_reply:
            self.connect(self.ui.sendButton, SIGNAL('clicked()'), self.postReply)
        else:
            self.ui.frame.hide()

        self.refreshMessages()

    def refreshMessages(self, fillobj=False):
        """
        Fetch the thread, or (with `fillobj` and a known thread) ask the
        backend to fill its 'root' field.  Does nothing while a fetch is
        already running.
        """
        if self.process_msg:
            return

        self.ui.refreshButton.setEnabled(False)
        self.process_msg = QtDo(self.weboob, self.gotThread, self.gotError)
        if fillobj and self.thread:
            self.process_msg.do('fillobj', self.thread, ['root'], backends=self.contact.backend)
        else:
            self.process_msg.do('get_thread', self.contact.id, backends=self.contact.backend)

    def gotError(self, backend, error, backtrace):
        """QtDo error callback: disable replying, allow a new refresh."""
        self.ui.textEdit.setEnabled(False)
        self.ui.sendButton.setEnabled(False)
        self.ui.refreshButton.setEnabled(True)

    def gotThread(self, backend, thread):
        """QtDo callback: merge a fetched thread; a falsy thread ends the call."""
        if not thread:
            self.process_msg = None
            return

        self.ui.textEdit.setEnabled(True)
        self.ui.sendButton.setEnabled(True)
        self.ui.refreshButton.setEnabled(True)

        self.thread = thread

        if thread.root is NotLoaded:
            # Messages are not loaded yet: offer a button to fetch them.
            self._insert_load_button(0)
        else:
            for message in thread.iter_all_messages():
                self._insert_message(message)

    def _insert_message(self, message):
        """Insert `message` at its place (newest first), or update it if known."""
        widget = ThreadMessage(message)
        if widget in self.messages:
            # Already displayed: only refresh it when its flags changed.
            old_widget = self.messages[self.messages.index(widget)]
            if old_widget.message.flags != widget.message.flags:
                old_widget.set_message(widget.message)
            return

        # Insert before the first displayed message that is older.
        for i, m in enumerate(self.messages):
            if widget.message.date > m.message.date:
                self.ui.scrollAreaContent.layout().insertWidget(i, widget)
                self.messages.insert(i, widget)
                if message.parent is NotLoaded:
                    self._insert_load_button(i)
                return

        # Older than everything displayed: append at the end.
        self.ui.scrollAreaContent.layout().addWidget(widget)
        self.messages.append(widget)
        if message.parent is NotLoaded:
            self._insert_load_button(-1)

    def _insert_load_button(self, pos):
        """Add a 'More messages...' button at layout index `pos` (end if < 0)."""
        button = QPushButton(self.tr('More messages...'))
        self.connect(button, SIGNAL('clicked()'), lambda: self._load_button_pressed(button))
        if pos >= 0:
            self.ui.scrollAreaContent.layout().insertWidget(pos, button)
        else:
            self.ui.scrollAreaContent.layout().addWidget(button)

    def _load_button_pressed(self, button):
        """Remove the pressed load button and fetch the missing messages."""
        self.ui.scrollAreaContent.layout().removeWidget(button)
        button.hide()
        button.deleteLater()

        self.refreshMessages(fillobj=True)

    def postReply(self):
        """Post the typed text as a reply to the first displayed message."""
        text = unicode(self.ui.textEdit.toPlainText())
        # Locked while posting; both callbacks unlock them again.
        self.ui.textEdit.setEnabled(False)
        self.ui.sendButton.setEnabled(False)
        m = Message(thread=self.thread,
                    id=0,
                    title=u'',
                    sender=None,
                    receivers=None,
                    content=text,
                    parent=self.messages[0].message if len(self.messages) > 0 else None)
        self.process_reply = QtDo(self.weboob, self._postReply_cb, self._postReply_eb)
        self.process_reply.do('post_message', m, backends=self.contact.backend)

    def _postReply_cb(self, backend, ignored):
        """QtDo success callback: clear the editor and reload the thread."""
        if not backend:
            return
        self.ui.textEdit.clear()
        self.ui.textEdit.setEnabled(True)
        self.ui.sendButton.setEnabled(True)
        self.refreshMessages()
        self.process_reply = None

    def _postReply_eb(self, backend, error, backtrace):
        """QtDo error callback: report the failure and let the user retry."""
        content = unicode(self.tr('Unable to send message:\n%s\n')) % to_unicode(error)
        # The backtrace is only shown when the root logger is at DEBUG level.
        if logging.root.level == logging.DEBUG:
            content += '\n%s\n' % to_unicode(backtrace)
        QMessageBox.critical(self, self.tr('Error while posting reply'),
                             content, QMessageBox.Ok)
        # postReply() disabled these; without re-enabling them a failed send
        # would leave the editor locked.  The typed text is kept for a retry.
        self.ui.textEdit.setEnabled(True)
        self.ui.sendButton.setEnabled(True)
        self.process_reply = None
class ContactProfile(QWidget):
def __init__(self, weboob, contact, parent=None):
    """Build the profile tab for `contact`, fetching missing fields if needed."""
    QWidget.__init__(self, parent)
    self.ui = Ui_Profile()
    self.ui.setupUi(self)

    clicked = SIGNAL('clicked()')
    self.connect(self.ui.previousButton, clicked, self.previousClicked)
    self.connect(self.ui.nextButton, clicked, self.nextClicked)

    self.weboob = weboob
    self.contact = contact
    self.loaded_profile = False
    self.displayed_photo_idx = 0
    # Running photo fetches, keyed by photo id.
    self.process_photo = {}

    # Display what is already known; gotProfile reports what is missing.
    backend = self.weboob.get_backend(contact.backend)
    missing_fields = self.gotProfile(backend, contact)
    if not missing_fields:
        return
    self.process_contact = QtDo(self.weboob, self.gotProfile, self.gotError)
    self.process_contact.do('fillobj', self.contact, missing_fields, backends=self.contact.backend)
# QtDo error callback: hide the photo frame and show the error text in the
# description area.  `backend` and `backtrace` are not used.
# NOTE(review): the literal below spans several lines inside single quotes
# and is not valid Python as it stands; its markup was presumably stripped
# during extraction -- restore from the upstream file.
def gotError(self, backend, error, backtrace):
self.ui.frame_photo.hide()
self.ui.descriptionEdit.setText('
Unable to show profile
%s
' % to_unicode(error))
# Display what is known of `contact` and report what still has to be fetched.
# Returns [] when `backend` is falsy, otherwise a set of missing field names
# ('profile' is the only one added here).  Also used as a QtDo callback.
def gotProfile(self, backend, contact):
if not backend:
return []
missing_fields = set()
self.display_photo()
# NOTE(review): the statement below is damaged (the literal is split across
# lines and is not valid Python); text was presumably lost in extraction --
# restore from the upstream file.
self.ui.nicknameLabel.setText('
' % contact.summary.replace('\n', ' '))
if not contact.profile:
missing_fields.add('profile')
elif not self.loaded_profile:
# Build the profile widgets only once.
self.loaded_profile = True
for head in contact.profile.itervalues():
# Nodes flagged HEAD go to the header widget, the others to the tab widget.
if head.flags & head.HEAD:
widget = self.ui.headWidget
else:
widget = self.ui.profileTab
self.process_node(head, widget)
return missing_fields
# Build the widget showing profile node `node` and add it to `widget`.
# Sections recurse into their children; unsupported values or parent widgets
# are logged and skipped.
def process_node(self, node, widget):
# Set the value widget
value = None
if node.flags & node.SECTION:
# A section is a form-layout container filled with its sub-nodes.
value = QWidget()
value.setLayout(QFormLayout())
for sub in node.value.itervalues():
self.process_node(sub, value)
elif isinstance(node.value, list):
# NOTE(review): the list separator is a single space here; it may have been
# a line-break tag before extraction -- confirm against upstream.
value = QLabel(' '.join(unicode(s) for s in node.value))
value.setWordWrap(True)
elif isinstance(node.value, tuple):
value = QLabel(', '.join(unicode(s) for s in node.value))
value.setWordWrap(True)
elif isinstance(node.value, (basestring,int,long,float)):
value = QLabel(unicode(node.value))
else:
logging.warning('Not supported value: %r' % node.value)
return
# Insert the value widget into the parent widget, depending
# on its type.
if isinstance(widget, QTabWidget):
widget.addTab(value, node.label)
elif isinstance(widget.layout(), QFormLayout):
label = QLabel(u'%s: ' % node.label)
widget.layout().addRow(label, value)
elif isinstance(widget.layout(), QVBoxLayout):
# NOTE(review): the literal below is split across lines and is not valid
# Python as it stands; heading markup was presumably stripped in extraction.
widget.layout().addWidget(QLabel(u'
%s
' % node.label))
widget.layout().addWidget(value)
else:
logging.warning('Not supported widget: %r' % widget)
def previousClicked(self):
    """Show the previous photo of the contact, wrapping around at the start."""
    count = len(self.contact.photos)
    if not count:
        # No photo: nothing to browse (and `% 0` would raise).
        return
    self.displayed_photo_idx = (self.displayed_photo_idx - 1) % count
    self.display_photo()
def nextClicked(self):
    """Show the next photo of the contact, wrapping around at the end."""
    count = len(self.contact.photos)
    if not count:
        # No photo: nothing to browse (and `% 0` would raise).
        return
    self.displayed_photo_idx = (self.displayed_photo_idx + 1) % count
    self.display_photo()
# Show the photo at self.displayed_photo_idx, starting a fetch of its data
# when it is not loaded yet (the thumbnail is shown meanwhile if available).
def display_photo(self):
# Clamp the index to the last photo; with no photo at all it becomes -1.
if self.displayed_photo_idx >= len(self.contact.photos):
self.displayed_photo_idx = len(self.contact.photos) - 1
if self.displayed_photo_idx < 0:
self.ui.photoUrlLabel.setText('')
return
photo = self.contact.photos.values()[self.displayed_photo_idx]
if photo.data:
data = photo.data
# The full image is there: drop the fetch registered for it, if any.
if photo.id in self.process_photo:
self.process_photo.pop(photo.id)
else:
# Fetch the full image; this method is called again from the QtDo callback.
self.process_photo[photo.id] = QtDo(self.weboob, lambda b,p: self.display_photo())
self.process_photo[photo.id].do('fillobj', photo, ['data'], backends=self.contact.backend)
if photo.thumbnail_data:
data = photo.thumbnail_data
else:
return
img = QImage.fromData(data)
# Scale to a third of this widget's width.
img = img.scaledToWidth(self.width()/3)
self.ui.photoLabel.setPixmap(QPixmap.fromImage(img))
if photo.url is not NotLoaded:
# NOTE(review): one '%s' but two arguments -- this raises TypeError as
# written; the link markup around the URL was presumably stripped during
# extraction.  Restore from the upstream file.
text = '%s' % (photo.url, photo.url)
if photo.hidden:
text += ' (Hidden photo)'
self.ui.photoUrlLabel.setText(text)
class IGroup(object):
    """Base class of contact groups; subclasses implement iter_contacts()."""

    def __init__(self, weboob, id, name):
        self.weboob = weboob
        self.id, self.name = id, name

    def iter_contacts(self, cb):
        """Feed the contacts of the group to `cb`; must be overridden."""
        raise NotImplementedError()
class MetaGroup(IGroup):
    """Group selecting contacts by status: 'online', 'offline' or all."""

    def iter_contacts(self, cb):
        """Start fetching the contacts of the group, reporting them to `cb`."""
        group_id = self.id
        if group_id == 'offline':
            status = Contact.STATUS_OFFLINE
        elif group_id == 'online':
            # Away contacts count as online.
            status = Contact.STATUS_ONLINE|Contact.STATUS_AWAY
        else:
            status = Contact.STATUS_ALL

        def relay(backend, contact):
            return self.cb(cb, backend, contact)

        self.process = QtDo(self.weboob, relay)
        self.process.do('iter_contacts', status, caps=ICapContact)

    def cb(self, cb, backend, contact):
        """QtDo callback: forward a contact, or signal the end with cb(None)."""
        if contact:
            cb(contact)
            return
        if not backend:
            # End of the iteration: drop the QtDo object and notify.
            self.process = None
            cb(None)
class ContactsWidget(QWidget):
def __init__(self, weboob, parent=None):
    """Build the contacts tab: group selector, contact list and URL lookup."""
    QWidget.__init__(self, parent)
    self.ui = Ui_Contacts()
    self.ui.setupUi(self)

    self.weboob = weboob
    self.contact = None
    self.ui.contactList.setItemDelegate(HTMLDelegate())

    self.url_process = None
    # Running photo fetches, keyed by contact id.
    self.photo_processes = {}

    groups = self.ui.groupBox
    groups.addItem('All', MetaGroup(self.weboob, 'all', self.tr('All')))
    groups.addItem('Onlines', MetaGroup(self.weboob, 'online', self.tr('Online')))
    groups.addItem('Offlines', MetaGroup(self.weboob, 'offline', self.tr('Offline')))
    # 'Onlines' is the initial selection; set before the signal is connected.
    groups.setCurrentIndex(1)

    self.connect(groups, SIGNAL('currentIndexChanged(int)'), self.groupChanged)
    self.connect(self.ui.contactList, SIGNAL('itemClicked(QListWidgetItem*)'), self.contactChanged)
    self.connect(self.ui.refreshButton, SIGNAL('clicked()'), self.refreshContactList)
    self.connect(self.ui.urlButton, SIGNAL('clicked()'), self.urlClicked)
def load(self):
    """Refresh the contact list and repopulate the backend selector."""
    self.refreshContactList()
    selector = self.ui.backendsList
    selector.clear()
    for backend in self.weboob.iter_backends():
        selector.addItem(backend.name)
def groupChanged(self, i):
    """Slot for the group selector; the new index `i` itself is not used."""
    self.refreshContactList()
def refreshContactList(self):
    """Empty the contact list and refill it from the selected group."""
    ui = self.ui
    ui.contactList.clear()
    # Re-enabled by addContact once the group reports the end.
    ui.refreshButton.setEnabled(False)
    current = ui.groupBox.currentIndex()
    ui.groupBox.itemData(current).toPyObject().iter_contacts(self.addContact)
def setPhoto(self, contact, item):
    """
    Use the first available thumbnail of `contact` as icon of `item`.

    Also forgets the photo fetch registered for the contact, if any.
    Returns True when an icon was set, False otherwise (including when
    `contact` is falsy).
    """
    if not contact:
        return False

    # pop() with a default never raises KeyError, so no try/except needed.
    self.photo_processes.pop(contact.id, None)

    for photo in contact.photos.itervalues():
        if photo.thumbnail_data:
            img = QImage.fromData(photo.thumbnail_data)
            break
    else:
        # No photo carries thumbnail data.
        return False

    if img:
        item.setIcon(QIcon(QPixmap.fromImage(img)))
        return True
    return False
# Callback given to the group's iter_contacts(): add `contact` to the list,
# or, when `contact` is falsy, re-enable the refresh button.
def addContact(self, contact):
if not contact:
self.ui.refreshButton.setEnabled(True)
return
# Status label and color (an integer RGB value) used in the item text.
status = ''
if contact.status == Contact.STATUS_ONLINE:
status = u'Online'
status_color = 0x00aa00
elif contact.status == Contact.STATUS_OFFLINE:
status = u'Offline'
status_color = 0xff0000
elif contact.status == Contact.STATUS_AWAY:
status = u'Away'
status_color = 0xffad16
else:
status = u'Unknown'
status_color = 0xaaaaaa
if contact.status_msg:
status += u' — %s' % contact.status_msg
item = QListWidgetItem()
# NOTE(review): the literal below is split across lines and has three '%s'
# for four arguments -- not valid as it stands; its markup (presumably
# including a color placeholder) was stripped during extraction.  Restore
# from the upstream file.
item.setText('
%s
%s %s' % (contact.name, status_color, status, contact.backend))
item.setData(Qt.UserRole, contact)
# Icon: fetch the photo list if it is not loaded, else use a thumbnail,
# fetching the first photo's thumbnail when none is available yet.
if contact.photos is NotLoaded:
process = QtDo(self.weboob, lambda b, c: self.setPhoto(c, item))
process.do('fillobj', contact, ['photos'], backends=contact.backend)
self.photo_processes[contact.id] = process
elif len(contact.photos) > 0:
if not self.setPhoto(contact, item):
photo = contact.photos.values()[0]
process = QtDo(self.weboob, lambda b, p: self.setPhoto(contact, item))
process.do('fillobj', photo, ['thumbnail_data'], backends=contact.backend)
self.photo_processes[contact.id] = process
# Keep the list ordered by status: insert before the first item whose
# contact has a greater status value, else append.
for i in xrange(self.ui.contactList.count()):
if self.ui.contactList.item(i).data(Qt.UserRole).toPyObject().status > contact.status:
self.ui.contactList.insertItem(i, item)
return
self.ui.contactList.addItem(item)
def contactChanged(self, current):
    """Slot for clicks in the contact list: display the clicked contact."""
    if current:
        self.setContact(current.data(Qt.UserRole).toPyObject())
def setContact(self, contact):
    """Rebuild the detail tabs for `contact` (no-op if falsy or unchanged)."""
    if not contact or contact == self.contact:
        return

    tabs = self.ui.tabWidget
    tabs.clear()
    self.contact = contact
    backend = self.weboob.get_backend(self.contact.backend)

    tabs.addTab(ContactProfile(self.weboob, self.contact), self.tr('Profile'))
    # The tabs offered depend on what the contact's backend supports.
    if backend.has_caps(ICapMessages):
        can_reply = backend.has_caps(ICapMessagesPost)
        tabs.addTab(ContactThread(self.weboob, self.contact, can_reply), self.tr('Messages'))
    if backend.has_caps(ICapChat):
        tabs.addTab(QWidget(), self.tr('Chat'))
    # Empty placeholder tabs.
    tabs.addTab(QWidget(), self.tr('Calendar'))
    tabs.addTab(QWidget(), self.tr('Notes'))
def urlClicked(self):
    """Look up the contact whose URL was typed, on the selected backend."""
    ui = self.ui
    url = unicode(ui.urlEdit.text())
    if url:
        backend_name = unicode(ui.backendsList.currentText())
        # Re-enabled by urlClicked_cb at the end of the call.
        ui.urlButton.setEnabled(False)
        self.url_process = QtDo(self.weboob, self.urlClicked_cb, self.urlClicked_eb)
        self.url_process.do('get_contact', url, backends=backend_name)
def urlClicked_cb(self, backend, contact):
    """QtDo callback: display the contact found, or finish on a falsy backend."""
    if backend:
        self.ui.urlEdit.clear()
        self.setContact(contact)
        return

    # End of the call: drop the QtDo object and unlock the button.
    self.url_process = None
    self.ui.urlButton.setEnabled(True)
def urlClicked_eb(self, backend, error, backtrace):
    """QtDo error callback: report a failed contact lookup in a dialog."""
    details = unicode(self.tr('Unable to get contact:\n%s\n')) % to_unicode(error)
    # The backtrace is only shown when the root logger is at DEBUG level.
    debugging = logging.root.level == logging.DEBUG
    if debugging:
        details += u'\n%s\n' % to_unicode(backtrace)
    QMessageBox.critical(self, self.tr('Error while getting contact'),
                         details, QMessageBox.Ok)
main_window.py 0000664 0000000 0000000 00000005156 11666415431 0033740 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from PyQt4.QtGui import QWidget
from PyQt4.QtCore import SIGNAL
from weboob.tools.application.qt import QtMainWindow
from weboob.tools.application.qt.backendcfg import BackendCfg
from weboob.capabilities.dating import ICapDating
try:
from weboob.applications.qboobmsg.messages_manager import MessagesManager
HAVE_BOOBMSG = True
except ImportError:
HAVE_BOOBMSG = False
from .ui.main_window_ui import Ui_MainWindow
from .status import AccountsStatus
from .contacts import ContactsWidget
class MainWindow(QtMainWindow):
    """Main QHaveSex window: a tab widget whose tabs are loaded on first use."""

    def __init__(self, config, weboob, parent=None):
        QtMainWindow.__init__(self, parent)
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        self.config = config
        self.weboob = weboob
        # Indexes of the tabs whose load() has already been called.
        self.loaded_tabs = {}

        self.connect(self.ui.actionBackends, SIGNAL("triggered()"), self.backendsConfig)
        # Connected before the tabs are added (original order kept).
        self.connect(self.ui.tabWidget, SIGNAL('currentChanged(int)'), self.tabChanged)

        self.ui.tabWidget.addTab(AccountsStatus(self.weboob), self.tr('Status'))
        # The Messages tab is only available when qboobmsg could be imported.
        if HAVE_BOOBMSG:
            self.ui.tabWidget.addTab(MessagesManager(self.weboob), self.tr('Messages'))
        self.ui.tabWidget.addTab(ContactsWidget(self.weboob), self.tr('Contacts'))
        # Empty placeholder tabs: plain QWidgets, without a load() method.
        self.ui.tabWidget.addTab(QWidget(), self.tr('Calendar'))
        self.ui.tabWidget.addTab(QWidget(), self.tr('Optimizations'))

        if self.weboob.count_backends() == 0:
            self.backendsConfig()

    def backendsConfig(self):
        """Run the backend configuration dialog and reload the current tab."""
        bckndcfg = BackendCfg(self.weboob, (ICapDating,), self)
        if bckndcfg.run():
            # Every tab has to be reloaded with the new backends.
            self.loaded_tabs.clear()
            index = self.ui.tabWidget.currentIndex()
            widget = self.ui.tabWidget.widget(index)
            # Placeholder tabs have no load(); same guard as tabChanged().
            if hasattr(widget, 'load'):
                widget.load()
                self.loaded_tabs[index] = True

    def tabChanged(self, i):
        """Slot for tab switches: load the tab at index `i` on first display."""
        widget = self.ui.tabWidget.currentWidget()
        if hasattr(widget, 'load') and i not in self.loaded_tabs:
            widget.load()
            self.loaded_tabs[i] = True
qhavesex.py 0000664 0000000 0000000 00000002577 11666415431 0033255 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.dating import ICapDating
from weboob.tools.application.qt import QtApplication
from .main_window import MainWindow
class QHaveSex(QtApplication):
    """Qt application entry point for the dating front-end."""

    APPNAME = 'qhavesex'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = 'Qt application allowing to interact with various dating websites.'
    CAPS = ICapDating
    STORAGE_FILENAME = 'dating.storage'

    def main(self, argv):
        """Create the storage, load the dating backends and run the GUI.

        Returns whatever ``self.weboob.loop()`` returns.
        """
        self.create_storage(self.STORAGE_FILENAME)
        self.load_backends(ICapDating)

        window = MainWindow(self.config, self.weboob)
        # Keep a reference on the instance, as the window is used afterwards.
        self.main_window = window
        window.show()

        return self.weboob.loop()
status.py 0000664 0000000 0000000 00000010527 11666415431 0032746 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from PyQt4.QtGui import QScrollArea, QWidget, QHBoxLayout, QVBoxLayout, QFrame, QLabel, QImage, QPixmap
from weboob.capabilities.account import ICapAccount, StatusField
from weboob.tools.application.qt import QtDo
from weboob.tools.misc import to_unicode
# Framed widget showing one backend: a header (optional icon + title) and,
# for backends having the ICapAccount capability, a body label filled with
# the fields received from 'get_account_status'.
# NOTE(review): several string literals below are split across lines and look
# like their markup was stripped; confirm them against the upstream file.
class Account(QFrame):
# Build the header and, if the backend has ICapAccount, add the body label
# and register updateStats() through weboob.repeat().
def __init__(self, weboob, backend, parent=None):
QFrame.__init__(self, parent)
self.setFrameShape(QFrame.StyledPanel)
self.setFrameShadow(QFrame.Raised)
self.weboob = weboob
self.backend = backend
self.setLayout(QVBoxLayout())
# Stays None when no periodic update is registered (see deinit()).
self.timer = None
head = QHBoxLayout()
headw = QWidget()
headw.setLayout(head)
self.title = QLabel(u'
%s — %s
' % (backend.name, backend.DESCRIPTION))
self.body = QLabel()
# The icon label is only created when the backend provides an ICON.
if backend.ICON:
self.icon = QLabel()
img = QImage(backend.ICON)
self.icon.setPixmap(QPixmap.fromImage(img))
head.addWidget(self.icon)
head.addWidget(self.title)
head.addStretch()
self.layout().addWidget(headw)
if backend.has_caps(ICapAccount):
self.body.setText(u'Waiting...')
self.layout().addWidget(self.body)
# presumably 60 is an interval in seconds -- TODO confirm against weboob.repeat()
self.timer = self.weboob.repeat(60, self.updateStats)
# Stop the periodic update, if one was registered in __init__.
def deinit(self):
if self.timer is not None:
self.weboob.stop(self.timer)
# Start a 'get_account_status' request on this backend; the text is
# accumulated on the QtDo object (body / in_p) by updateStats_cb().
def updateStats(self):
self.process = QtDo(self.weboob, self.updateStats_cb, self.updateStats_eb)
self.process.body = u''
self.process.in_p = False
self.process.do('get_account_status', backends=self.backend)
# Callback receiving one status field per call. A falsy field is handled
# as the end of the results: the accumulated text is displayed and the
# process reference is dropped.
def updateStats_cb(self, backend, field):
if not field:
if self.process.in_p:
self.process.body += u"
"
self.body.setText(self.process.body)
self.process = None
return
if field.flags & StatusField.FIELD_HTML:
value = u'%s' % field.value
else:
# NOTE(review): as written these replace() calls substitute each character
# with itself; they look like HTML escaping whose entities were lost.
value = (u'%s' % field.value).replace('&', '&').replace('<', '<').replace('>', '>')
if field.flags & StatusField.FIELD_TEXT:
if self.process.in_p:
self.process.body += u''
self.process.body += u'
%s
' % value
self.process.in_p = False
else:
# in_p tracks whether a run of "label: value" fields is in progress.
if not self.process.in_p:
self.process.body += u"
"
self.process.in_p = True
else:
self.process.body += u" "
# NOTE(review): this uses field.value, not the 'value' computed above -- confirm intent.
self.process.body += u'%s: %s' % (field.label, field.value)
# Error callback: show the error text in the body and set the title again
# from its current text.
def updateStats_eb(self, backend, err, backtrace):
self.body.setText(u'Unable to connect: %s' % to_unicode(err))
self.title.setText(u'%s' % unicode(self.title.text()))
class AccountsStatus(QScrollArea):
    """Scrollable page listing one Account frame per backend."""

    def __init__(self, weboob, parent=None):
        """Create an empty, frameless scroll area with a vertical layout."""
        QScrollArea.__init__(self, parent)
        self.weboob = weboob

        self.setFrameShadow(self.Plain)
        self.setFrameShape(self.NoFrame)
        self.setWidgetResizable(True)

        container = QWidget(self)
        container.setLayout(QVBoxLayout())
        container.show()
        self.setWidget(container)

    def load(self):
        """Discard the current Account frames and rebuild them from the backends."""
        layout = self.widget().layout()

        # Empty the layout; items without a widget (the stretch) are just dropped.
        while layout.count() > 0:
            child = layout.takeAt(0).widget()
            if child:
                child.deinit()
                child.hide()
                child.deleteLater()

        for backend in self.weboob.iter_backends():
            layout.addWidget(Account(self.weboob, backend))

        layout.addStretch()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui/ 0000775 0000000 0000000 00000000000 11666415431 0031540 5 ustar 00root root 0000000 0000000 Makefile 0000664 0000000 0000000 00000000265 11666415431 0033124 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui UI_FILES = $(wildcard *.ui)
# Generated Python modules: one <name>_ui.py for each <name>.ui in UI_FILES.
UI_PY_FILES = $(UI_FILES:%.ui=%_ui.py)
# Command used to convert a .ui file into a Python module.
PYUIC = pyuic4
# Build every generated *_ui.py module.
all: $(UI_PY_FILES)
# Pattern rule: produce <name>_ui.py from <name>.ui with $(PYUIC).
%_ui.py: %.ui
$(PYUIC) -o $@ $^
# Remove compiled Python files and the generated *_ui.py modules.
clean:
rm -f *.pyc
rm -f $(UI_PY_FILES)
__init__.py 0000664 0000000 0000000 00000000000 11666415431 0033560 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui contact_thread.ui 0000664 0000000 0000000 00000011121 11666415431 0034776 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui
ContactThread00578429FormQt::Vertical00QFrame::StyledPanelQFrame::Raisedfalsefalse00Send050Qt::Horizontal4020../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../usr/share/icons/oxygen/16x16/actions/view-refresh.png../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../usr/share/icons/oxygen/16x16/actions/view-refresh.png01Qt::ScrollBarAsNeededtrueQt::AlignBottom|Qt::AlignLeading|Qt::AlignLeft00556154QWidget#scrollAreaContent {
background-color: rgb(255, 255, 255);
}
contacts.ui 0000664 0000000 0000000 00000007306 11666415431 0033644 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui
Contacts00478374FormQt::Horizontal00QFrame::StyledPanelQFrame::Raised../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../usr/share/icons/oxygen/16x16/actions/view-refresh.png../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../../usr/share/icons/oxygen/16x16/actions/view-refresh.png1201201truefalsetrueFrom URLDisplay10
main_window.ui 0000664 0000000 0000000 00000004166 11666415431 0034342 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui
MainWindow00763580QHaveSex-10076324FiletoolBarTopToolBarAreafalseBackendsQuitQuitactionQuittriggered()MainWindowclose()-1-1381289
profile.ui 0000664 0000000 0000000 00000020655 11666415431 0033470 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui
Profile00755647Form0QFrame::NoFrameQFrame::Plaintrue0075564700000QFrame::StyledPanelQFrame::Raised<h1>Loading...</h1><b>URL:</b>trueQt::LinksAccessibleByMouse|Qt::TextSelectableByMouse0900Qt::Vertical200QFrame::StyledPanelQFrame::Raised002016777215<Qt::AlignCenter002016777215>Qt::AlignCenterQt::LinksAccessibleByMouse|Qt::TextSelectableByMouseQt::Vertical130Qt::Horizontal00true20-1
thread_message.ui 0000664 0000000 0000000 00000005367 11666415431 0035006 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qhavesex/ui
ThreadMessage0055276FrameQFrame::StyledPanelQFrame::RaisedQt::Vertical20100trueQt::LinksAccessibleByMouse|Qt::TextSelectableByMousetrueQt::LinksAccessibleByMouse|Qt::TextSelectableByMouseQt::Vertical201
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob/ 0000775 0000000 0000000 00000000000 11666415431 0031107 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000000067 11666415431 0033144 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob from .qvideoob import QVideoob
__all__ = ['QVideoob']
main_window.py 0000664 0000000 0000000 00000012421 11666415431 0033715 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from PyQt4.QtCore import SIGNAL
from weboob.capabilities.video import ICapVideo
from weboob.tools.application.qt import QtMainWindow, QtDo
from weboob.tools.application.qt.backendcfg import BackendCfg
from weboob.applications.qvideoob.ui.main_window_ui import Ui_MainWindow
from .video import Video
from .minivideo import MiniVideo
class MainWindow(QtMainWindow):
    """Main window of QVideoob: search box, URL box and the result list."""

    def __init__(self, config, weboob, parent=None):
        """Build the UI, restore the saved settings and list the backends."""
        QtMainWindow.__init__(self, parent)
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        self.config = config
        self.weboob = weboob
        self.minivideos = []

        # Restore the settings before connecting the signals, so that the
        # stateChanged handlers are not connected while the boxes are set.
        ui = self.ui
        ui.sortbyEdit.setCurrentIndex(int(self.config.get('settings', 'sortby')))
        ui.nsfwCheckBox.setChecked(int(self.config.get('settings', 'nsfw')))
        ui.sfwCheckBox.setChecked(int(self.config.get('settings', 'sfw')))

        connections = (
            (ui.searchEdit, "returnPressed()", self.search),
            (ui.urlEdit, "returnPressed()", self.openURL),
            (ui.nsfwCheckBox, "stateChanged(int)", self.nsfwChanged),
            (ui.sfwCheckBox, "stateChanged(int)", self.sfwChanged),
            (ui.actionBackends, "triggered()", self.backendsConfig),
        )
        for sender, signature, slot in connections:
            self.connect(sender, SIGNAL(signature), slot)

        self.loadBackendsList()

        if ui.backendEdit.count() == 0:
            self.backendsConfig()

    def backendsConfig(self):
        """Run the backend configuration dialog; refresh the list on success."""
        dialog = BackendCfg(self.weboob, (ICapVideo,), self)
        if dialog.run():
            self.loadBackendsList()

    def loadBackendsList(self):
        """Fill the backend combo box and enable the inputs if it is not empty."""
        combo = self.ui.backendEdit
        combo.clear()

        for position, backend in enumerate(self.weboob.iter_backends()):
            if position == 0:
                # The "all" entry only exists when there is at least one backend.
                combo.addItem('All backends', '')
            combo.addItem(backend.name, backend.name)
            if backend.name == self.config.get('settings', 'backend'):
                # +1 because of the 'All backends' entry at index 0.
                combo.setCurrentIndex(position + 1)

        has_backends = combo.count() != 0
        self.ui.searchEdit.setEnabled(has_backends)
        self.ui.urlEdit.setEnabled(has_backends)

    def nsfwChanged(self, state):
        """Store the NSFW checkbox state and refresh the displayed videos."""
        checked = int(self.ui.nsfwCheckBox.isChecked())
        self.config.set('settings', 'nsfw', checked)
        self.updateVideosDisplay()

    def sfwChanged(self, state):
        """Store the SFW checkbox state and refresh the displayed videos."""
        checked = int(self.ui.sfwCheckBox.isChecked())
        self.config.set('settings', 'sfw', checked)
        self.updateVideosDisplay()

    def updateVideosDisplay(self):
        """Show or hide each result according to the NSFW/SFW checkboxes."""
        for entry in self.minivideos:
            if entry.video.nsfw:
                visible = self.ui.nsfwCheckBox.isChecked()
            else:
                visible = self.ui.sfwCheckBox.isChecked()

            if visible:
                entry.show()
            else:
                entry.hide()

    def _currentBackendName(self):
        """Return the data of the selected backend entry as a str."""
        combo = self.ui.backendEdit
        return str(combo.itemData(combo.currentIndex()).toString())

    def search(self):
        """Drop the previous results and start a search for the typed pattern."""
        pattern = unicode(self.ui.searchEdit.text())
        if not pattern:
            return

        results_layout = self.ui.scrollAreaContent.layout()
        for entry in self.minivideos:
            results_layout.removeWidget(entry)
            entry.hide()
            entry.deleteLater()
        self.minivideos = []

        # Re-enabled by addVideo() once the end of the results is signalled.
        self.ui.searchEdit.setEnabled(False)

        backend_name = self._currentBackendName()

        self.process = QtDo(self.weboob, self.addVideo)
        self.process.do('iter_search_results', pattern, self.ui.sortbyEdit.currentIndex(), nsfw=True, max_results=20, backends=backend_name)

    def addVideo(self, backend, video):
        """Search callback: add one result, or finish when backend is falsy."""
        if not backend:
            self.ui.searchEdit.setEnabled(True)
            self.process = None
            return

        entry = MiniVideo(self.weboob, backend, video)
        self.ui.scrollAreaContent.layout().addWidget(entry)
        self.minivideos.append(entry)

        # Hide the new entry right away when its category is unchecked.
        if video.nsfw:
            checkbox = self.ui.nsfwCheckBox
        else:
            checkbox = self.ui.sfwCheckBox
        if not checkbox.isChecked():
            entry.hide()

    def openURL(self):
        """Ask every backend for the typed URL and open each video found."""
        url = unicode(self.ui.urlEdit.text())
        if not url:
            return

        for backend in self.weboob.iter_backends():
            video = backend.get_video(url)
            if not video:
                continue
            player = Video(video, self)
            player.show()

        self.ui.urlEdit.clear()

    def closeEvent(self, ev):
        """Save the selected backend and sort order, then accept the event."""
        self.config.set('settings', 'backend', self._currentBackendName())
        self.config.set('settings', 'sortby', self.ui.sortbyEdit.currentIndex())
        self.config.save()
        ev.accept()
minivideo.py 0000664 0000000 0000000 00000005016 11666415431 0033367 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from PyQt4.QtGui import QFrame, QImage, QPixmap
from weboob.tools.application.qt import QtDo
from weboob.applications.qvideoob.ui.minivideo_ui import Ui_MiniVideo
from .video import Video
class MiniVideo(QFrame):
    """Compact frame summarising one video of a search result."""

    def __init__(self, weboob, backend, video, parent=None):
        """Fill the labels from ``video`` and request its thumbnail."""
        QFrame.__init__(self, parent)
        self.ui = Ui_MiniVideo()
        self.ui.setupUi(self)

        self.weboob = weboob
        self.backend = backend
        self.video = video

        ui = self.ui
        ui.titleLabel.setText(video.title)
        ui.backendLabel.setText(backend.name)
        ui.durationLabel.setText(unicode(video.duration))
        ui.authorLabel.setText(unicode(video.author))

        # An absent date is displayed as an empty label.
        date_text = unicode(video.date) if video.date else ''
        ui.dateLabel.setText(date_text)

        if video.rating_max:
            rating_text = '%s / %s' % (video.rating, video.rating_max)
        else:
            rating_text = '%s' % video.rating
        ui.ratingLabel.setText(rating_text)

        # The thumbnail is filled by the backend and handled in gotThumbnail().
        self.process_thumbnail = QtDo(self.weboob, self.gotThumbnail)
        self.process_thumbnail.do('fillobj', self.video, ['thumbnail'], backends=backend)

    def gotThumbnail(self, backend, video):
        """Thumbnail callback: display the image when data is available."""
        if not backend:
            return

        thumbnail = video.thumbnail
        if thumbnail and thumbnail.data:
            image = QImage.fromData(thumbnail.data)
            self.ui.imageLabel.setPixmap(QPixmap.fromImage(image))

    def enterEvent(self, event):
        """Draw the frame sunken while the pointer is over it."""
        self.setFrameShadow(self.Sunken)
        QFrame.enterEvent(self, event)

    def leaveEvent(self, event):
        """Restore the raised frame when the pointer leaves."""
        self.setFrameShadow(self.Raised)
        QFrame.leaveEvent(self, event)

    def mousePressEvent(self, event):
        """Fetch the video from the backend and open it in a Video dialog."""
        QFrame.mousePressEvent(self, event)

        video = self.backend.get_video(self.video.id)
        if not video:
            return

        player = Video(video, self)
        player.show()
qvideoob.py 0000664 0000000 0000000 00000003013 11666415431 0033207 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.video import ICapVideo
from weboob.tools.application.qt import QtApplication
from .main_window import MainWindow
class QVideoob(QtApplication):
    """Qt application entry point for the video front-end."""

    APPNAME = 'qvideoob'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = 'Qt application allowing to search videos on various websites and play them.'
    CAPS = ICapVideo
    # Default values of the 'settings' section read by the main window.
    CONFIG = {
        'settings': {
            'nsfw': 1,
            'sfw': 1,
            'sortby': 0,
            'backend': '',
        },
    }

    def main(self, argv):
        """Load the video backends and the config, then run the GUI.

        Returns whatever ``self.weboob.loop()`` returns.
        """
        self.load_backends(ICapVideo)
        self.load_config()

        window = MainWindow(self.config, self.weboob)
        # Keep a reference on the instance, as the window is used afterwards.
        self.main_window = window
        window.show()

        return self.weboob.loop()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob/ui/ 0000775 0000000 0000000 00000000000 11666415431 0031524 5 ustar 00root root 0000000 0000000 Makefile 0000664 0000000 0000000 00000000265 11666415431 0033110 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob/ui UI_FILES = $(wildcard *.ui)
# Generated Python modules: one <name>_ui.py for each <name>.ui in UI_FILES.
UI_PY_FILES = $(UI_FILES:%.ui=%_ui.py)
# Command used to convert a .ui file into a Python module.
PYUIC = pyuic4
# Build every generated *_ui.py module.
all: $(UI_PY_FILES)
# Pattern rule: produce <name>_ui.py from <name>.ui with $(PYUIC).
%_ui.py: %.ui
$(PYUIC) -o $@ $^
# Remove compiled Python files and the generated *_ui.py modules.
clean:
rm -f *.pyc
rm -f $(UI_PY_FILES)
__init__.py 0000664 0000000 0000000 00000000000 11666415431 0033544 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob/ui main_window.ui 0000664 0000000 0000000 00000012651 11666415431 0034324 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob/ui
MainWindow00582463QVideoobQFrame::StyledPanelQFrame::RaisedSearch: 00RelevanceRatingDurationDate10000Display:SFWtrueNSFWtrueQt::Horizontal4020true00560230QWidget#scrollAreaContent {
background-color: rgb(255, 255, 255);
}QFrame::StyledPanelQFrame::RaisedURL: 0058225toolBarTopToolBarAreafalseBackends
minivideo.ui 0000664 0000000 0000000 00000011337 11666415431 0033774 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob/ui
MiniVideo00464132FormQFrame::StyledPanelQFrame::Raised95500QFormLayout::ExpandingFieldsGrow75trueTitle0050truefalseTextLabel75trueDurationTextLabel75trueAuthorTextLabel75trueDateTextLabel75trueRatingTextLabel75trueWhereTextLabel
video.ui 0000664 0000000 0000000 00000013711 11666415431 0033115 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob/ui
Video00647404Video1275trueQFrame::BoxQFrame::RaisedTextLabelQt::AlignCenter00background-color: rgb(255, 255, 255);QFrame::StyledPanelQFrame::Sunken00truetrueQFrame::StyledPanelQFrame::RaisedQFormLayout::ExpandingFieldsGrow75trueURLtrueArrowCursortruefalsetrue75trueDurationTextLabel75trueAuthorTextLabel75trueDateTextLabel75trueRatingTextLabelPhonon::VideoPlayerQWidgetphonon/videoplayer.hPhonon::SeekSliderQWidgetphonon/seekslider.h
video.py 0000664 0000000 0000000 00000003524 11666415431 0032514 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qvideoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from PyQt4.QtCore import QUrl
from PyQt4.QtGui import QDialog
from PyQt4.phonon import Phonon
from weboob.applications.qvideoob.ui.video_ui import Ui_Video
class Video(QDialog):
    """Dialog showing the details of one video and playing it with Phonon."""

    def __init__(self, video, parent=None):
        """Fill the labels from ``video`` and start playing ``video.url``.

        :param video: video object providing title, url, duration, author,
                      date, rating and rating_max
        :param parent: optional parent widget
        """
        QDialog.__init__(self, parent)
        self.ui = Ui_Video()
        self.ui.setupUi(self)

        self.video = video
        self.setWindowTitle("Video - %s" % video.title)
        self.ui.urlEdit.setText(video.url)
        self.ui.titleLabel.setText(video.title)
        self.ui.durationLabel.setText(unicode(video.duration))
        self.ui.authorLabel.setText(unicode(video.author))
        # Same rule as the MiniVideo frame: a missing date gives an empty
        # label instead of the literal text "None".
        self.ui.dateLabel.setText(unicode(video.date) if video.date else '')
        if video.rating_max:
            self.ui.ratingLabel.setText('%s / %s' % (video.rating, video.rating_max))
        else:
            self.ui.ratingLabel.setText('%s' % video.rating)

        # Bind the seek slider to the player's media object before loading.
        self.ui.seekSlider.setMediaObject(self.ui.videoPlayer.mediaObject())
        self.ui.videoPlayer.load(Phonon.MediaSource(QUrl(video.url)))
        self.ui.videoPlayer.play()

    def closeEvent(self, event):
        """Stop the playback before the dialog is closed."""
        self.ui.videoPlayer.stop()
        event.accept()
qwebcontentedit/ 0000775 0000000 0000000 00000000000 11666415431 0032417 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications __init__.py 0000664 0000000 0000000 00000000113 11666415431 0034523 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qwebcontentedit from .qwebcontentedit import QWebContentEdit
__all__ = ['QWebContentEdit'] main_window.py 0000664 0000000 0000000 00000012467 11666415431 0035316 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qwebcontentedit # -*- coding: utf-8 -*-
# Copyright(C) 2011 Clément Schreiner
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import logging
from copy import deepcopy
from PyQt4.QtCore import SIGNAL
from PyQt4.QtGui import QMessageBox
from weboob.tools.application.qt import QtMainWindow, QtDo
from weboob.tools.application.qt.backendcfg import BackendCfg
from weboob.capabilities.content import ICapContent
from weboob.tools.misc import to_unicode
from .ui.main_window_ui import Ui_MainWindow
class MainWindow(QtMainWindow):
    """Main window of QWebcontentedit: load, edit, preview and save a page."""

    def __init__(self, config, weboob, parent=None):
        """Build the UI, connect the signals and list the backends."""
        QtMainWindow.__init__(self, parent)
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        self.config = config
        self.weboob = weboob
        # Backend of the page currently loaded; None while no page is loaded.
        self.backend = None

        ui = self.ui
        connections = (
            (ui.idEdit, "returnPressed()", self.loadPage),
            (ui.loadButton, "clicked()", self.loadPage),
            (ui.tabWidget, "currentChanged(int)", self._currentTabChanged),
            (ui.saveButton, "clicked()", self.savePage),
            (ui.actionBackends, "triggered()", self.backendsConfig),
        )
        for sender, signature, slot in connections:
            self.connect(sender, SIGNAL(signature), slot)

        if self.weboob.count_backends() == 0:
            self.backendsConfig()
        else:
            self.loadBackends()

    def backendsConfig(self):
        """Run the backend configuration dialog; refresh the list on success."""
        dialog = BackendCfg(self.weboob, (ICapContent,), self)
        if dialog.run():
            self.loadBackends()

    def loadBackends(self):
        """Refill the backend combo box, inserting each name at the top."""
        box = self.ui.backendBox
        box.clear()
        for backend in self.weboob.iter_backends():
            box.insertItem(0, backend.name)

    def _currentTabChanged(self):
        """Refresh the preview when tab 1 is selected and a page is loaded."""
        on_preview_tab = self.ui.tabWidget.currentIndex() == 1
        if on_preview_tab and self.backend is not None:
            self.showPreview()

    def loadPage(self):
        """Request the content whose id is typed, on the selected backend."""
        _id = unicode(self.ui.idEdit.text())
        if not _id:
            return

        # Saving is disabled until the load has finished on a valid page.
        self.ui.saveButton.setEnabled(False)
        backend_name = str(self.ui.backendBox.currentText())

        self.process = QtDo(self.weboob, self._loadPage_cb, self._loadPage_eb)
        self.process.do('get_content', _id, backends=(backend_name,))

    def _loadPage_cb(self, backend, data):
        """Load callback: display the content, or finish when backend is falsy."""
        if not backend:
            self.process = None
            if self.backend:
                self.ui.saveButton.setEnabled(True)
            return

        if not data:
            self.content = None
            self.backend = None
            message = ('Unable to open page "%s" on %s: it does not exist.'
                       % (self.ui.idEdit.text(), self.ui.backendBox.currentText()))
            QMessageBox.critical(self, self.tr('Unable to open page'),
                                 message, QMessageBox.Ok)
            return

        self.content = data
        self.ui.contentEdit.setPlainText(self.content.content)
        self.setWindowTitle("QWebcontentedit - %s@%s" % (self.content.id, backend.name))
        self.backend = backend

    def _reportError(self, template, title, error, backtrace):
        """Show a critical box; the backtrace is added at DEBUG log level."""
        text = unicode(template) % to_unicode(error)
        if logging.root.level == logging.DEBUG:
            text += '\n%s\n' % to_unicode(backtrace)
        QMessageBox.critical(self, title, text, QMessageBox.Ok)

    def _loadPage_eb(self, backend, error, backtrace):
        """Load error callback: report the error to the user."""
        self._reportError(self.tr('Unable to load page:\n%s\n'),
                          self.tr('Error while loading page'),
                          error, backtrace)

    def savePage(self):
        """Push the edited content to the backend when it has changed."""
        if self.backend is None:
            return

        new_content = unicode(self.ui.contentEdit.toPlainText())
        minor = self.ui.minorBox.isChecked()
        if new_content == self.content.content:
            # Nothing was modified: there is nothing to push.
            return

        self.ui.saveButton.setEnabled(False)
        self.content.content = new_content
        message = unicode(self.ui.descriptionEdit.text())

        self.process = QtDo(self.weboob, self._savePage_cb, self._savePage_eb)
        self.process.do('push_content', self.content, message, minor=minor, backends=self.backend)

    def _savePage_cb(self, backend, data):
        """Save callback: clear the description, or finish when backend is falsy."""
        if backend:
            self.ui.descriptionEdit.clear()
            return

        self.process = None
        self.ui.saveButton.setEnabled(True)

    def _savePage_eb(self, backend, error, backtrace):
        """Save error callback: report the error and re-enable saving."""
        self._reportError(self.tr('Unable to save page:\n%s\n'),
                          self.tr('Error while saving page'),
                          error, backtrace)
        self.ui.saveButton.setEnabled(True)

    def showPreview(self):
        """Render a copy of the content holding the edited text as a preview."""
        draft = deepcopy(self.content)
        draft.content = unicode(self.ui.contentEdit.toPlainText())
        html = self.backend.get_content_preview(draft)
        self.ui.previewEdit.setHtml(html)
qwebcontentedit.py 0000664 0000000 0000000 00000002525 11666415431 0036174 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qwebcontentedit # -*- coding: utf-8 -*-
# Copyright(C) 2011 Clément Schreiner
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.application.qt import QtApplication
from weboob.capabilities.content import ICapContent
from .main_window import MainWindow
class QWebContentEdit(QtApplication):
    """Qt application entry point for the content editing front-end."""

    APPNAME = 'qwebcontentedit'
    VERSION = '0.9.1'
    COPYRIGHT = u'Copyright(C) 2011 Clément Schreiner'
    DESCRIPTION = 'Qt application allowing to manage contents of various websites.'
    CAPS = ICapContent

    def main(self, argv):
        """Load the content backends with a storage, then run the GUI.

        Returns whatever ``self.weboob.loop()`` returns.
        """
        storage = self.create_storage()
        self.load_backends(ICapContent, storage=storage)

        window = MainWindow(self.config, self.weboob)
        # Keep a reference on the instance, as the window is used afterwards.
        self.main_window = window
        window.show()

        return self.weboob.loop()
ui/ 0000775 0000000 0000000 00000000000 11666415431 0033034 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qwebcontentedit Makefile 0000664 0000000 0000000 00000000265 11666415431 0034477 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qwebcontentedit/ui UI_FILES = $(wildcard *.ui)
# Generated Python modules: one <name>_ui.py for each <name>.ui in UI_FILES.
UI_PY_FILES = $(UI_FILES:%.ui=%_ui.py)
# Command used to convert a .ui file into a Python module.
PYUIC = pyuic4
# Build every generated *_ui.py module.
all: $(UI_PY_FILES)
# Pattern rule: produce <name>_ui.py from <name>.ui with $(PYUIC).
%_ui.py: %.ui
$(PYUIC) -o $@ $^
# Remove compiled Python files and the generated *_ui.py modules.
clean:
rm -f *.pyc
rm -f $(UI_PY_FILES)
__init__.py 0000664 0000000 0000000 00000000000 11666415431 0035133 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qwebcontentedit/ui main_window.ui 0000664 0000000 0000000 00000010000 11666415431 0035675 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qwebcontentedit/ui
MainWindow00469520QWebcontenteditfalseLoad0EditPreviewtrueEdit descriptionMinor editfalseSave0046920FiletoolBarTopToolBarAreafalseExitBackendsactionExittriggered()MainWindowclose()-1-1343306
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qweboobcfg/ 0000775 0000000 0000000 00000000000 11666415431 0031415 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001435 11666415431 0033452 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qweboobcfg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .qweboobcfg import QWeboobCfg
__all__ = ['QWeboobCfg']
qweboobcfg.py 0000664 0000000 0000000 00000002500 11666415431 0034023 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/qweboobcfg #!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: ft=python et softtabstop=4 cinoptions=4 shiftwidth=4 ts=4 ai
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.application.qt import BackendCfg, QtApplication
class QWeboobCfg(QtApplication):
    """Qt application that opens the backend configuration dialog."""

    APPNAME = 'qweboobcfg'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = ("weboob-config-qt is a graphical application to add/edit/remove backends, "
                   "and to register new website accounts.")

    def main(self, argv):
        """Load the backends, show the configuration dialog and run the loop.

        ``argv`` is accepted but not used here.  The return value is whatever
        ``self.weboob.loop()`` returns.
        """
        self.load_backends()

        # The dialog is stored on the instance as ``self.dlg``.
        dialog = BackendCfg(self.weboob)
        self.dlg = dialog
        dialog.show()

        return self.weboob.loop()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/radioob/ 0000775 0000000 0000000 00000000000 11666415431 0030716 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001424 11666415431 0032751 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/radioob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .radioob import Radioob
__all__ = ['Radioob']
radioob.py 0000664 0000000 0000000 00000011465 11666415431 0032637 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/radioob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
from weboob.capabilities.radio import ICapRadio
from weboob.capabilities.base import NotLoaded
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.media_player import InvalidMediaPlayer, MediaPlayer, MediaPlayerNotFound
from weboob.tools.application.formatters.iformatter import IFormatter
__all__ = ['Radioob']
class RadioListFormatter(IFormatter):
    """Formatter rendering each radio as a short, two-line list entry."""

    MANDATORY_FIELDS = ('id', 'title', 'description')

    # Number of items formatted since the last flush; used as the displayed
    # index in interactive mode.
    count = 0

    def flush(self):
        """Reset the item counter."""
        self.count = 0

    def format_dict(self, item):
        """Return the text for one radio.

        ``item`` is indexed by field name ('id', 'title', 'description',
        'current').  In interactive mode the entry shows the running index
        and the backend name (the part of the id after '@'); otherwise it
        shows the full id.
        """
        self.count += 1

        bold, reset = ReplApplication.BOLD, ReplApplication.NC
        if not self.interactive:
            header = u'%s* (%s) %s%s\n' % (bold, item['id'], item['title'], reset)
        else:
            backend = item['id'].split('@', 1)[1]
            header = u'%s* (%d) %s (%s)%s\n' % (bold, self.count, item['title'], backend, reset)

        pieces = [header, ' %-30s' % item['description']]

        # The current song is only appended when it has been loaded.
        current = item['current']
        if current is not NotLoaded:
            if current.artist:
                pieces.append(' (Current: %s - %s)' % (current.artist, current.title))
            else:
                pieces.append(' (Current: %s)' % current.title)

        return u''.join(pieces)
class Radioob(ReplApplication):
APPNAME = 'radioob'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
DESCRIPTION = 'Console application allowing to search for web radio stations, listen to them and get information ' \
'like the current song.'
CAPS = ICapRadio
EXTRA_FORMATTERS = {'radio_list': RadioListFormatter}
COMMANDS_FORMATTERS = {'ls': 'radio_list',
'search': 'radio_list',
}
def __init__(self, *args, **kwargs):
ReplApplication.__init__(self, *args, **kwargs)
self.player = MediaPlayer(self.logger)
def main(self, argv):
self.load_config()
return ReplApplication.main(self, argv)
def complete_play(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_play(self, _id):
"""
play ID
Play a radio with a found player.
"""
if not _id:
print >>sys.stderr, 'This command takes an argument: %s' % self.get_command_help('play', short=True)
return 2
radio = self.get_object(_id, 'get_radio', ['streams'])
if not radio:
print >>sys.stderr, 'Radio not found: ' % _id
return 1
try:
player_name = self.config.get('media_player')
if not player_name:
self.logger.debug(u'You can set the media_player key to the player you prefer in the radioob '
'configuration file.')
self.player.play(radio.streams[0], player_name=player_name)
except (InvalidMediaPlayer, MediaPlayerNotFound), e:
print '%s\nRadio URL: %s' % (e, radio.streams[0].url)
def complete_info(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_info(self, _id):
"""
info ID
Get information about a radio.
"""
if not _id:
print >>sys.stderr, 'This command takes an argument: %s' % self.get_command_help('info', short=True)
return 2
radio = self.get_object(_id, 'get_radio')
if not radio:
print >>sys.stderr, 'Radio not found:', _id
return 3
self.format(radio)
self.flush()
def do_search(self, pattern=None):
"""
search PATTERN
List radios matching a PATTERN.
If PATTERN is not given, this command will list all the radios.
"""
self.set_formatter_header(u'Search pattern: %s' % pattern if pattern else u'All radios')
self.change_path('/search')
for backend, radio in self.do('iter_radios_search', pattern=pattern):
self.add_object(radio)
self.format(radio)
self.flush()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/traveloob/ 0000775 0000000 0000000 00000000000 11666415431 0031274 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001536 11666415431 0033333 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/traveloob # -*- coding: utf-8 -*-
# vim: ft=python et softtabstop=4 cinoptions=4 shiftwidth=4 ts=4 ai
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .traveloob import Traveloob
__all__ = ['Traveloob']
traveloob.py 0000664 0000000 0000000 00000007776 11666415431 0033605 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/traveloob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon, Julien Hébert
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
from datetime import datetime
import logging
from weboob.capabilities.travel import ICapTravel, RoadmapFilters
from weboob.tools.application.repl import ReplApplication
__all__ = ['Traveloob']
class Traveloob(ReplApplication):
APPNAME = 'traveloob'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
DESCRIPTION = 'Console application allowing to search for train stations and get departure times.'
CAPS = ICapTravel
DEFAULT_FORMATTER = 'table'
def add_application_options(self, group):
group.add_option('--departure-time')
group.add_option('--arrival-time')
def do_stations(self, pattern):
"""
stations PATTERN
Search stations.
"""
for backend, station in self.do('iter_station_search', pattern):
self.format(station)
self.flush()
def do_departures(self, line):
"""
departures STATION [ARRIVAL]
List all departures for a given station.
"""
station, arrival = self.parse_command_args(line, 2, 1)
station_id, backend_name = self.parse_id(station)
if arrival:
arrival_id, backend_name2 = self.parse_id(arrival)
if backend_name and backend_name2 and backend_name != backend_name2:
logging.error('Departure and arrival aren\'t on the same backend')
return 1
else:
arrival_id = backend_name2 = None
if backend_name:
backends = [backend_name]
elif backend_name2:
backends = [backend_name2]
else:
backends = None
for backend, departure in self.do('iter_station_departures', station_id, arrival_id, backends=backends):
self.format(departure)
self.flush()
def do_roadmap(self, line):
"""
roadmap DEPARTURE ARRIVAL
Display the roadmap to travel from DEPARTURE to ARRIVAL.
Command-line parameters:
--departure-time TIME requested departure time
--arrival-time TIME requested arrival time
TIME might be in form "yyyy-mm-dd HH:MM" or "HH:MM".
Example:
> roadmap Puteaux Aulnay-sous-Bois --arrival-time 22:00
"""
departure, arrival = self.parse_command_args(line, 2, 2)
filters = RoadmapFilters()
try:
filters.departure_time = self.parse_datetime(self.options.departure_time)
filters.arrival_time = self.parse_datetime(self.options.arrival_time)
except ValueError, e:
print >>sys.stderr, 'Invalid datetime value: %s' % e
print >>sys.stderr, 'Please enter a datetime in form "yyyy-mm-dd HH:MM" or "HH:MM".'
return 1
for backend, route in self.do('iter_roadmap', departure, arrival, filters):
self.format(route)
self.flush()
def parse_datetime(self, text):
if text is None:
return None
try:
date = datetime.strptime(text, '%Y-%m-%d %H:%M')
except ValueError:
try:
date = datetime.strptime(text, '%H:%M')
except ValueError:
raise ValueError(text)
date = datetime.now().replace(hour=date.hour, minute=date.minute)
return date
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob/ 0000775 0000000 0000000 00000000000 11666415431 0030726 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001426 11666415431 0032763 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .videoob import Videoob
__all__ = ['Videoob']
videoob.py 0000664 0000000 0000000 00000017147 11666415431 0032662 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz, Romain Bignon, John Obbele, Nicolas Duhamel
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import with_statement
import subprocess
import sys
import os
from weboob.capabilities.video import ICapVideo
from weboob.capabilities.base import NotLoaded
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.media_player import InvalidMediaPlayer, MediaPlayer, MediaPlayerNotFound
from weboob.tools.application.formatters.iformatter import IFormatter
__all__ = ['Videoob']
class VideoListFormatter(IFormatter):
    """Formatter rendering each video as a short, two-line list entry."""

    MANDATORY_FIELDS = ('id', 'title', 'duration', 'date')

    # Number of items formatted since the last flush; used as the displayed
    # index in interactive mode.
    count = 0

    def flush(self):
        """Reset the item counter."""
        self.count = 0

    def format_dict(self, item):
        """Return the text for one video.

        ``item`` is indexed by field name.  In interactive mode the entry
        shows the running index and the backend name (the part of the id
        after '@'); otherwise it shows the full id.
        """
        self.count += 1

        if not self.interactive:
            text = u'%s* (%s) %s%s\n' % (self.BOLD, item['id'], item['title'], self.NC)
        else:
            backend = item['id'].split('@', 1)[1]
            text = u'%s* (%d) %s (%s)%s\n' % (self.BOLD, self.count, item['title'], backend, self.NC)

        # The date is shown only when the duration is empty.
        text += ' %s' % (item['duration'] or item['date'])

        # Author and rating are appended only when they have been loaded.
        author = item['author']
        if author is not NotLoaded:
            text += ' - %s' % author
        rating = item['rating']
        if rating is not NotLoaded:
            text += u' (%s/%s)' % (rating, item['rating_max'])

        return text
class Videoob(ReplApplication):
APPNAME = 'videoob'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Christophe Benz, Romain Bignon, John Obbele'
DESCRIPTION = 'Console application allowing to search for videos on various websites, ' \
'play and download them and get information.'
CAPS = ICapVideo
EXTRA_FORMATTERS = {'video_list': VideoListFormatter}
COMMANDS_FORMATTERS = {'search': 'video_list',
'ls': 'video_list'}
nsfw = True
def __init__(self, *args, **kwargs):
ReplApplication.__init__(self, *args, **kwargs)
self.player = MediaPlayer(self.logger)
def main(self, argv):
self.load_config()
return ReplApplication.main(self, argv)
def complete_download(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
elif len(args) >= 3:
return self.path_completer(args[2])
def do_download(self, line):
"""
download ID [FILENAME]
Download a video
"""
_id, dest = self.parse_command_args(line, 2, 1)
video = self.get_object(_id, 'get_video', ['url'])
if not video:
print >>sys.stderr, 'Video not found: %s' % _id
return 3
if not video.url:
print >>sys.stderr, 'Error: the direct URL is not available.'
return 4
def check_exec(executable):
with open('/dev/null', 'w') as devnull:
process = subprocess.Popen(['which', executable], stdout=devnull)
if process.wait() != 0:
print >>sys.stderr, 'Please install "%s"' % executable
return False
return True
if dest is None:
ext = video.ext
if not ext:
ext = 'avi'
dest = '%s.%s' % (video.id, ext)
if video.url.find('rtmp') == 0:
if not check_exec('rtmpdump'):
return 1
args = ('rtmpdump', '-r', video.url, '-o', dest)
elif video.url.find('mms') == 0:
if not check_exec('mimms'):
return 1
args = ('mimms', video.url, dest)
else:
if not check_exec('wget'):
return 1
args = ('wget', video.url, '-O', dest)
os.spawnlp(os.P_WAIT, args[0], *args)
def complete_play(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_play(self, _id):
"""
play ID
Play a video with a found player.
"""
if not _id:
print >>sys.stderr, 'This command takes an argument: %s' % self.get_command_help('play', short=True)
return 2
video = self.get_object(_id, 'get_video', ['url'])
if not video:
print >>sys.stderr, 'Video not found: %s' % _id
return 3
if not video.url:
print >>sys.stderr, 'Error: the direct URL is not available.'
return 4
try:
player_name = self.config.get('media_player')
if not player_name:
self.logger.info(u'You can set the media_player key to the player you prefer in the videoob '
'configuration file.')
self.player.play(video, player_name=player_name)
except (InvalidMediaPlayer, MediaPlayerNotFound), e:
print '%s\nVideo URL: %s' % (e, video.url)
def complete_info(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_info(self, _id):
"""
info ID
Get information about a video.
"""
if not _id:
print >>sys.stderr, 'This command takes an argument: %s' % self.get_command_help('info', short=True)
return 2
video = self.get_object(_id, 'get_video')
if not video:
print >>sys.stderr, 'Video not found: %s' % _id
return 3
self.format(video)
self.flush()
def complete_nsfw(self, text, line, begidx, endidx):
return ['on', 'off']
def do_nsfw(self, line):
"""
nsfw [on | off]
If argument is given, enable or disable the non-suitable for work behavior.
If no argument is given, print the current behavior.
"""
line = line.strip()
if line:
if line == 'on':
self.nsfw = True
elif line == 'off':
self.nsfw = False
else:
print 'Invalid argument "%s".' % line
return 2
else:
print "on" if self.nsfw else "off"
def do_search(self, pattern=None):
"""
search [PATTERN]
Search for videos matching a PATTERN.
If PATTERN is not given, this command will search for the latest videos.
"""
if len(self.enabled_backends) == 0:
if self.interactive:
print >>sys.stderr, 'No backend loaded. Please use the "backends" command.'
else:
print >>sys.stderr, 'No backend loaded.'
return 1
self.set_formatter_header(u'Search pattern: %s' % pattern if pattern else u'Latest videos')
self.change_path('/search')
for backend, video in self.do('iter_search_results', pattern=pattern, nsfw=self.nsfw,
max_results=self.options.count):
self.add_object(video)
self.format(video)
self.flush()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob_web/ 0000775 0000000 0000000 00000000000 11666415431 0031563 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001440 11666415431 0033614 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob_web # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .videoob_web import VideoobWeb
__all__ = ['VideoobWeb']
public/ 0000775 0000000 0000000 00000000000 11666415431 0032762 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob_web style.css 0000664 0000000 0000000 00000000076 11666415431 0034637 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob_web/public .video-item
{
margin-bottom: 5ex;
margin-left: 2em;
}
templates/ 0000775 0000000 0000000 00000000000 11666415431 0033502 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob_web base.mako 0000664 0000000 0000000 00000000622 11666415431 0035265 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob_web/templates ## -*- coding: utf-8 -*-
<%def name="title()" filter="trim">
Videoob Web
</%def>
% if merge:
% for item in results:
${video_item(item)}
% endfor
% else:
% for backend, items in sorted(results.iteritems()):
${backend}
% for item in items:
${video_item(item)}
% endfor
% endfor
% endif
</%def>
videoob_web.py 0000664 0000000 0000000 00000010520 11666415431 0034340 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/videoob_web # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import os
from mako.lookup import TemplateLookup
from mako.runtime import Context
from routes import Mapper
from StringIO import StringIO
from webob.dec import wsgify
from webob import exc
from wsgiref.simple_server import make_server
from weboob.capabilities.video import ICapVideo
from weboob.tools.application.base import BaseApplication
__all__ = ['VideoobWeb']
# Module-level Mako template lookup: templates are searched in the
# "templates" directory located next to this module, with output_encoding
# set to 'utf-8' and encoding_errors set to 'replace'.
template_lookup = TemplateLookup(directories=[os.path.join(os.path.dirname(__file__), 'templates')],
                                 output_encoding='utf-8', encoding_errors='replace')
class VideoobWeb(BaseApplication):
APPNAME = 'videoob-webserver'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Christophe Benz'
DESCRIPTION = 'WSGI web server application allowing to search for videos on various websites.'
CAPS = ICapVideo
CONFIG = dict(host='localhost', port=8080)
@wsgify
def make_app(self, req):
map = Mapper()
map.connect('index', '/', method='index')
results = map.routematch(environ=req.environ)
if results:
match, route = results
req.urlvars = ((), match)
kwargs = match.copy()
method = kwargs.pop('method')
return getattr(self, method)(req, **kwargs)
else:
public_path = os.path.join(os.path.dirname(__file__), 'public')
if not os.path.exists(public_path):
return exc.HTTPNotFound()
path = req.path
if path.startswith('/'):
path = path[1:]
public_file_path = os.path.join(public_path, path)
if os.path.exists(public_file_path):
if path.endswith('.css'):
req.response.content_type = 'text/css'
elif path.endswith('.js'):
req.response.content_type = 'text/javascript'
return open(public_file_path, 'r').read().strip()
else:
return exc.HTTPNotFound()
def main(self, argv):
self.load_config()
self.weboob.load_backends(ICapVideo)
print 'Web server created. Listening on http://%s:%s' % (
self.config.get('host'), int(self.config.get('port')))
srv = make_server(self.config.get('host'), int(self.config.get('port')), self.make_app)
srv.serve_forever()
def index(self, req):
c = {}
nsfw = req.params.get('nsfw')
nsfw = False if not nsfw or nsfw == '0' else True
q = req.params.get('q', u'')
merge = req.params.get('merge')
merge = False if not merge or merge == '0' else True
c['merge'] = merge
c['form_data'] = dict(q=q)
c['results'] = [] if merge else {}
if q:
for backend in self.weboob.iter_backends():
videos = [dict(title=video.title,
page_url=video.page_url,
url=video.url if video.url else '/download?id=%s' % video.id,
thumbnail_url=video.thumbnail_url,
) \
for video in backend.iter_search_results(pattern=q, nsfw=nsfw)]
if videos:
if merge:
c['results'].extend(videos)
else:
c['results'][backend.name] = videos
if merge:
c['results'] = sorted(c['results'], key=lambda video: video['title'].lower())
template = template_lookup.get_template('index.mako')
buf = StringIO()
ctx = Context(buf, **c)
template.render_context(ctx)
return buf.getvalue().strip()
webcontentedit/ 0000775 0000000 0000000 00000000000 11666415431 0032236 5 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications __init__.py 0000664 0000000 0000000 00000001451 11666415431 0034350 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/webcontentedit # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .webcontentedit import WebContentEdit
__all__ = ['WebContentEdit']
webcontentedit.py 0000664 0000000 0000000 00000011331 11666415431 0035625 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/webcontentedit # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
# python2.5 compatibility
from __future__ import with_statement
import os
import sys
import tempfile
from weboob.core.bcall import CallErrors
from weboob.capabilities.content import ICapContent
from weboob.tools.application.repl import ReplApplication
__all__ = ['WebContentEdit']
class WebContentEdit(ReplApplication):
APPNAME = 'webcontentedit'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
DESCRIPTION = 'Console application allowing to display and edit contents on various websites.'
CAPS = ICapContent
def do_edit(self, line):
"""
edit ID [ID...]
Edit a content with $EDITOR, then push it on the website.
"""
contents = []
for id in line.split():
_id, backend_name = self.parse_id(id, unique_backend=True)
backend_names = (backend_name,) if backend_name is not None else self.enabled_backends
contents += [content for backend, content in self.do('get_content', _id, backends=backend_names) if content]
if len(contents) == 0:
print >>sys.stderr, 'No contents found'
return 3
paths = {}
for content in contents:
tmpdir = os.path.join(tempfile.gettempdir(), "weboob")
if not os.path.isdir(tmpdir):
os.makedirs(tmpdir)
fd, path = tempfile.mkstemp(prefix='%s_' % content.id.replace(os.path.sep, '_'), dir=tmpdir)
with os.fdopen(fd, 'w') as f:
data = content.content
if isinstance(data, unicode):
data = data.encode('utf-8')
elif data is None:
content.content = u''
data = ''
f.write(data)
paths[path.encode('utf-8')] = content
params = ''
editor = os.environ.get('EDITOR', 'vim')
if editor == 'vim':
params = '-p'
os.system("%s %s %s" % (editor, params, ' '.join(['"%s"' % path.replace('"', '\\"') for path in paths.iterkeys()])))
for path, content in paths.iteritems():
with open(path, 'r') as f:
data = f.read()
try:
data = data.decode('utf-8')
except UnicodeError:
pass
if content.content != data:
content.content = data
else:
contents.remove(content)
if len(contents) == 0:
print >>sys.stderr, 'No changes. Abort.'
return 1
print 'Contents changed:\n%s' % ('\n'.join(' * %s' % content.id for content in contents))
message = self.ask('Enter a commit message', default='')
minor = self.ask('Is this a minor edit?', default=False)
if not self.ask('Do you want to push?', default=True):
return
errors = CallErrors([])
for content in contents:
path = [path for path, c in paths.iteritems() if c == content][0]
sys.stdout.write('Pushing %s...' % content.id.encode('utf-8'))
sys.stdout.flush()
try:
self.do('push_content', content, message, minor=minor, backends=[content.backend]).wait()
except CallErrors, e:
errors.errors += e.errors
sys.stdout.write(' error (content saved in %s)\n' % path)
else:
sys.stdout.write(' done\n')
os.unlink(path)
if len(errors.errors) > 0:
raise errors
def do_log(self, line):
"""
log ID
Display log of a page
"""
if not line:
print >>sys.stderr, 'Error: please give a page ID'
return 2
_id, backend_name = self.parse_id(line)
backend_names = (backend_name,) if backend_name is not None else self.enabled_backends
_id = _id.encode('utf-8')
for backend, revision in self.do('iter_revisions', _id, max_results=self.options.count, backends=backend_names):
self.format(revision)
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobcfg/ 0000775 0000000 0000000 00000000000 11666415431 0031234 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001432 11666415431 0033266 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobcfg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .weboobcfg import WeboobCfg
__all__ = ['WeboobCfg']
weboobcfg.py 0000664 0000000 0000000 00000020657 11666415431 0033476 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobcfg # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon, Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import re
from copy import copy
from weboob.capabilities.account import ICapAccount
from weboob.core.modules import ModuleLoadError
from weboob.tools.application.repl import ReplApplication
from weboob.tools.ordereddict import OrderedDict
__all__ = ['WeboobCfg']
class WeboobCfg(ReplApplication):
APPNAME = 'weboob-config'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Christophe Benz, Romain Bignon'
DESCRIPTION = "Weboob-Config is a console application to add/edit/remove backends, " \
"and to register new website accounts."
COMMANDS_FORMATTERS = {'backends': 'table',
'list': 'table',
}
DISABLE_REPL = True
weboob_commands = copy(ReplApplication.weboob_commands)
weboob_commands.remove('backends')
def load_default_backends(self):
pass
def do_add(self, line):
"""
add NAME [OPTIONS ...]
Add a configured backend.
"""
if not line:
print >>sys.stderr, 'You must specify a backend name. Hint: use the "backends" command.'
return 2
name, options = self.parse_command_args(line, 2, 1)
if options:
options = options.split(' ')
else:
options = ()
params = {}
# set backend params from command-line arguments
for option in options:
try:
key, value = option.split('=', 1)
except ValueError:
print >>sys.stderr, 'Parameters have to be formatted "key=value"'
return 2
params[key] = value
self.add_backend(name, params)
def do_register(self, line):
"""
register NAME
Register a new account on a backend.
"""
self.register_backend(line)
def do_confirm(self, backend_name):
"""
confirm BACKEND
For a backend which support CapAccount, parse a confirmation mail
after using the 'register' command to automatically confirm the
subscribe.
It takes mail from stdin. Use it with postfix for example.
"""
# Do not use the ReplApplication.load_backends() method because we
# don't want to prompt user to create backend.
self.weboob.load_backends(names=[backend_name])
try:
backend = self.weboob.get_backend(backend_name)
except KeyError:
print >>sys.stderr, 'Error: backend "%s" not found.' % backend_name
return 1
if not backend.has_caps(ICapAccount):
print >>sys.stderr, 'Error: backend "%s" does not support accounts management' % backend_name
return 1
mail = self.acquire_input()
if not backend.confirm_account(mail):
print >>sys.stderr, 'Error: Unable to confirm account creation'
return 1
return 0
def do_list(self, line):
"""
list [CAPS ..]
Show configured backends.
"""
caps = line.split()
for instance_name, name, params in sorted(self.weboob.backends_config.iter_backends()):
backend = self.weboob.modules_loader.get_or_load_module(name)
if caps and not self.caps_included(backend.iter_caps(), caps):
continue
row = OrderedDict([('Instance name', instance_name),
('Backend', name),
('Configuration', ', '.join(
'%s=%s' % (key, ('*****' if key in backend.config and backend.config[key].masked \
else value)) \
for key, value in params.iteritems())),
])
self.format(row)
self.flush()
def do_remove(self, instance_name):
"""
remove NAME
Remove a configured backend.
"""
if not self.weboob.backends_config.remove_backend(instance_name):
print >>sys.stderr, 'Backend instance "%s" does not exist' % instance_name
return 1
def _do_toggle(self, name, state):
try:
bname, items = self.weboob.backends_config.get_backend(name)
except KeyError:
print >>sys.stderr, 'Backend instance "%s" does not exist' % name
return 1
self.weboob.backends_config.edit_backend(name, bname, {'_enabled': state})
def do_enable(self, name):
"""
enable NAME
Enable a disabled backend
"""
self._do_toggle(name, 1)
def do_disable(self, name):
"""
disable NAME
Disable a backend
"""
self._do_toggle(name, 0)
def do_edit(self, line):
"""
edit NAME
Edit a backend
"""
try:
self.edit_backend(line)
except KeyError:
print >>sys.stderr, 'Error: backend "%s" not found' % line
return 1
def do_backends(self, line):
"""
backends [CAPS ...]
Show available backends.
"""
caps = line.split()
self.weboob.modules_loader.load_all()
for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()):
if caps and not self.caps_included(backend.iter_caps(), caps):
continue
row = OrderedDict([('Name', name),
('Capabilities', ', '.join(cap.__name__ for cap in backend.iter_caps())),
('Description', backend.description),
])
self.format(row)
self.flush()
def do_info(self, line):
"""
info NAME
Display information about a backend.
"""
if not line:
print >>sys.stderr, 'You must specify a backend name. Hint: use the "backends" command.'
return 2
try:
backend = self.weboob.modules_loader.get_or_load_module(line)
except ModuleLoadError:
backend = None
if not backend:
print >>sys.stderr, 'Backend "%s" does not exist.' % line
return 1
print '.------------------------------------------------------------------------------.'
print '| Backend %-68s |' % backend.name
print "+-----------------.------------------------------------------------------------'"
print '| Version | %s' % backend.version
print '| Maintainer | %s' % backend.maintainer
print '| License | %s' % backend.license
print '| Description | %s' % backend.description
print '| Capabilities | %s' % ', '.join([cap.__name__ for cap in backend.iter_caps()])
first = True
for key, field in backend.config.iteritems():
value = field.label
if not field.default is None:
value += ' (default: %s)' % field.default
if first:
print '| | '
print '| Configuration | %s: %s' % (key, value)
first = False
else:
print '| | %s: %s' % (key, value)
print "'-----------------'"
def do_applications(self, line):
"""
applications
Show applications.
"""
applications = set()
import weboob.applications
for path in weboob.applications.__path__:
regexp = re.compile('^%s/([\w\d_]+)$' % path)
for root, dirs, files in os.walk(path):
m = regexp.match(root)
if m and '__init__.py' in files:
applications.add(m.group(1))
print ' '.join(sorted(applications)).encode('utf-8')
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobcli/ 0000775 0000000 0000000 00000000000 11666415431 0031244 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001432 11666415431 0033276 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobcli # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .weboobcli import WeboobCli
__all__ = ['WeboobCli']
weboobcli.py 0000664 0000000 0000000 00000003254 11666415431 0033510 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobcli # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
from weboob.tools.application.repl import ReplApplication
__all__ = ['WeboobCli']
# One-shot command-line front end: calls a single method on every backend
# implementing a given capability and formats whatever comes back.
class WeboobCli(ReplApplication):
    APPNAME = 'weboob-cli'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    SYNOPSIS = ('Usage: %prog [-dqv] [-b backends] [-cnfs] capability method [arguments..]\n'
                ' %prog [--help] [--version]')
    DESCRIPTION = ("Weboob-Cli is a console application to call a specific method on backends "
                   "which implement the given capability.")
    DISABLE_REPL = True

    def load_default_backends(self):
        # Nothing is loaded up front; main() loads backends by capability.
        pass

    def main(self, argv):
        """Run `argv[2]` with the remaining arguments on the backends
        loaded for capability `argv[1]`.

        Returns 2 when fewer than two positional arguments are given,
        0 otherwise.
        """
        if len(argv) < 3:
            print >>sys.stderr, "Syntax: %s capability method [args ..]" % argv[0]
            return 2

        capability, method = argv[1], argv[2]
        method_args = argv[3:]

        self.load_backends(capability)
        for _backend, result in self.do(method, *method_args):
            self.format(result)
        return 0
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobdebug/ 0000775 0000000 0000000 00000000000 11666415431 0031563 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001441 11666415431 0033615 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobdebug # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .weboobdebug import WeboobDebug
__all__ = ['WeboobDebug']
weboobdebug.py 0000664 0000000 0000000 00000003453 11666415431 0034347 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboobdebug # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import sys
from weboob.tools.application.repl import ReplApplication
# Console application that drops into an embedded IPython shell with a
# loaded backend (and its browser) available as local variables.
class WeboobDebug(ReplApplication):
    APPNAME = 'weboobdebug'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Christophe Benz'
    DESCRIPTION = "Weboob-Debug is a console application to debug backends."

    def load_default_backends(self):
        # The backend to debug is loaded on demand by the 'shell' command.
        pass

    def do_shell(self, backend_name):
        """
        shell BACKEND

        Debug a backend.
        """
        try:
            backend = self.weboob.load_backends(names=[backend_name])[backend_name]
        except KeyError:
            print >>sys.stderr, u'Unable to load backend "%s"' % backend_name
            return 1

        browser = backend.browser

        # Imported lazily, only when the shell is actually requested.
        from IPython.Shell import IPShellEmbed
        shell = IPShellEmbed(argv=[])

        # Names exposed inside the interactive session.
        locs = dict(backend=backend, browser=browser, application=self, weboob=self.weboob)
        banner = 'Weboob debug shell\nBackend "%s" loaded.\nAvailable variables: %s' % (backend_name, locs)
        shell.set_banner(shell.IP.BANNER + '\n\n' + banner)
        shell(local_ns=locs, global_ns={})
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboorrents/ 0000775 0000000 0000000 00000000000 11666415431 0031650 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001440 11666415431 0033701 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboorrents # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .weboorrents import Weboorrents
__all__ = ['Weboorrents']
weboorrents.py 0000664 0000000 0000000 00000012664 11666415431 0034525 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/weboorrents # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import with_statement
import sys
from weboob.capabilities.torrent import ICapTorrent
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.formatters.iformatter import IFormatter
__all__ = ['Weboorrents']
def sizeof_fmt(num):
    """Return `num` (a byte count) as a short human-readable string.

    The value is divided by 1024 until it is below 1024 and is then
    rendered with one decimal followed by the matching unit, e.g.
    ``1536`` -> ``'1.5 KB'``.  Values of 1024 TB or more are expressed
    in PB instead of falling off the unit list and returning None.
    """
    for unit in ('bytes', 'KB', 'MB', 'GB', 'TB'):
        if num < 1024.0:
            return "%-4.1f%s" % (num, unit)
        num /= 1024.0
    # Five divisions were applied above, so `num` is now a petabyte count.
    return "%-4.1f%s" % (num, 'PB')
class TorrentInfoFormatter(IFormatter):
    """Formatter rendering the full details of one torrent."""

    MANDATORY_FIELDS = ('id', 'name', 'size', 'seeders', 'leechers', 'url', 'files', 'description')

    def flush(self):
        """Nothing is buffered, so there is nothing to flush."""
        pass

    def format_dict(self, item):
        """Build the multi-line description of the torrent `item`.

        The name comes first (in bold), followed by the key figures, the
        optional file list and finally the description text.
        """
        pieces = [
            u'%s%s%s\n' % (self.BOLD, item['name'], self.NC),
            'ID: %s\n' % item['id'],
            'Size: %s\n' % sizeof_fmt(item['size']),
            'Seeders: %s\n' % item['seeders'],
            'Leechers: %s\n' % item['leechers'],
            'URL: %s\n' % item['url'],
        ]

        files = item['files']
        if files:
            pieces.append('\n%sFiles%s\n' % (self.BOLD, self.NC))
            pieces.extend(' * %s\n' % filename for filename in files)

        pieces.append('\n%sDescription%s\n' % (self.BOLD, self.NC))
        pieces.append(item['description'])
        return u''.join(pieces)
class TorrentListFormatter(IFormatter):
    """Formatter rendering torrents as a numbered two-line list entry."""

    MANDATORY_FIELDS = ('id', 'name', 'size', 'seeders', 'leechers')

    # Running number of the entries formatted since the last flush.
    count = 0

    def flush(self):
        """Restart the numbering for the next list."""
        self.count = 0

    def format_dict(self, item):
        """Return the list entry for the torrent `item`.

        In interactive mode the entry is labelled with its position and
        the backend part of the id; otherwise with the full id.
        """
        self.count += 1

        if not self.interactive:
            header = u'%s* (%s) %s%s\n' % (self.BOLD, item['id'], item['name'], self.NC)
        else:
            # Ids look like "<id>@<backend>"; keep only the backend part.
            backend = item['id'].split('@', 1)[1]
            header = u'%s* (%d) %s (%s)%s\n' % (self.BOLD, self.count, item['name'], backend, self.NC)

        human_size = sizeof_fmt(item['size'])
        stats = ' %10s (Seed: %2d / Leech: %2d)' % (human_size, item['seeders'], item['leechers'])
        return header + stats
class Weboorrents(ReplApplication):
APPNAME = 'weboorrents'
VERSION = '0.9.1'
COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
DESCRIPTION = 'Console application allowing to search for torrents on various trackers ' \
'and download .torrent files.'
CAPS = ICapTorrent
EXTRA_FORMATTERS = {'torrent_list': TorrentListFormatter,
'torrent_info': TorrentInfoFormatter,
}
COMMANDS_FORMATTERS = {'search': 'torrent_list',
'info': 'torrent_info',
}
def complete_info(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_object()
def do_info(self, id):
"""
info ID
Get information about a torrent.
"""
_id, backend_name = self.parse_id(id)
found = 0
for backend, torrent in self.do('get_torrent', _id, backends=backend_name):
if torrent:
self.format(torrent)
found = 1
if not found:
print >>sys.stderr, 'Torrent "%s" not found' % id
return 3
else:
self.flush()
def complete_getfile(self, text, line, *ignored):
args = line.split(' ', 2)
if len(args) == 2:
return self._complete_object()
elif len(args) >= 3:
return self.path_completer(args[2])
def do_getfile(self, line):
"""
getfile ID [FILENAME]
Get the .torrent file.
FILENAME is where to write the file. If FILENAME is '-',
the file is written to stdout.
"""
id, dest = self.parse_command_args(line, 2, 1)
_id, backend_name = self.parse_id(id)
if dest is None:
dest = '%s.torrent' % _id
for backend, buf in self.do('get_torrent_file', _id, backends=backend_name):
if buf:
if dest == '-':
print buf
else:
try:
with open(dest, 'w') as f:
f.write(buf)
except IOError, e:
print >>sys.stderr, 'Unable to write .torrent in "%s": %s' % (dest, e)
return 1
return
print >>sys.stderr, 'Torrent "%s" not found' % id
return 3
def do_search(self, pattern):
"""
search [PATTERN]
Search torrents.
"""
self.change_path('/search')
if not pattern:
pattern = None
self.set_formatter_header(u'Search pattern: %s' % pattern if pattern else u'Latest torrents')
for backend, torrent in self.do('iter_torrents', pattern=pattern):
self.add_object(torrent)
self.format(torrent)
self.flush()
woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/wetboobs/ 0000775 0000000 0000000 00000000000 11666415431 0031123 5 ustar 00root root 0000000 0000000 __init__.py 0000664 0000000 0000000 00000001427 11666415431 0033161 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/wetboobs # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .wetboobs import WetBoobs
__all__ = ['WetBoobs']
wetboobs.py 0000664 0000000 0000000 00000011601 11666415431 0033241 0 ustar 00root root 0000000 0000000 woob-037c35f40908f1833bda001fd8cdb2c9338a53d4-weboob-applications/weboob/applications/wetboobs # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from weboob.capabilities.weather import ICapWeather
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.formatters.iformatter import IFormatter
__all__ = ['WetBoobs']
class ForecastsFormatter(IFormatter):
    """Formatter rendering one forecast per line."""

    MANDATORY_FIELDS = ('id', 'date', 'low', 'high', 'unit')

    def flush(self):
        """Nothing is buffered, so there is nothing to flush."""
        pass

    def format_dict(self, item):
        """Return "<date>: (<low> - <high>)" for the forecast `item`,
        followed by its text when one is present."""
        date_label = '%s:' % item['date']
        line = u'%s* %-15s%s (%s°%s - %s°%s)' % (
            self.BOLD, date_label, self.NC,
            item['low'], item['unit'],
            item['high'], item['unit'])

        has_text = 'text' in item and item['text']
        if has_text:
            line += ' %s' % item['text']
        return line
class CurrentFormatter(IFormatter):
    """Formatter rendering the current weather on a single line."""

    MANDATORY_FIELDS = ('id', 'date', 'temp')

    def flush(self):
        """Nothing is buffered, so there is nothing to flush."""
        pass

    def format_dict(self, item):
        """Return "<date>: <temp>" for `item`, with the unit and the
        text appended when they are present and non-empty."""
        when = item['date']
        # datetime values are rendered explicitly; anything else is used as is.
        if isinstance(when, datetime):
            when = when.strftime('%y-%m-%d %H:%M:%S')

        line = u'%s%s%s: %s' % (self.BOLD, when, self.NC, item['temp'])
        for field, template in (('unit', u'°%s'), ('text', u' - %s')):
            if field in item and item[field]:
                line += template % item[field]
        return line
class CitiesFormatter(IFormatter):
    """Formatter rendering city search results as a numbered list."""

    MANDATORY_FIELDS = ('id', 'name')

    # Running number of the entries formatted since the last flush.
    count = 0

    def flush(self):
        """Restart the numbering for the next list."""
        self.count = 0

    def format_dict(self, item):
        """Return the list entry for the city `item`.

        In interactive mode the entry is labelled with its position and
        the backend part of the id; otherwise with the full id.
        """
        self.count += 1

        if not self.interactive:
            return u'%s* (%s) %s%s' % (self.BOLD, item['id'], item['name'], self.NC)

        # Ids look like "<id>@<backend>"; keep only the backend part.
        backend = item['id'].split('@', 1)[1]
        return u'%s* (%d) %s (%s)%s' % (self.BOLD, self.count, item['name'], backend, self.NC)
# Console application to search cities and display their current weather
# and forecasts.
class WetBoobs(ReplApplication):
    APPNAME = 'wetboobs'
    VERSION = '0.9.1'
    COPYRIGHT = 'Copyright(C) 2010-2011 Romain Bignon'
    DESCRIPTION = 'Console application allowing to display weather and forecasts in your city.'
    CAPS = ICapWeather
    EXTRA_FORMATTERS = {'cities': CitiesFormatter,
                        'current': CurrentFormatter,
                        'forecasts': ForecastsFormatter,
                        }
    COMMANDS_FORMATTERS = {'search': 'cities',
                           'current': 'current',
                           'forecasts': 'forecasts',
                           }

    # Results of the last 'search'.  do_search rebinds this on the instance,
    # so the class-level list itself is never appended to.
    cities = []

    def do_search(self, pattern):
        """
        search PATTERN

        Search cities.
        """
        self.cities = []
        for backend, city in self.do('iter_city_search', pattern):
            self.format(city)
            self.cities.append(city)
        self.flush()

    def parse_id(self, id):
        """Resolve `id` and hand it to ReplApplication.parse_id.

        In interactive mode a number N between 1 and the size of the last
        search is replaced by the full id of the N-th city found.  Any
        other value is passed through unchanged.
        """
        if self.interactive:
            try:
                position = int(id)
                # Positions are 1-based.  Without this check 0 or a
                # negative number would become a negative list index and
                # silently select a city counted from the end.
                if position < 1:
                    raise IndexError(position)
                city = self.cities[position - 1]
            except (IndexError, ValueError):
                pass
            else:
                id = '%s@%s' % (city.id, city.backend)
        return ReplApplication.parse_id(self, id)

    def _complete_id(self):
        """Return the full ids of the cities found by the last search."""
        return ['%s@%s' % (city.id, city.backend) for city in self.cities]

    def complete_current(self, text, line, *ignored):
        """Complete the CITY_ID argument of 'current'."""
        args = line.split(' ')
        if len(args) == 2:
            return self._complete_id()

    def do_current(self, line):
        """
        current CITY_ID

        Get current weather for specified city. Use the 'search' command to find
        its ID.
        """
        city, = self.parse_command_args(line, 1, 1)
        _id, backend_name = self.parse_id(city)
        for backend, current in self.do('get_current', _id, backends=backend_name):
            if current:
                self.format(current)
        self.flush()

    def complete_forecasts(self, text, line, *ignored):
        """Complete the CITY_ID argument of 'forecasts'."""
        args = line.split(' ')
        if len(args) == 2:
            return self._complete_id()

    def do_forecasts(self, line):
        """
        forecasts CITY_ID

        Get forecasts for specified city. Use the 'search' command to find
        its ID.
        """
        city, = self.parse_command_args(line, 1, 1)
        _id, backend_name = self.parse_id(city)
        for backend, forecast in self.do('iter_forecast', _id, backends=backend_name):
            self.format(forecast)
        self.flush()