Commits

pystew committed daf735b

upd

  • Participants
  • Parent commits d4f7cea

Comments (0)

Files changed (5)

api.py

-import tcp
-
-class Protocol(tcp.Client):
-    def INFO(self): # get server infos
-        c, a = self.req('INFO')
-        if c=='100':
-            raws = list(self.readlines())
-            return ' - '.join(raws[i] for i in (4,2,3))
-    def NOOP(self): # NOP
-        c, a = self.req('NOOP')
-        assert c=='200'
-    def QUIT(self): # close cnx
-        c, a = self.req('QUIT')
-    def CREU(self, usr, psw): # create usr
-        c, a = self.req('CREU', usr, psw)
-    def VALI(self, usr, level): # validate usr
-        c, a = self.req('VALI', usr, level)
-        return c=='200'
-    def LOUT(self): # logout
-        c, a = self.req('LOUT')
-    def USER(self, username): # send login
-        c, a = self.req('USER', username)
-        return c=='300'
-    def PASS(self, password): # send psw
-        c, a = self.req('PASS',  password)
-        return c=='200'
-    def QUSR(self, usr, ignore=False): # chk usr
-        if ignore: ignore='570'
-        res = self.req('QUSR',  usr, ignore=ignore)
-        if res: return res[1]
-    def GOTO(self, room): # goto room
-        c, a = self.req('GOTO',  room)
-        return c=='200'
-    def REGI(self, name): # set usr info
-        c, a = self.req('REGI')
-        if c=='400':
-            lines = [name, 'adr', 'vil', 'st', '33333', 'tel', 'mail', 'fr']
-            self.writelines(lines)
-        else:
-            self.get()
-            self.writeline('000')
-    def RENU(self, old, new): # chg usr login
-        self.req('RENU', old, new)
-    def LIST(self): # list users
-        c, a = self.req('LIST')
-        for lin in self.readlines():
-            login, level, num, last, log, msg, psw, what = lin
-            yield login, level
-    def ASUP(self, usr, p, f, t, m, a, n, s, g): # set usr params
-        c, a = self.req('ASUP', usr, p, f, t, m, a, n , s, g)
-        return c=='200'
-    def LFLR(self): # list floors
-        c, a = self.req('LFLR')
-        assert c=='100'
-        for floor in self.readlines():
-            yield (
-                floor[0], # num
-                floor[1], # name
-                floor[2], # refcount
-            )
-    def LKRA(self, floor=-1): # known rooms
-        c, a = self.req('LKRA %d'%floor)
-        assert c=='100'
-        for room in self.readlines():
-            yield (
-                room[0],
-                room[1],
-                room[2],
-                room[3],
-                room[4],
-                room[5],
-            )
-    def ENT0(self, post='1', to='', anon='0', fmt='0', sbj='', usr='', confirm='0', cc='', bcc='', msg=''):
-        c, a = self.req('ENT0', post, to, anon, fmt, sbj, usr, confirm, cc, bcc)
-        if c=='400':
-            self.writelines(msg, log=True)
-        else:
-            print '? ? ?', c, a

citadel.py

-import api
-import tpl
-
-class Citadel(api.Protocol):
-    def __init__(self, config=None):
-        self.config = config
-        api.Protocol.__init__(self)
-    @property
-    def admin_login(self):
-        if not hasattr(self, '_login'):
-            if hasattr(self.config, 'admin_login'):
-                self._login = self.config.admin_login
-            else:
-                self._login = raw_input('Admin LOGIN :')
-        return self._login
-    @property
-    def admin_psw(self):
-        if not hasattr(self, '_psw'):
-            if hasattr(self.config, 'admin_psw'):
-                self._psw = self.config.admin_psw
-            else:
-                self._psw = raw_input('Admin PASSWORD :')
-        return self._psw
-    def admin(self):
-        return self.login(self.admin_login, self.admin_psw)
-    def close(self):
-        self.QUIT()
-        self.quit()
-    def login(self, usr, psw):
-        return self.USER(usr) and self.PASS(psw)
-    def chku(self, usr, ignore=False):
-        return self.QUSR(usr, ignore=ignore)==usr
-    def creu(self, usr, psw, lvl='4'):
-        if self.chku(usr, ignore=True):  return True
-        self.admin()
-        self.CREU(usr, psw)
-        valid = self.VALI(usr, lvl)
-        self.LOUT()
-        return valid and self.chku(usr)
-    def delu(self, usr):
-        self.admin()
-        c = self.ASUP(usr, '0','0','0','0','0','0','0','0')
-        self.LOUT()
-        return c
-    def listu(self):
-        for usr, level in sorted(self.LIST()):
-            if not usr.startswith('SYS_') and level<'6':
-                yield usr
-    def mail(self, dst, sbj, txt): # post a mail
-        self.GOTO('mail')
-        msg = [' %s'%i for i in txt.split('\n')]
-        self.ENT0(to=dst, sbj=sbj, msg=msg)
-    def vcard(self, user, first, name, full): # post a vcard
-        msg = tpl.vcf%(user, user, name, first, full, user)
-        msg = msg.lstrip().split('\n')
-        self.ENT0(fmt='4', msg=msg)
-    def updusr(self, usr, psw, pre, nom, full): # update user info
-        self.login(usr, psw)
-        self.GOTO('My Citadel Config')
-        self.vcard(usr, pre, nom, full)
-        self.LOUT()
-    def creuser(self, login, psw, pre, nom, full):
-        if self.creu(login, psw):
-            self.updusr(login, psw, pre, nom, full)
-

