Commits

Jacob Sondergaard committed b603d3e

a command line client

Comments (0)

Files changed (2)

 
 ## Example client usage
 
-This simple example involves the most basic operations of putting a job in the queue, reserving and deleting it (the code are in `demo.py`):
+The following is an example on how to use the client's API in a program. 
+
+The example show use of the most fundamental operations when using beanstalkd: putting a job in the queue, reserving and deleting it (the code are in `demo.py`):
 
     import tornado.ioloop
     import beanstalkt
 
 The client will attempt to automatically re-connect if the socket connection to beanstalkd is closed unexpectedly. In other cases where an error occur, an exception will be passed to the callback function.
 
+### Command line client
+
+The package also includes a command line client for interacting with beanstalkd directly from the commandline. Here is an example usage corresponding to the code example above:
+
+Put a job in the default tube (program returns the jid of the new job):
+
+    > python -m beanstalkt.cmd put "A job to work on"
+    1
+
+Reserved a job from the default tube and delete it:
+
+    > python -m beanstalkt.cmd reserve delete
+    {
+      "body": "A job to work on", 
+      "jid": 1
+    }
+
+The documentation is available using the `-h` option, e.g.:
+
+    > python -m beanstalkt.cmd -h
+    > python -m beanstalkt.cmd put -h
+    > python -m beanstalkt.cmd reserve -h
+
+All relevant commands are available via the command line client.
+
 ## The job lifecycle
 
 A job in beanstalk gets created by a client with the put command. During its life it can be in one of four states:
 
 ## Implementation notes
 
-Tests are contained in `btc_test.py` and all tests cases can be run by `python btc_test.py`. 
+Tests are contained in `btc_test.py` and all tests cases can be run by `python bt_test.py` in the source directory.
 
 The beanstalkd protocol uses YAML for communicating the various stats and lists. The client has a crude YAML parser, suitable only for parsing simple lists and dicts, which eliminates the dependency of a YAML parser.

beanstalkt/cmd.py

