# Source
#
# asterator / Asterisk / manager.py
#
# Full commit

import socket
import asyncore
import asynchat
import thread
import threading

# Parser states of the manager socket (see mansock.found_terminator).
AMS_HELLO = 0    # waiting for the protocol banner line
AMS_SESSION = 1  # reading "Key: value" header lines of a frame
AMS_HEREDOC = 2  # reading raw output up to the "--END COMMAND--" marker

class mansock(asynchat.async_chat):
    """Asynchronous socket that parses and emits manager protocol frames.

    Incoming data is split on CRLF.  The first line must be the
    ``Asterisk Call Manager/<version>`` banner; after that, ``Key: value``
    lines are collected into a dict until an empty line ends the frame.
    When a frame carries ``Response: Follows``, the text up to the
    ``--END COMMAND--`` marker is stored under the ``'Data'`` key.

    Three optional callbacks may be assigned after construction:

    * ``handler_session()``    -- called once the banner has been accepted
    * ``handler_frame(frame)`` -- called with each completed frame (a dict)
    * ``handler_close()``      -- called after the connection has been closed
    """

    def __init__(self, sock, debug=False):
        """Wrap an already created socket.

        sock  -- socket object handed to asynchat
        debug -- when true, every line received or sent is printed
        """
        # Passed positionally on purpose: the keyword name of this argument
        # is not the same in every asynchat release (``conn`` vs ``sock``).
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator("\r\n")
        self.inbuf = ""         # data received since the last terminator
        self.frame = {}         # headers of the frame being assembled
        self.mode = AMS_HELLO   # current parser state (AMS_* constant)
        self.do_debug = debug
        self.version = None     # text after the '/' in the banner

        self.handler_frame = None
        self.handler_session = None
        self.handler_close = None

    def debug(self, data):
        """Print ``data`` when debugging was enabled in the constructor."""
        if self.do_debug:
            # Single-argument call form: prints the same text whether
            # ``print`` is a statement or a function.
            print(data)

    def handle_close(self):
        """Close the channel, then notify ``handler_close`` if one is set."""
        # Without an explicit close() the dead socket stays in asyncore's
        # poll map and this event is reported again on every loop pass.
        self.close()
        if self.handler_close:
            self.handler_close()

    def collect_incoming_data(self, data):
        """Buffer incoming data until the current terminator is seen."""
        self.inbuf += data

    def found_terminator(self):
        """Process the buffered text that ended with the current terminator.

        Raises ValueError for a bad protocol banner (the socket is closed
        first) or for a header line without a ``': '`` separator, and
        Exception if the parser is in an unknown mode.
        """
        data = self.inbuf
        self.inbuf = ""
        self.debug('<<< ' + data)

        if self.mode == AMS_HELLO:
            name, sep, version = data.partition('/')
            if not sep or name != 'Asterisk Call Manager':
                self.close()
                raise ValueError("Invalid protocol header")
            self.version = version
            self.mode = AMS_SESSION
            if self.handler_session:
                self.handler_session()
        elif self.mode == AMS_SESSION:
            if data == '':
                # An empty line ends the frame.  Start a fresh one before
                # dispatching so that a failing handler cannot leave stale
                # headers behind for the next frame.
                frame, self.frame = self.frame, {}
                if self.handler_frame:
                    self.handler_frame(frame)
                return
            key, sep, val = data.partition(': ')
            if not sep:
                raise ValueError("Malformed header line: %r" % (data,))
            self.frame[key] = val
            if key == 'ActionID' and self.frame.get('Response') == 'Follows':
                # Free-form output follows; read it as one chunk up to the
                # end marker instead of line by line.
                self.mode = AMS_HEREDOC
                self.set_terminator("--END COMMAND--\r\n")
        elif self.mode == AMS_HEREDOC:
            self.frame['Data'] = data
            self.mode = AMS_SESSION
            self.set_terminator("\r\n")
        else:
            raise Exception("Invalid mode")

    def __assemble_frame(self, frame):
        """Serialize a dict into ``Key: value`` lines plus a blank line.

        Values are formatted with ``%s`` so that non-string values such as
        integer priorities or timeouts are accepted.
        """
        lines = []
        for key, value in frame.items():
            line = '%s: %s' % (key, value)
            self.debug('>>> ' + line)
            lines.append(line)
        self.debug('>>> ')
        return ''.join(line + "\r\n" for line in lines) + "\r\n"

    def put_frame(self, frame):
        """Serialize ``frame`` (a dict) and queue it for sending."""
        self.push(self.__assemble_frame(frame))

