ProcessingSource ProcessingSink

Processor API or use Brewery:

    def my_processor(source, sink):
        for entry in source:
            ...  # transform the entry and pass the result to the sink
  • Row-based; CSV and JSON are the only two formats
  • Sink types: CSV or JSON uploads to CKAN storage; no API in the first version
  • Source types: CSV and JSON files, URL pseudo-source; eventually the ScraperWiki API
  • Provide a CLI variant for local testing and development environments
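
As a rough illustration of the row-based interface above, the following sketch wires a CSV source and a CSV sink around a processor function. The names `CSVSource`, `CSVSink`, `upper_names` and `run_local`, as well as the sink's `write` method, are assumptions made for this example; the notes only fix the `(source, sink)` signature and iteration over the source.

```python
import csv
import io


class CSVSource:
    """Hypothetical row-based source: yields one dict per CSV row."""

    def __init__(self, fileobj):
        self._reader = csv.DictReader(fileobj)

    def __iter__(self):
        return iter(self._reader)


class CSVSink:
    """Hypothetical row-based sink: writes dict rows as CSV."""

    def __init__(self, fileobj, fieldnames):
        self._writer = csv.DictWriter(fileobj, fieldnames=fieldnames)
        self._writer.writeheader()

    def write(self, row):
        self._writer.writerow(row)


def upper_names(source, sink):
    """Example processor: upper-case the 'name' column of every row."""
    for entry in source:
        entry["name"] = entry["name"].upper()
        sink.write(entry)


def run_local(processor, csv_text, fieldnames):
    """Local test helper: run a processor on CSV text, return CSV text."""
    out = io.StringIO()
    processor(CSVSource(io.StringIO(csv_text)), CSVSink(out, fieldnames))
    return out.getvalue()
```

A helper like `run_local` is one possible basis for the CLI variant: it exercises a processor against local data without touching CKAN storage.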

CKAN Integration

  1. Derive Processor from Package Resources
  2. Specify output resource (JSON, CSV)
  3. Specify source repo and entry point
  4. Show processor on package view page, two options:
     • Run (now)
     • Schedule (cron job, based on paster command)
  5. Hand off run commands to backend
  6. Report on each run in UI
  7. Link to processing result for latest run
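
The hand-off to the backend could look roughly like the sketch below, which collects what the user configures on the package page and turns it into a run command. `ProcessorDefinition` and `build_task` are hypothetical names; the dictionary keys follow the task description example in these notes, and restricting types to CSV and JSON reflects the two formats named above.

```python
from dataclasses import dataclass


@dataclass
class ProcessorDefinition:
    """Hypothetical record of what a user configures on the package page."""
    package: str             # CKAN package the processor belongs to
    source_resource_id: str  # input resource picked from the package
    source_type: str         # "csv" or "json"
    output_type: str         # "csv" or "json"
    output_description: str  # description for the resulting resource
    pip_url: str             # source repo, in pip's VCS URL form
    entry_point: str         # callable to run inside the repo


def build_task(definition, callback_url):
    """Build the run command that is handed to the backend."""
    for kind in (definition.source_type, definition.output_type):
        if kind not in ("csv", "json"):
            raise ValueError("unsupported resource type: %r" % kind)
    return {
        "pip_url": definition.pip_url,
        "callback_url": callback_url,
        "entry_point": definition.entry_point,
        "package": definition.package,
        "source": {"resource_id": definition.source_resource_id,
                   "type": definition.source_type},
        "sink": {"resource_description": definition.output_description,
                 "type": definition.output_type},
    }
```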

Core Functionality


  1. Receive kombu/AMQP task to run Processor X
  2. Create or allocate computing runtime for processor
    Runtime types: local, uml, boto
  3. Submit task description to runtime
  4. Expect report on callback hook
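
The four steps above might be organised as in the following sketch. It deliberately leaves out the kombu/AMQP transport and only models the bookkeeping: `Controller`, `LocalRuntime`, `RUNTIME_TYPES` and the `task_id` argument are assumptions for illustration, and only the `local` runtime type is sketched.

```python
class LocalRuntime:
    """Hypothetical runtime that runs tasks on the controller host."""

    def __init__(self):
        self.submitted = []

    def submit(self, task):
        # A real runtime would start the worker here; this sketch only
        # records the task so the flow can be followed.
        self.submitted.append(task)


# The notes name local, uml and boto; only "local" is sketched here.
RUNTIME_TYPES = {"local": LocalRuntime}


class Controller:
    def __init__(self):
        self.pending = {}   # task id -> runtime awaiting a report
        self.reports = {}   # task id -> report received on the callback hook

    def handle_task(self, task_id, task, runtime_type="local"):
        """Steps 1-3: accept a task, allocate a runtime, submit the task."""
        if runtime_type not in RUNTIME_TYPES:
            raise ValueError("unknown runtime type: %r" % runtime_type)
        runtime = RUNTIME_TYPES[runtime_type]()
        runtime.submit(task)
        self.pending[task_id] = runtime
        return runtime

    def on_report(self, task_id, report):
        """Step 4: called by the callback hook when a run has finished."""
        self.pending.pop(task_id)
        self.reports[task_id] = report
```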


  1. Boot and init Python HTTP server on a "weird" port
  2. Expect input on weird port
  3. Create a virtualenv in a temp directory
  4. Spawn child in virtualenv
  5. Activate virtualenv
  6. Re-bind stdin, stdout, stderr
  7. Run entry point
  8. Upload result data
  // 9. Register resulting resource
  10. Ping controller
  11. Terminate
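
The child-process part of this sequence (spawning the child, re-binding the standard streams, running the entry point) can be sketched with the standard library alone. The function name `run_entry_point`, the `timeout` parameter and the `"error"` status value are assumptions; the notes only show `"ok"`. Passing the environment's own interpreter as `python` is used here in place of an explicit activation step, which for most purposes has the same effect.

```python
import subprocess
import sys


def run_entry_point(python, code, timeout=60):
    """Run `code` in a child interpreter and collect its output.

    `python` is the interpreter to use, e.g. the bin/python of a
    freshly created virtualenv.
    """
    proc = subprocess.run(
        [python, "-c", code],
        stdin=subprocess.DEVNULL,   # child gets no interactive input
        stdout=subprocess.PIPE,     # re-bind stdout so it can be reported
        stderr=subprocess.PIPE,     # same for stderr
        universal_newlines=True,    # decode output as text
        timeout=timeout,
    )
    return {
        "status": "ok" if proc.returncode == 0 else "error",
        "stdout": proc.stdout,
        "stderr": proc.stderr,
    }
```

For a quick local check, `run_entry_point(sys.executable, "print('hello')")` runs the snippet with the current interpreter and returns the captured streams.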

Virtualenv detection:

    import sys
    in_virtualenv = hasattr(sys, 'real_prefix')  # attribute set by virtualenv
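
The `real_prefix` check covers environments created by older virtualenv releases. A slightly broader sketch, assuming the worker may also run under a PEP 405 environment (the standard library `venv` module), compares `sys.base_prefix` with `sys.prefix` as well. The function name `in_virtualenv` is chosen for this example.

```python
import sys


def in_virtualenv():
    """Return True when the interpreter runs inside an isolated environment."""
    # Older virtualenv releases set sys.real_prefix inside an environment.
    if hasattr(sys, "real_prefix"):
        return True
    # PEP 405 environments instead make sys.base_prefix differ from
    # sys.prefix; fall back to sys.prefix where base_prefix is missing.
    return getattr(sys, "base_prefix", sys.prefix) != sys.prefix
```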


Task description:

    {
      "pip_url": "hg+....#egg=foo",
      "callback_url": "http://..../",
      "entry_point": "foo",
      "package": "foopkg",
      "source": {
        "resource_id": "xxxxx",
        "type": "csv"
      },
      "sink": {
        "resource_description": "xxxxx",
        "type": "csv"
      }
    }

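On the receiving side, a task description of this shape could be checked before any work starts. The sketch below is one way to do that; `parse_task`, `REQUIRED_KEYS` and `RESOURCE_TYPES` are hypothetical names, and treating every key as mandatory is an assumption rather than something the notes state.

```python
import json

# Assumption: every top-level key from the example is required.
REQUIRED_KEYS = ("pip_url", "callback_url", "entry_point", "package",
                 "source", "sink")
RESOURCE_TYPES = ("csv", "json")


def parse_task(text):
    """Parse a JSON task description and check its basic shape."""
    task = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if key not in task]
    if missing:
        raise ValueError("missing keys: %s" % ", ".join(missing))
    for side in ("source", "sink"):
        if task[side].get("type") not in RESOURCE_TYPES:
            raise ValueError("%s type must be csv or json" % side)
    return task
```
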


Report:

    {
      "status": "ok",
      "stdout": "",
      "stderr": "",
      "result_url": "xxx"
    }

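Delivering such a report to the controller's callback hook might look like the sketch below. The notes do not say how the report is transmitted, so the HTTP POST with a JSON body is an assumption, as is the function name `build_report_request`. The function only prepares the request; sending it is left to the caller.

```python
import json
import urllib.request


def build_report_request(callback_url, status, stdout="", stderr="",
                         result_url=None):
    """Prepare (but do not send) the request that pings the controller."""
    report = {"status": status, "stdout": stdout, "stderr": stderr,
              "result_url": result_url}
    return urllib.request.Request(
        callback_url,
        data=json.dumps(report).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",  # assumption: the callback hook accepts POST
    )
```

Passing the returned object to `urllib.request.urlopen` would send the report.
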

Runtime platform:

EC2 EBS micro-instances