Commits

Evgeniy Tatarkin  committed d1fd0d1

Item is now a subclass of OrderedDict; add CsvPipeline

  • Participants
  • Parent commits 67cb977

Comments (0)

Files changed (6)

 dist/
 html_docs/
 docs/_build/
+build/
+*.csv

File docs/api.rst

 This part of the documentation documents all the public classes and
 functions in pomp.
 
+
 Contrib
 *******
 
 .. automodule:: pomp.contrib
-    :members: 
+    :members:
+
+.. automodule:: pomp.contrib.pipelines
+    :members:
+
 
 Engine
 ******

File pomp/contrib/pipelines.py

+"""
+Simple pipelines
+````````````````
+
+Simple pipelines
+""" 
+import csv
+from pomp.core.base import BasePipeline
+from pomp.core.utils import isstring
+
+
+class CsvPipeline(BasePipeline):
+    """Save items to CSV format
+
+    Params `*args` and `**kwargs` are passed to the ``csv.writer`` constructor.
+
+    :param output_file: filename or file-like object. If `output_file` is a
+                        file-like object, it will not be closed when the
+                        pipe is stopped.
+    """
+
+    def __init__(self, output_file, *args, **kwargs):
+        self.output_file = output_file
+        self._csv_args = args
+        self._csv_kwargs = kwargs
+
+        # do not close the file if it was not opened by this instance
+        self._need_close = False
+
+
+    def start(self):
+        if isstring(self.output_file):
+            self.csvfile = open(self.output_file, 'a', newline='')
+            self._need_close = True
+        else:
+            self.csvfile = self.output_file
+
+        self.writer = csv.writer(
+            self.csvfile, *self._csv_args, **self._csv_kwargs
+        )
+
+    def process(self, item):
+        self.writer.writerow(list(item.values()))
+        return item
+
+    def stop(self):
+        if self._need_close:
+            self.csvfile.close()

File pomp/core/item.py

 """
 Item
 """
+import inspect
+from collections import OrderedDict
 
 
-class Item(object):
-    pass
+class Item(OrderedDict):
+
+    def __init__(self, *args, **kwargs):
+        # Initialize empty ordered dict
+        super(Item, self).__init__(self)
+
+        # Populate ordered dict
+        _args = list(args[:])
+        _args.reverse()
+        for field, obj in inspect.getmembers(self.__class__):
+            if isinstance(obj, Field):
+                value = _args.pop() if _args else kwargs.get(field, None)
+                super(Item, self).__setitem__(field, value)
+
+    def __setattr__(self, key, value):
+        if key in self:
+            super(Item, self).__setitem__(key, value)
+        super(Item, self).__setattr__(key, value)
 
 
 class Field(object):

File pomp/core/utils.py

         return iter(var)
 
     return iter((var,))
+
+
+def isstring(obj):
+    try:
+        return isinstance(obj, basestring)
+    except NameError:
+        return isinstance(obj, str)

File tests/test_contrib_pipelines.py

+# -*- coding: utf-8 -*-
+import csv
+import codecs
+import logging
+from nose.tools import assert_equal
+from pomp.core.item import Item, Field
+from pomp.contrib.pipelines import CsvPipeline
+
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class DummyItem(Item):
+    field1 = Field()
+    field2 = Field()
+    field3 = Field()
+    field4 = Field()
+
+
+
+class TestContribPipelines(object):
+
+    def test_csv_pipeline(self):
+
+        with codecs.open('test_pipe.csv', 'w', encoding='utf-8') as csvfile:
+
+            pipe = CsvPipeline(
+                csvfile,
+                delimiter=';',
+                quotechar='"',
+                quoting=csv.QUOTE_MINIMAL
+            )
+            
+            # prepare pipe
+            pipe.start()
+
+            item = DummyItem(field4='f4')
+            item.field1 = 'f1'
+            item.field2 = 'f2'
+
+            # pipeline item
+            pipe.process(item)
+
+            # close files
+            pipe.stop()
+
+            # check content
+            csvfile.flush()
+            with open(csvfile.name, 'r') as csvres:
+                res = csvres.read()
+                assert_equal(res.strip(), 'f1;f2;;f4')