class manager:
    """Callback based front end for the Asterisk Manager Interface.

    The object is given a socket-like object that provides
    ``put_frame(frame)`` and ``close()`` and that calls the function
    stored in its ``handler_frame`` attribute for every received frame.

    Frames are plain dicts.  Event frames are passed to the callbacks
    registered with assign_handler().  Response frames are passed to the
    callback given with the action; actions that do not carry an
    ActionID use a single one-shot ``generic_response_handler`` instead.
    """

    def __init__(self, sock, debug=False):
        """Attach to ``sock`` and install the frame dispatcher on it.

        sock  -- socket-like object as described in the class docstring
        debug -- when true, diagnostic messages are printed
        """
        self.terminated = False
        self.do_debug = debug

        self.sock = sock
        self.sock.handler_frame = self.__delegate
        self.version = None
        self.event_handlers = {}       # lower-cased event name -> callbacks
        self.response_handlers = {}    # ActionID -> callback
        self.generic_response_handler = None
        self.unique = 1                # next ActionID serial number
        self.lock = threading.Lock()

    def terminate(self):
        """Disconnects and marks the manager as terminated

        The terminated flag is set even if closing the socket fails,
        and calling this method again is harmless.

        """
        try:
            self.__disconnect()
        finally:
            self.terminated = True

    def debug(self, data):
        """Prints data if debugging was enabled in the constructor
        """
        if self.do_debug:
            # Single-argument call form: prints the same text whether
            # ``print`` is a statement or a function.
            print(data)

    def assign_handler(self, event, callback):
        """Assigns a callback handler to a specific type of event frame

        Please specify the name of the event (e.g. newchannel, link)
        and a callback.  Your callback will be passed the event frame
        as an argument.  Several callbacks may be assigned to the same
        event; they are called in the order they were assigned.

        Event names are case insensitive.

        """
        self.event_handlers.setdefault(event.lower(), []).append(callback)

    def __delegate(self, frame):
        """Delegates control of a received frame to its callback

        Event frames go to every callback assigned for that event; if
        none has been assigned the frame is dropped.  Response frames
        go to the callback registered for their ActionID, otherwise to
        the generic response handler.  Both kinds of response handler
        are one-shot and are removed before being called, so a handler
        that raises does not stay registered.

        """
        if 'Event' in frame:
            for callback in self.event_handlers.get(frame['Event'].lower(), ()):
                callback(frame)
        elif 'Response' in frame:
            handler = None
            if 'ActionID' in frame:
                handler = self.response_handlers.pop(frame['ActionID'], None)
            if handler is None and self.generic_response_handler is not None:
                handler = self.generic_response_handler
                self.generic_response_handler = None
            if handler is not None:
                handler(frame)
            else:
                self.debug("!!! Unhandled repsonse sent to delegate(): " + str(frame))
        else:
            self.debug("!!! Invalid frame sent to delegate(): " + str(frame))

    def queue_frame(self, frame, callback=None):
        """Queues a frame for transmit

        If the frame has an 'Action' key and a callback is given, an
        'ActionID' is added to the frame (the caller's dict is
        modified) and the callback is registered for the matching
        response.  If handing the frame to the socket fails, that
        registration is removed again and the error is re-raised.

        """
        action_id = None
        if 'Action' in frame and callback is not None:
            self.lock.acquire()
            try:
                serial = self.unique
                self.unique += 1
            finally:
                self.lock.release()
            action_id = frame['Action'].lower() + '_' + str(serial)
            frame['ActionID'] = action_id
            self.response_handlers[action_id] = callback

        self.lock.acquire()
        try:
            self.sock.put_frame(frame)
        except Exception:
            # No response can arrive for a frame that was never sent.
            if action_id is not None:
                self.response_handlers.pop(action_id, None)
            raise
        finally:
            # Always release, otherwise one failed send would block
            # every later one.
            self.lock.release()

    def __disconnect(self):
        """Disconnects from the Asterisk Manager Interface

        Does nothing if the socket has already been released.

        """
        sock, self.sock = self.sock, None
        if sock is not None:
            sock.close()

    def _async_flag(self, default, extra):
        """Returns 'true' or 'false' for the Async header

        ``async`` is a reserved word for newer interpreters and cannot
        be spelled as a formal parameter there, so the originate
        methods take it positionally (as ``async_``) or through their
        extra keyword arguments under its original name.  Any other
        extra keyword raises TypeError, as an unknown keyword would.

        """
        flag = extra.pop('async', default)
        if extra:
            raise TypeError("unexpected keyword argument(s): "
                            + ', '.join(sorted(extra)))
        if flag:
            return 'true'
        return 'false'

    def login(self, callback, username, secret):
        """Logs in to the manager interface

        The response frame is passed to callback through the generic
        response handler.

        """
        self.generic_response_handler = callback
        self.queue_frame({'Action':   'Login',
                          'Username': username,
                          'Secret':   secret})

    def command(self, callback, command):
        """Sends a CLI command

        The response frame is passed to your callback.

        """
        self.queue_frame({'Action':  'Command',
                          'Command': command}, callback)

    def hangup(self, callback, channel):
        """Hangs up a channel
        """
        self.queue_frame({'Action':  'Hangup',
                          'Channel': channel}, callback)

    def hangup_timeout(self, callback, channel, timeout):
        """Hangs up a channel after a timeout

        Timeout is specified in seconds

        """
        # The manager action that sets this timeout is named
        # AbsoluteTimeout; 'Timeout' is only the name of its parameter.
        self.queue_frame({'Action':  'AbsoluteTimeout',
                          'Channel': channel,
                          'Timeout': timeout}, callback)

    def setvar(self, callback, variable, value, channel=None):
        """Sets a channel or global variable

        If you do not specify a channel, it will be a global variable.

        """
        frame = {'Action':   'Setvar',
                 'Variable': variable,
                 'Value':    value}
        if channel is not None:
            frame['Channel'] = channel
        self.queue_frame(frame, callback)

    def getvar(self, callback, variable, channel=None):
        """Gets a channel or global variable

        If you do not specify a channel, it will be a global variable.

        Nothing is returned; the response frame is passed to your
        callback.

        """
        frame = {'Action':   'Getvar',
                 'Variable': variable}
        if channel is not None:
            frame['Channel'] = channel
        self.queue_frame(frame, callback)

    def status(self, callback=None):
        """Tells the manager to spill its guts

        Once the status command is issued, the manager will send all
        sorts of neat 'Status' events describing all the current
        channels.  Be sure you have a handler installed for the
        'status' event before calling this!

        The response frame itself goes to callback through the generic
        response handler.

        """
        self.generic_response_handler = callback
        self.queue_frame({'Action': 'Status'})

    def zapshowchannels(self, callback=None):
        """Tells the manager to spill its guts about Zaptel

        The response frame goes to callback through the generic
        response handler.

        """
        self.generic_response_handler = callback
        self.queue_frame({'Action': 'ZapShowChannels'})

    def redirect(self, callback, channel, context, exten, priority=1):
        """Transfers a channel somewhere in your dial plan
        """
        self.queue_frame({'Action':   'Redirect',
                          'Channel':  channel,
                          'Context':  context,
                          'Exten':    exten,
                          'Priority': priority}, callback)

    def originate_dialplan(self, callback, channel, context, exten, priority=1,
                           callerid=None, timeout=30000, account=None,
                           async_=True, **kwargs):
        """Generate an outgoing call and direct to dial plan

        channel should be set to someone to call,
        i.e. Zap/g1/12036660420 When the person picks up the call,
        they will be directed to a location of your choice in your
        dial plan.

        timeout is in milliseconds to wait for answer, default is 30,000

        async (default True) asks the manager to originate the call
        asynchronously.  Pass it as the keyword ``async`` or as the
        last positional argument.

        The response frame is passed to your callback.

        """
        frame = {'Action':   'Originate',
                 'Channel':  channel,
                 'Context':  context,
                 'Exten':    exten,
                 'Priority': priority,
                 'Async':    self._async_flag(async_, kwargs)}
        if callerid is not None:
            frame['Callerid'] = callerid
        if timeout is not None:
            frame['Timeout'] = timeout
        if account is not None:
            frame['Account'] = account
        self.queue_frame(frame, callback)

    def originate_app(self, callback, channel, app, data,
                      callerid=None, timeout=30000, account=None,
                      async_=True, **kwargs):
        """Generate an outgoing call and direct to an application

        This is the same thing as originate_dialplan(), only this
        method directs the called party directly to an application
        (app, with its arguments in data) instead of somewhere in the
        dial plan.

        """
        frame = {'Action':      'Originate',
                 'Channel':     channel,
                 'Application': app,
                 'Data':        data,
                 'Async':       self._async_flag(async_, kwargs)}
        if callerid is not None:
            frame['Callerid'] = callerid
        if timeout is not None:
            frame['Timeout'] = timeout
        if account is not None:
            frame['Account'] = account
        self.queue_frame(frame, callback)

    def mailboxstatus(self, callback, mailbox):
        """Determines the number of unread voicemails in mailbox

        The number of unread voicemails is passed to your callback in
        the 'Waiting' key.

        I honestly don't know if the value returned by this is any
        different than the 'NewMessages' value returned by
        mailboxcount.  Chances are this action is redundant.

        """
        self.queue_frame({'Action':  'Mailboxstatus',
                          'Mailbox': mailbox}, callback)

    def mailboxcount(self, callback, mailbox):
        """Determines the new and old message count of a box

        The result is passed to your callback in the following keys:
        'NewMessages' and 'OldMessages'

        """
        self.queue_frame({'Action':  'Mailboxcount',
                          'Mailbox': mailbox}, callback)

def run_manager(manager, ignore=None):
    """Drive the asyncore loop until ``manager.terminated`` becomes true.

    The flag is checked before every pass; each pass is a single
    asyncore poll with a 0.05 second timeout.  ``ignore`` is accepted
    but not used by this function.
    """
    while True:
        if manager.terminated:
            break
        asyncore.loop(timeout=0.05, count=1)