# -*- coding: utf-8 -*- # Copyright(C) 2011 Romain Bignon # # This file is part of weboob. # # weboob is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # weboob is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with weboob. If not, see . from datetime import datetime, timedelta import re import sys from urlparse import urlsplit from random import choice from weboob.capabilities.content import ICapContent from weboob.tools.application.repl import ReplApplication from weboob.tools.ordereddict import OrderedDict __all__ = ['Boobathon'] class Task(object): STATUS_NONE = 0 STATUS_PROGRESS = 1 STATUS_DONE = 2 def __init__(self, backend, capability): self.backend = backend self.capability = capability self.status = self.STATUS_NONE self.date = None self.branch = u'' def __repr__(self): return '' % (self.backend, self.capability) class Member(object): def __init__(self, id, name): self.name = name self.id = id self.tasks = [] self.availabilities = u'' self.repository = None self.hardware = u'' self.is_me = False def shortname(self): name = self.name if len(name) > 20: name = '%s..' % name[:18] return name class Event(object): def __init__(self, name, backend): self.my_id = backend.browser.get_userid() self.name = 'wiki/weboob/%s' % name self.description = None self.date = None self.begin = None self.end = None self.location = None self.winner = None self.backend = backend self.members = OrderedDict() self.load() def get_me(self): return self.members.get(self.backend.browser.get_userid(), None) def currently_in_event(self): if not self.date or not self.begin or not self.end: return False return self.begin < datetime.now() < self.end def is_closed(self): return self.end < datetime.now() def format_duration(self): if not self.begin or not self.end: return None delta = self.end - self.begin return '%02d:%02d' % (delta.seconds/3600, delta.seconds%3600) def check_time_coherence(self): """ Check if the end's day is before the begin's one, in case it stops at the next day (eg. 15h->1h). If it occures, add a day. """ if self.begin > self.end: self.end = self.end + timedelta(1) def load(self): self.content = self.backend.get_content(self.name) self.members.clear() member = None for line in self.content.content.split('\n'): line = line.strip() if line.startswith('h1. '): self.title = line[4:] elif line.startswith('h3=. '): m = re.match('h3=. Event finished. Winner is "(.*)":/users/(\d+)\!', line) if not m: print >>sys.stderr, 'Unable to parse h3=: %s' % line continue self.winner = Member(int(m.group(2)), m.group(1)) elif line.startswith('h2. '): continue elif line.startswith('h3. '): m = re.match('h3. "(.*)":/users/(\d+)', line) if not m: print >>sys.stderr, 'Unable to parse user "%s"' % line continue member = Member(int(m.group(2)), m.group(1)) if member.id == self.my_id: member.is_me = True if self.winner is not None and member.id == self.winner.id: self.winner = member self.members[member.id] = member elif self.description is None and len(line) > 0 and line != '{{TOC}}': self.description = line elif line.startswith('* '): m = re.match('\* \*(\w+)\*: (.*)', line) if not m: continue key, value = m.group(1), m.group(2) if member is None: if key == 'Date': self.date = self.parse_date(value) elif key == 'Start' or key == 'Begin': self.begin = self.parse_time(value) elif key == 'End': self.end = self.parse_time(value) self.check_time_coherence() elif key == 'Location': self.location = value else: if key == 'Repository': m = re.match('"(.*.git)":.*', value) if m: member.repository = m.group(1) else: member.repository = value elif key == 'Hardware': member.hardware = value elif key == 'Availabilities': member.availabilities = value elif line.startswith('[['): m = re.match('\[\[(\w+)\]\]\|\[\[(\w+)\]\]\|(.*)\|', line) if not m: print >>sys.stderr, 'Unable to parse task: "%s"' % line continue task = Task(m.group(1), m.group(2)) member.tasks.append(task) if m.group(3) == '!/img/weboob/_progress.png!': task.status = task.STATUS_PROGRESS continue mm = re.match('!/img/weboob/_done.png! (\d+):(\d+) (\w+)', m.group(3)) if mm and self.date: task.status = task.STATUS_DONE task.date = datetime(self.date.year, self.date.month, self.date.day, int(mm.group(1)), int(mm.group(2))) task.branch = mm.group(3) def parse_date(self, value): try: return datetime.strptime(value, '%Y-%m-%d') except ValueError: return None def parse_time(self, value): m = re.match('(\d+):(\d+)', value) if not m: return try: return self.date.replace(hour=int(m.group(1)), minute=int(m.group(2))) except ValueError: return None def save(self, message): if self.winner: finished = u'\nh3=. Event finished. Winner is "%s":/users/%d!\n' % (self.winner.name, self.winner.id) else: finished = u'' s = u"""h1. %s {{TOC}} %s h2. Event %s * *Date*: %s * *Begin*: %s * *End*: %s * *Duration*: %s * *Location*: %s h2. Attendees """ % (self.title, finished, self.description, self.date.strftime('%Y-%m-%d') if self.date else '_Unknown_', self.begin.strftime('%H:%M') if self.begin else '_Unknown_', self.end.strftime('%H:%M') if self.end else '_Unknown_', self.format_duration() or '_Unknown_', self.location or '_Unknown_') for member in self.members.itervalues(): if self.date: availabilities = '' else: availabilities = '* *Availabilities*: %s' % member.availabilities if member.repository is None: repository = '_Unknown_' elif member.repository.endswith('.git'): repository = '"%s":git://git.symlink.me/pub/%s ("http":http://git.symlink.me/?p=%s;a=summary)' repository = repository.replace('%s', member.repository) else: repository = member.repository s += u"""h3. "%s":/users/%d * *Repository*: %s * *Hardware*: %s %s |_.Backend|_.Capabilities|_.Status|""" % (member.name, member.id, repository, member.hardware, availabilities) for task in member.tasks: if task.status == task.STATUS_DONE: status = '!/img/weboob/_done.png! %02d:%02d %s' % (task.date.hour, task.date.minute, task.branch) elif task.status == task.STATUS_PROGRESS: status = '!/img/weboob/_progress.png!' else: status = ' ' s += u""" |=.!/img/weboob/%s.png!:/projects/weboob/wiki/%s [[%s]]|[[%s]]|%s|""" % (task.backend.lower(), task.backend, task.backend, task.capability, status) s += '\n\n' self.content.content = s self.backend.push_content(self.content, message) class Boobathon(ReplApplication): APPNAME = 'boobathon' VERSION = '0.9.1' COPYRIGHT = 'Copyright(C) 2011 Romain Bignon' DESCRIPTION = 'Console application to participate to a Boobathon.' CAPS = ICapContent SYNOPSIS = 'Usage: %prog [-dqv] [-b backends] [-cnfs] boobathon\n' SYNOPSIS += ' %prog [--help] [--version]' radios = [] def __init__(self, *args, **kwargs): ReplApplication.__init__(self, *args, **kwargs) def main(self, argv): if len(argv) < 2: print >>sys.stderr, 'Please give the name of the boobathon' return 1 self.event = Event(argv[1], choice(self.weboob.backend_instances.values())) if self.event.description is None: if not self.ask("This event doesn't seem to exist. Do you want to create it?", default=True): return 1 self.edit_event() self.save_event('Event created') return ReplApplication.main(self, [argv[0]]) def save_event(self, message): if self.ask("Do you confirm your changes?", default=True): self.event.save(message) return True return False def edit_event(self): self.event.title = self.ask('Enter a title', default=self.event.title) self.event.description = self.ask('Enter a description', default=self.event.description) self.event.date = self.ask('Enter a date (yyyy-mm-dd)', default=self.event.date.strftime('%Y-%m-%d') if self.event.date else '', regexp='^(\d{4}-\d{2}-\d{2})?$') if self.event.date: self.event.date = datetime.strptime(self.event.date, '%Y-%m-%d') s = self.ask('Begin at (HH:MM)', default=self.event.begin.strftime('%H:%M') if self.event.begin else '', regexp='^(\d{2}:\d{2})?$') if s: h, m = s.split(':') self.event.begin = self.event.date.replace(hour=int(h), minute=int(m)) s = self.ask('End at (HH:MM)', default=self.event.end.strftime('%H:%M') if self.event.end else '', regexp='^(\d{2}:\d{2})?$') if s: h, m = s.split(':') self.event.end = self.event.date.replace(hour=int(h), minute=int(m)) self.event.check_time_coherence() self.event.location = self.ask('Enter a location', default=self.event.location) def edit_member(self, member): if member.name is None: firstname = self.ask('Enter your firstname') lastname = self.ask('Enter your lastname') member.name = '%s %s' % (firstname, lastname) else: member.name = self.ask('Enter your name', default=member.name) if self.event.date is None: member.availabilities = self.ask('Enter availabilities', default=member.availabilities) member.repository = self.ask('Enter your repository (ex. romain/weboob.git)', default=member.repository) member.hardware = self.ask('Enter your hardware', default=member.hardware) def do_progress(self, line): """ progress Display progress of members. """ self.event.load() for member in self.event.members.itervalues(): if member.is_me and member is self.event.winner: status = '\o/ ->' elif member.is_me: status = ' ->' elif member is self.event.winner: status = ' \o/' else: status = ' ' s = u' %s%20s %s|' % (status, member.shortname(), self.BOLD) for task in member.tasks: if task.status == task.STATUS_DONE: s += '##' elif task.status == task.STATUS_PROGRESS: s += u'=>' else: s += ' ' s += '|%s' % self.NC print s print '' now = datetime.now() if self.event.begin > now: d = self.event.begin - now msg = 'The event will start in %d days, %02d:%02d:%02d' elif self.event.end < now: d = now - self.event.end msg = 'The event is finished since %d days, %02d:%02d:%02d' else: tot = (self.event.end - self.event.begin).seconds cur = (datetime.now() - self.event.begin).seconds pct = cur*20/tot progress = '' for i in xrange(20): if i < pct: progress += '=' elif i == pct: progress += '>' else: progress += ' ' print 'Event started: %s |%s| %s' % (self.event.begin.strftime('%H:%M'), progress, self.event.end.strftime('%H:%M')) d = self.event.end - now msg = 'The event will be finished in %d days, %02d:%02d:%02d' print msg % (d.days, d.seconds/3600, d.seconds%3600/60, d.seconds%60) def do_tasks(self, line): """ tasks Display all tasks of members. """ self.event.load() stop = False i = -2 while not stop: if i >= 0 and not i%2: sys.stdout.write(' #%-2d' % (i/2)) else: sys.stdout.write(' ') if i >= 0 and i%2: # second line of task, see if we'll stop stop = True for mem in self.event.members.itervalues(): if len(mem.tasks) > (i/2+1): # there are more tasks, don't stop now stop = False if i == -2: sys.stdout.write(' %s%-20s%s' % (self.BOLD, mem.shortname().encode('utf-8'), self.NC)) elif i == -1: sys.stdout.write(' %s%-20s%s' % (self.BOLD, '-' * len(mem.shortname()), self.NC)) elif len(mem.tasks) <= (i/2): sys.stdout.write(' ' * (20+1)) else: task = mem.tasks[i/2] if task.status == task.STATUS_DONE: status = u'#' elif task.status == task.STATUS_PROGRESS: if not i%2: status = u'|' #1st line else: status = u'v' #2nd line else: status = u' ' if not i%2: #1st line line = u'%s %s' % (status, task.backend) else: #2nd line line = u'%s `-%s' % (status, task.capability[3:]) sys.stdout.write((u' %-20s' % line).encode('utf-8')) sys.stdout.write('\n') i += 1 def complete_close(self, text, line, *ignored): args = line.split(' ') if len(args) == 2: self.event.load() return [member.name for member in self.event.members.itervalues()] def do_close(self, name): """ close WINNER Close the event and set the winner. """ self.event.load() for member in self.event.members.itervalues(): if member.name == name: self.winner = member if self.save_event('Close event'): print 'Event is now closed. Winner is %s!' % self.winner.name return print >>sys.stderr, '"%s" not found' % name return 3 def complete_edit(self, text, line, *ignored): args = line.split(' ') if len(args) == 2: return ['event', 'me'] def do_edit(self, line): """ edit [event | me] Edit information about you or about event. """ if not line: print >>sys.stderr, 'Syntax: edit [event | me]' return 2 self.event.load() if line == 'event': self.edit_event() self.save_event('Event edited') elif line == 'me': mem = self.event.get_me() if not mem: print >>sys.stderr, 'You haven\'t joined the event.' return 1 self.edit_member(mem) self.save_event('Member edited') else: print >>sys.stderr, 'Unable to edit "%s"' % line return 1 def do_info(self, line): """ info Display information about this event. """ self.event.load() print self.event.title print '-' * len(self.event.title) print self.event.description print '' print 'Date:', self.event.date.strftime('%Y-%m-%d') if self.event.date else 'Unknown' print 'Begin:', self.event.begin.strftime('%H:%M') if self.event.begin else 'Unknown' print 'End:', self.event.end.strftime('%H:%M') if self.event.end else 'Unknown' print 'Duration:', self.event.format_duration() or 'Unknown' print 'Location:', self.event.location or 'Unknown' print '' print 'There are %d members, use the "members" command to list them' % len(self.event.members) if self.event.get_me() is None: print 'To join this event, use the command "join".' def do_members(self, line): """ members Display members informations. """ self.event.load() for member in self.event.members.itervalues(): print member.name print '-' * len(member.name) print 'Repository:', member.repository if self.event.date is None: print 'Availabilities:', member.availabilities print 'Hardware:', member.hardware accompl = 0 for task in member.tasks: if task.status == task.STATUS_DONE: accompl += 1 print '%d tasks (%d accomplished)' % (len(member.tasks), accompl) if member is self.event.winner: print '=== %s is the winner!' % member.name print '' print 'Use the "tasks" command to display all tasks' def do_join(self, line): """ join Join this event. """ self.event.load() if self.event.backend.browser.get_userid() in self.event.members: print >>sys.stderr, 'You have already joined this event.' return 1 if self.event.is_closed(): print >>sys.stderr, "Boobathon is closed." return 1 m = Member(self.event.backend.browser.get_userid(), None) self.edit_member(m) self.event.members[m.id] = m self.save_event('Joined the event') def do_leave(self, line): """ leave Leave this event. """ self.event.load() if self.event.currently_in_event(): print >>sys.stderr, 'Unable to leave during the event, loser!' return 1 if self.event.is_closed(): print >>sys.stderr, "Boobathon is closed." return 1 try: self.event.members.pop(self.event.backend.browser.get_userid()) except KeyError: print >>sys.stderr, "You have not joined this event." return 1 else: self.save_event('Left the event') def do_remtask(self, line): """ remtask TASK_ID Remove a task. """ self.event.load() mem = self.event.get_me() if not mem: print >>sys.stderr, "You have not joined this event." return 1 if self.event.is_closed(): print >>sys.stderr, "Boobathon is closed." return 1 try: task_id = int(line) except ValueError: print >>sys.stderr, 'The task ID should be a number' return 2 try: task = mem.tasks.pop(task_id) except IndexError: print >>sys.stderr, 'Unable to find task #%d' % task_id return 1 else: print 'Removing task #%d (%s,%s).' % (task_id, task.backend, task.capability) self.save_event('Remove task') def do_addtask(self, line): """ addtask BACKEND CAPABILITY Add a new task. """ self.event.load() mem = self.event.get_me() if not mem: print >>sys.stderr, "You have not joined this event." return 1 if self.event.is_closed(): print >>sys.stderr, "Boobathon is closed." return 1 backend, capability = self.parse_command_args(line, 2, 2) if not backend[0].isupper(): print >>sys.stderr, 'The backend name "%s" needs to start with a capital.' % backend return 2 if not capability.startswith('Cap') or not capability[3].isupper(): print >>sys.stderr, '"%s" is not a proper capability name (must start with Cap).' % capability return 2 for task in mem.tasks: if (task.backend,task.capability) == (backend,capability): print >>sys.stderr, "A task already exists for that." return 1 task = Task(backend, capability) mem.tasks.append(task) self.save_event('New task') def do_start(self, line): """ start [TASK_ID] Start a task. If you don't give a task ID, the first available task will be taken. """ self.event.load() mem = self.event.get_me() if not mem: print >>sys.stderr, "You have not joined this event." return 1 if len(mem.tasks) == 0: print >>sys.stderr, "You don't have any task to do." return 1 if not self.event.currently_in_event(): print >>sys.stderr, "You can't start a task, we are not in event." return 1 if line.isdigit(): task_id = int(line) else: task_id = -1 last_done = -1 for i, task in enumerate(mem.tasks): if task.status == task.STATUS_DONE: last_done = i elif task.status == task.STATUS_PROGRESS: task.status = task.STATUS_NONE print 'Task #%s (%s,%s) canceled.' % (i, task.backend, task.capability) if (i == task_id or task_id < 0) and task.status == task.STATUS_NONE: break else: print >>sys.stderr, 'Task not found.' return 3 if task.status == task.STATUS_DONE: print >>sys.stderr, 'Task is already done.' return 1 task.status = task.STATUS_PROGRESS mem.tasks.remove(task) mem.tasks.insert(last_done + 1, task) self.save_event('Started a task') def do_done(self, line): """ done Set the current task as done. """ self.event.load() mem = self.event.get_me() if not mem: print >>sys.stderr, "You have not joined this event." return 1 if self.event.is_closed(): print >>sys.stderr, "Boobathon is closed." return 1 for i, task in enumerate(mem.tasks): if task.status == task.STATUS_PROGRESS: print 'Task (%s,%s) done! (%d%%)' % (task.backend, task.capability, (i+1)*100/len(mem.tasks)) if self.event.currently_in_event(): task.status = task.STATUS_DONE task.date = datetime.now() task.branch = self.ask('Enter name of branch') self.save_event('Task accomplished') else: task.status = task.STATUS_NONE print >>sys.stderr, 'Oops, you are out of event. Canceling the task...' self.save_event('Cancel task') return 1 return print >>sys.stderr, "There isn't any task in progress." return 1 def do_cancel(self, line): """ cancel Cancel the current task. """ self.event.load() mem = self.event.get_me() if not mem: print >>sys.stderr, "You have not joined this event." return 1 if self.event.is_closed(): print >>sys.stderr, "Boobathon is closed." return 1 for task in mem.tasks: if task.status == task.STATUS_PROGRESS: print 'Task (%s,%s) canceled.' % (task.backend, task.capability) task.status = task.STATUS_NONE self.save_event('Cancel task') return print >>sys.stderr, "There isn't any task in progress." return 1 def load_default_backends(self): """ Overload a BaseApplication method. """ for instance_name, backend_name, params in self.weboob.backends_config.iter_backends(): if backend_name != 'redmine': continue v = urlsplit(params['url']) if v.netloc == 'symlink.me': self.load_backends(names=[instance_name]) return if not self.check_loaded_backends({'url': 'https://symlink.me'}): print "Ok, so leave now, fag." sys.exit(0) def is_backend_loadable(self, backend): """ Overload a ConsoleApplication method. """ return backend.name == 'redmine'