+import argparse
+import json
+import signal
+
+import tornado.ioloop
+
+import beanstalkt
+
+client = beanstalkt.Client()
+ioloop = tornado.ioloop.IOLoop.instance()
+
+
+def main():
+    #
+    # get arguments and call the client
+    #
+
+    parser = argparse.ArgumentParser(
+            description='Beanstalkd command line client')
+
+    subparsers = parser.add_subparsers()
+
+    # put
+    parser_put = subparsers.add_parser('put', help='put a job (string) into '
+            'the queue')
+    parser_put.add_argument('body', help='string with job data')
+    parser_put.add_argument('-p', '--priority', type=int, default=2 ** 31)
+    parser_put.add_argument('-u', '--use', default='default',
+            help='tube to use')
+    parser_put.add_argument('-d', '--delay', type=int, default=0,
+            help='delay in seconds before moving job to the ready queue')
+    parser_put.add_argument('-t', '--ttr', default=120, type=int,
+            help='Time To Run in seconds')
+    parser_put.set_defaults(func=put)
+
+    # reserve and delete/release/bury
+    parser_reserve = subparsers.add_parser('reserve',
+            help='reserve job from the watched tube(s) and then '
+            'delete/release/bury it')
+    parser_reserve.add_argument('action', choices=['delete', 'release', 'bury'],
+            help='action to be performed when job is reserved')
+    parser_reserve.add_argument('-t', '--timeout', type=int, default=None,
+            help='timeout in seconds')
+    parser_reserve.add_argument('-w', '--watch', action='append',
+            help='tube to watch, apply multiple times to watch several tubes')
+    parser_reserve.add_argument('-i', '--ignore-default', action='store_true',
+            help='ignore the default tube')
+    parser_reserve.add_argument('-p', '--priority', type=int, default=2 ** 31,
+            help='assign new priority, when releasing or burying the job')
+    parser_reserve.add_argument('-d', '--delay', type=int, default=0,
+            help='delay in seconds, when releasing the job, before moving job '
+            'to the ready queue')
+    parser_reserve.set_defaults(func=reserve)
+
+    # peek
+    parser_peek = subparsers.add_parser('peek',
+            help='peek at job with given jid')
+    parser_peek.add_argument('jid', type=int,
+            help='the jid of the job to peek')
+    parser_peek.set_defaults(func=peek)
+
+    # peek-ready
+    parser_peek_ready = subparsers.add_parser('peek-ready',
+            help='peek at the next job in the ready queue of the used tube')
+    parser_peek_ready.add_argument('-u', '--use', default='default',
+            help='tube to use')
+    parser_peek_ready.set_defaults(func=peek_ready)
+
+    # peek-delayed
+    parser_peek_delayed = subparsers.add_parser('peek-delayed',
+            help='peek at the next job in the delayed queue of the used tube')
+    parser_peek_delayed.add_argument('-u', '--use', default='default',
+            help='tube to use')
+    parser_peek_delayed.set_defaults(func=peek_delayed)
+
+    # peek-buried
+    parser_peek_buried = subparsers.add_parser('peek-buried',
+            help='peek at the next job in the buried queue of the used tube')
+    parser_peek_buried.add_argument('-u', '--use', default='default',
+            help='tube to use')
+    parser_peek_buried.set_defaults(func=peek_buried)
+
+    # kick
+    parser_kick = subparsers.add_parser('kick',
+            help='kick one or more jobs into the ready queue from the used '
+            'tube, returns the number of jobs actually kicked')
+    parser_kick.add_argument('-b', '--bound', default=1,
+            help='bound (integer value) on the number of jobs to kick')
+    parser_kick.add_argument('-u', '--use', default='default',
+            help='tube to use')
+    parser_kick.set_defaults(func=kick)
+
+    # kick_job
+    parser_kick_job = subparsers.add_parser('kick-job',
+            help='kick a job with given jid into the ready queue')
+    parser_kick_job.add_argument('jid', type=int,
+            help='the jid of the job to kick')
+    parser_kick_job.set_defaults(func=kick_job)
+
+    # stats_job
+    parser_stats_job = subparsers.add_parser('stats-job',
+            help='get stats for job with given jid')
+    parser_stats_job.add_argument('jid', type=int,
+            help='the jid of the job')
+    parser_stats_job.set_defaults(func=stats_job)
+
+    # stats_tube
+    parser_stats_tube = subparsers.add_parser('stats-tube',
+            help='get stats for a tube')
+    parser_stats_tube.add_argument('name', help='tube to get stats for')
+    parser_stats_tube.set_defaults(func=stats_tube)
+
+    # stats
+    parser_stats = subparsers.add_parser('stats',
+            help='get stats about the beanstalkd instance')
+    parser_stats.set_defaults(func=stats)
+
+    # list
+    parser_list = subparsers.add_parser('list',
+            help='list all existing tubes')
+    parser_list.set_defaults(func=list_tubes)
+
+    # pause_tube
+    parser_pause_tube = subparsers.add_parser('pause', help='pause a tube')
+    parser_pause_tube.add_argument('name', help='tube to pause')
+    parser_pause_tube.add_argument('-d', '--delay', type=int, default=0,
+        help='delay in seconds to wait before reserving any more jobs from '
+        'the queue')
+    parser_pause_tube.set_defaults(func=pause_tube)
+
+    args = parser.parse_args()
+
+    signal.signal(signal.SIGINT, stop)
+    signal.signal(signal.SIGTERM, stop)
+
+    args.func(**vars(args))
+
+
+def start(callback):
+    client.connect(callback)
+    ioloop.start()
+
+
+def success(callback, last=True):
+    def _check_point(data):
+        if isinstance(data, Exception):
+            print str(data)
+            stop()
+        else:
+            callback(data)
+            last and stop()
+    return _check_point
+
+
+def stop(*args):
+    client.close(ioloop.stop)
+
+
+def put(body, priority, use, delay, ttr, func):
+    def step1(_):
+        client.put(body, priority=priority, delay=delay, ttr=ttr,
+                callback=success(step2))
+    def step2(data):
+        print data
+    start(lambda: client.use(use, step1))
+
+
+def reserve(action, timeout, watch, ignore_default, priority, delay, func):
+
+    def step1(_=None):
+        if watch:
+            client.watch(watch.pop(), step1)
+        elif ignore_default:
+            client.ignore('default', step2)
+        else:
+            step2()
+
+    def step2(_=None):
+        client.reserve(timeout, success(step3, last=False))
+
+    def step3(data):
+        print json.dumps(data, indent=2)
+
+        cb = success(lambda _: None)
+        if action == 'delete':
+            client.delete(data['jid'], cb)
+        elif action == 'release':
+            client.release(data['jid'], priority, delay, cb)
+        elif action == 'bury':
+            client.bury(data['jid'], priority, cb)
+
+    start(step1)
+
+
+def peek(jid, func):
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.peek(jid, success(step2)))
+
+
+def peek_ready(use, func):
+    def step1(_):
+        client.peek_ready(success(step2))
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.use(use, step1))
+
+
+def peek_delayed(use, func):
+    def step1(_):
+        client.peek_delayed(success(step2))
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.use(use, step1))
+    
+
+def peek_buried(use, func):
+    def step1(_):
+        client.peek_buried(success(step2))
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.use(use, step1))
+
+
+def kick(bound, use, func):
+    def step1(_):
+        client.kick(bound, success(step2))
+    def step2(data):
+        print data
+    start(lambda: client.use(use, step1))
+
+
+def kick_job(jid, func):
+    start(lambda: client.kick_job(jid, success(lambda _: None)))
+
+
+def stats_job(jid, func):
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.stats_job(jid, success(step2)))
+
+
+def stats_tube(name, func):
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.stats_tube(name, success(step2)))
+
+
+def stats(func):
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.stats(success(step2)))
+
+
+def list_tubes(func):
+    def step2(data):
+        print json.dumps(data, indent=2)
+    start(lambda: client.list_tubes(success(step2)))
+
+
+def pause_tube(name, delay, func):
+    start(lambda: client.pause_tube(name, delay, success(lambda _: None)))
+
+
+if __name__ == '__main__':
+  main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.