tcp.py

-import socket
-
-class Client(object):
-    def __init__(self):
-        cfg = self.config
-        self.logfil = hasattr(cfg, 'log') and cfg.log
-        adr = hasattr(cfg, 'server_adr') and cfg.server_adr or '127.0.0.1'
-        prt = hasattr(cfg, 'server_prt') and cfg.server_prt or 504
-        self.sock = self.connect(adr, prt)
-        rf = self.sock.makefile('rb', -1)
-        wf = self.sock.makefile('wb', 0)
-        self.read = rf.readline
-        self.write = wf.write
-        c, a = self.get()
-        self.log(a[0], 'w')
-    def log(self, txt, mode='a'):
-        if hasattr(self, 'logfil'):
-            log = open(self.logf, mode)
-            log.write('%s\n'%txt)
-            log.close
-    def connect(self, adr, port):
-        for af, socktype, proto, cn, sa in socket.getaddrinfo(
-            adr , port , 0 , socket.SOCK_STREAM
-        ):
-            try:
-                sock = None
-                sock = socket.socket(af, socktype, proto)
-                sock.connect(sa)
-                break
-            except socket.error:
-                if sock: sock.close()
-        else:
-            raise socket.error
-        return sock
-    def quit(self):
-        self.sock.close()
-        self.sock = None
-    def readline(self, log=True):
-        line = self.read().rstrip('\n')
-        if log: self.log('<-   %s'%line)
-        return line
-    def readlines(self):
-        while True:
-            line = self.readline(log=False)
-            if line=='000': break
-            data =  line.split('|')
-            yield data[0] if len(data)==1 else data
-    def writeline(self, line, log=True):
-        if log: self.log('  -> %s'%line)
-        self.write(u'%s\n' % unicode(line))
-    def writelines(self, lines, log=False):
-        for line in lines:
-            self.writeline(line, log)
-        self.writeline('000', log)
-    def get(self):
-        code, args = self.readline().split(' ', 1)
-        return code, args.split('|')
-    def req(self, *arg, **kw):
-        ignore = kw.get('ignore')
-        cmd = '%s %s' % (arg[0], '|'.join(arg[1:]))
-        self.writeline(cmd)
-        c, a = self.get()
-        if len(a)==1: a=a[0]
-        if c.startswith('5') and c!=ignore:
-            print '\n! ! ! ERROR %s : %s ! ! !\n' % (c, a)
-            return None, None
-        else:
-            return c, a

test.py

-from citadel import Citadel
-
-import config
-
-citadel = Citadel(config)
-print citadel.INFO()
-print '\n'.join(citadel.listu())
-citadel.close()

tpl.py

-vcf = '''
-Content-type: text/x-vcard; charset=UTF-8
-
-begin:vcard
-VERSION:2.1
-FBURL;PREF:http://test/%s.vfb
-UID;charset=UTF-8:Citadel vCard: personal card for %s at test
-n:%s;%s;;;
-title:
-fn:%s
-org:
-adr:;;;;;;
-tel;home:
-tel;work:
-tel;fax:
-tel;cell:
-email;internet:%s
-end:vcard
-'''