Miran Levar committed 86b20d6

Added mapreduce

Files changed (88)

P4/graph_viewer/Templates/graphs.html

     {% else %}
         <a class="btn btn-primary" style="margin-left:5px" href="/graphs/filter" id="filter">Show my graphs</a>
     {% endif %}
-    <a class="btn" style="margin-left:5px" href="/graphs/calculate" id="new">Calculate statistics</a>
+    {% if stats %}
+        <a  id="stats" class="btn " style="margin-left:5px" rel="popover" data-placement="bottom"
+ data-content="{{ stats}}" title="Basic graph statistics">Graph statistics</a>
+        <script>
+            $('#stats').popover()
+        </script>
+    {% else %}
+        <a class="btn" style="margin-left:5px" href="/graphs/calculate" id="new">Graph statistics</a>  
+    {% endif %}
 {% endblock %}
 
 {% block content %}

P4/graph_viewer/views.py

 from django import forms
 from django.views.decorators.csrf import csrf_exempt
 
-
+import mr
 import datetime
 import random
 random.seed(42)
             g.name=form.cleaned_data['name']
             g.nodes=form.cleaned_data['nodes']           
             g.put()
+            memcache.delete('stats')
             return HttpResponseRedirect('/graphs/') 
     else:
         g = Graph.get_by_id(id)
                 u.put()
             g.author = u
             g.put()          
+            memcache.delete('stats')
             return HttpResponseRedirect('/graphs/') 
     else:
         form = GraphForm() 
     return HttpResponse(simplejson.dumps(data), mimetype)
 
 @loginrequired
+def calculate(request):
+    pipeline = mr.PhrasesPipeline()
+    pipeline.start()
+    return HttpResponseRedirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
+
+@loginrequired
 def filter(request):
     """
     Toggle for the filter.
     Returns paginated list of graphs that can optionally be filtered to only show
     graphs created by the user.
     """
+    stats = memcache.get('stats')
+
     if "filter" in request.session and request.session["filter"]:
         u = User.get_by_key_name(users.get_current_user().nickname())
         graph_list = Graph.gql("WHERE author = :author", author=u)
         graphs = paginator.page(paginator.num_pages)
 
     return render(request, 'graphs.html', {
-        'graphs': graphs, 'is_paginated':paginated
+        'graphs': graphs, 'is_paginated':paginated, 'stats':stats
     })
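The new calculate view above starts an mr.PhrasesPipeline and redirects to the pipeline status UI, while filter reads the cached result from memcache under the 'stats' key (the key that the edit and add views invalidate). The mr module itself is not part of this excerpt; the following is only a rough sketch of what such a pipeline could look like, and every name, entity kind, and parameter below is an assumption rather than the actual implementation:

# mr.py -- hypothetical sketch; the real module is not shown in this commit.
from google.appengine.api import memcache

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op


def count_graph(entity):
    """Mapper: bump a counter for every Graph entity seen (assumed kind)."""
    yield op.counters.Increment("graphs")


class PhrasesPipeline(base_handler.PipelineBase):
    """Runs the statistics MapReduce; the filter view reads the cached result."""

    def run(self):
        yield mapreduce_pipeline.MapperPipeline(
            "graph_statistics",
            handler_spec="mr.count_graph",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params={"entity_kind": "P4.graph_viewer.models.Graph"},
            shards=4)

    def finalized(self):
        # Placeholder: the real pipeline presumably computes and caches the
        # statistics under the 'stats' key that the views read and invalidate.
        memcache.set('stats', 'MapReduce job %s finished' % self.pipeline_id)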
 
 
 # Django settings for P4 project.
 import os
 
-DEBUG = True
+DEBUG = False
 TEMPLATE_DEBUG = DEBUG
 
 ADMINS = (
     url(r'^graphs/view/(?P<id>\d+)/$','P4.graph_viewer.views.view_graph'),
     url(r'^graphs/(?P<id>\d+)/$','P4.graph_viewer.views.edit'),
     url(r'^graphs/filter/$','P4.graph_viewer.views.filter'),
+    url(r'^graphs/calculate/$','P4.graph_viewer.views.calculate'),
 
     # ajax handler
     url(r'^getgraph/$', 'P4.graph_viewer.views.get_graph'),
 application: graph-viewer
-version: 3
+version: 4
 runtime: python27
 api_version: 1
 threadsafe: true #false
 handlers:
 - url: /favicon.ico
   static_files: img/favicon.ico
-  upload: img/favicon.ico
+  upload: img/favicon.ico
+
 - url: /img/
   static_dir: img
+
 - url: /info/.*
   script: main.app
 
+- url: /mapreduce/pipeline/images
+  static_dir: mapreduce/lib/pipeline/ui/images
+
+- url: /mapreduce(/.*)?
+  script: mapreduce.main.APP
+
+- url: /blobstore/.*
+  script: main.app
+
+
 libraries:
 - name: jinja2
   version: latest
 </div>
 
 <div class="footer navbar-fixed-bottom" >
-    <a  id="info" class="btn btn-info btn-small pull-left" style="margin:4px;" href="/info/info">Info</a>
     <img class="pull-right" src="/img/appengine-noborder-120x30.gif" 
     alt="Powered by Google App Engine" />
     <p id="foot">
 import webapp2
 import datetime
 import os
+import urllib
 
 from google.appengine.api import users
 from google.appengine.api import memcache
+from google.appengine.ext import blobstore
+
+from google.appengine.ext.webapp import blobstore_handlers
 
 jinja_environment = jinja2.Environment(
     loader=jinja2.FileSystemLoader(os.path.dirname(__file__)+'/jinja/'))
         template = jinja_environment.get_template('info.html')
         self.response.out.write(template.render({"username": user.nickname()}))
 
-class InfoHandler(webapp2.RequestHandler):
+
+class HelloHandler(webapp2.RequestHandler):
     def get(self):
-        user = users.get_current_user()
+        self.response.out.write('Hello world!')
 
-        template = jinja_environment.get_template('info.html')
-        self.response.out.write(template.render({"username": user.nickname()}))
+class DownloadHandler(blobstore_handlers.BlobstoreDownloadHandler):
+  """Handler to download blob by blobkey."""
+  def get(self, key):
+    key = str(urllib.unquote(key)).strip()
+    blob_info = blobstore.BlobInfo.get(key)
+    self.send_blob(blob_info)
 
-app = webapp2.WSGIApplication([('/info/', MainHandler), ('/info/info', InfoHandler)], debug=True)
+
+app = webapp2.WSGIApplication([('/info/', MainHandler), ('/info/hello', HelloHandler), (r'/blobstore/(.*)', DownloadHandler),], debug=False)
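A small usage note: DownloadHandler serves any blob whose URL-quoted key is appended to the new /blobstore/ route. The helper below is illustrative only, not part of the commit:

import urllib


def download_url(blob_info):
    # Build a link served by DownloadHandler, which unquotes the key again.
    return '/blobstore/%s' % urllib.quote(str(blob_info.key()))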

mapreduce/base_handler.py

 
 
 import logging
-try:
-  import json as simplejson
-except ImportError:
-  from mapreduce.lib import simplejson
+from mapreduce.lib import simplejson
 
 try:
   from mapreduce.lib import pipeline

mapreduce/context.py

 from google.appengine.api import datastore
 from google.appengine.ext import db
 
+try:
+  from google.appengine.ext import ndb
+except ImportError:
+  ndb = None
+# It is acceptable to set key_range.ndb to the ndb module,
+# imported through some other way (e.g. from the app dir).
+
 
 # Maximum pool size in bytes. Pool will be flushed when reaches this amount.
 # We use 950,000 bytes which is slightly less than maximum allowed RPC size of
 
 def _normalize_entity(value):
   """Return an entity from an entity or model instance."""
-  # TODO(user): Consider using datastore.NormalizeAndTypeCheck.
+  if ndb is not None and isinstance(value, ndb.Model):
+    return None
   if getattr(value, "_populate_internal_entity", None):
     return value._populate_internal_entity()
   return value
 
 def _normalize_key(value):
   """Return a key from an entity, model instance, key, or key string."""
+  if ndb is not None and isinstance(value, (ndb.Model, ndb.Key)):
+    return None
   if getattr(value, "key", None):
     return value.key()
   elif isinstance(value, basestring):
     self.force_writes = bool(params.get("force_ops_writes", False))
     self.puts = ItemList()
     self.deletes = ItemList()
+    self.ndb_puts = ItemList()
+    self.ndb_deletes = ItemList()
 
   def put(self, entity):
     """Registers entity to put to datastore.
       entity: an entity or model instance to put.
     """
     actual_entity = _normalize_entity(entity)
+    if actual_entity is None:
+      return self.ndb_put(entity)
     entity_size = len(actual_entity._ToPb().Encode())
     if (self.puts.length >= self.max_entity_count or
         (self.puts.size + entity_size) > self.max_pool_size):
       self.__flush_puts()
     self.puts.append(actual_entity, entity_size)
 
+  def ndb_put(self, entity):
+    """Like put(), but for NDB entities."""
+    assert ndb is not None and isinstance(entity, ndb.Model)
+    entity_size = len(entity._to_pb().Encode())
+    if (self.ndb_puts.length >= self.max_entity_count or
+        (self.ndb_puts.size + entity_size) > self.max_pool_size):
+      self.__flush_ndb_puts()
+    self.ndb_puts.append(entity, entity_size)
+
   def delete(self, entity):
     """Registers entity to delete from datastore.
 
     """
     # This is not very nice: we're calling two protected methods here...
     key = _normalize_key(entity)
+    if key is None:
+      return self.ndb_delete(entity)
     key_size = len(key._ToPb().Encode())
     if (self.deletes.length >= self.max_entity_count or
         (self.deletes.size + key_size) > self.max_pool_size):
       self.__flush_deletes()
     self.deletes.append(key, key_size)
 
+  def ndb_delete(self, entity_or_key):
+    """Like delete(), but for NDB entities/keys."""
+    if isinstance(entity_or_key, ndb.Model):
+      key = entity_or_key.key
+    else:
+      key = entity_or_key
+    key_size = len(key.reference().Encode())
+    if (self.ndb_deletes.length >= self.max_entity_count or
+        (self.ndb_deletes.size + key_size) > self.max_pool_size):
+      self.__flush_ndb_deletes()
+    self.ndb_deletes.append(key, key_size)
+
   # TODO(user): some kind of error handling/retries is needed here.
   def flush(self):
     """Flush(apply) all changed to datastore."""
     self.__flush_puts()
     self.__flush_deletes()
+    self.__flush_ndb_puts()
+    self.__flush_ndb_deletes()
 
   def __flush_puts(self):
     """Flush all puts to datastore."""
       datastore.Delete(self.deletes.items, config=self.__create_config())
     self.deletes.clear()
 
+  def __flush_ndb_puts(self):
+    """Flush all NDB puts to datastore."""
+    if self.ndb_puts.length:
+      ndb.put_multi(self.ndb_puts.items, config=self.__create_config())
+    self.ndb_puts.clear()
+
+  def __flush_ndb_deletes(self):
+    """Flush all deletes to datastore."""
+    if self.ndb_deletes.length:
+      ndb.delete_multi(self.ndb_deletes.items, config=self.__create_config())
+    self.ndb_deletes.clear()
+
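With the additions above, the same datastore mutation pool now accepts both db and ndb objects: put() and delete() detect NDB instances (via _normalize_entity/_normalize_key returning None) and hand them to ndb_put()/ndb_delete(), which batch through ndb.put_multi/ndb.delete_multi. A minimal sketch of a mapper relying on this, where the Counter model is an assumption used only for illustration:

from google.appengine.ext import ndb

from mapreduce import operation as op


class Counter(ndb.Model):
    """Hypothetical NDB model, for illustration only."""
    count = ndb.IntegerProperty(default=0)


def touch_counter(entity):
    """Mapper over Counter entities: update and write back through the pool."""
    entity.count += 1
    # op.db.Put goes through the context mutation pool; the pool now routes
    # NDB entities to ndb_put() and flushes them with ndb.put_multi().
    yield op.db.Put(entity)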
   def __create_config(self):
     """Creates datastore Config.
 

mapreduce/errors.py

 #!/usr/bin/env python
-#
 # Copyright 2011 Google Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
     "BadWriterParamsError",
     "BadYamlError",
     "Error",
+    "FailJobError",
     "MissingYamlError",
     "MultipleDocumentsInMrYaml",
+    "NotEnoughArgumentsError",
+    "RetrySliceError",
     "ShuffleServiceError",
     ]
 
+
 class Error(Exception):
   """Base-class for exceptions in this module."""
 
   """The input parameters to a reader were invalid."""
 
 
-class ShuffleServiceError(Error):
-  """Error doing shuffle through shuffle service."""
+class FailJobError(Error):
+  """The job will be failed if this exception is thrown anywhere."""
+
+
+class NotEnoughArgumentsError(Error):
+  """Required argument is missing."""
 
 
 class BadCombinerOutputError(Error):
   """Combiner outputs data instead of yielding it."""
 
+
+class ShuffleServiceError(Error):
+  """Error doing shuffle through shuffle service."""
+
+
+class RetrySliceError(Error):
+  """The slice will be retried up to some maximum number of times.
+
+  The job will be failed if the slice can't progress before maximum
+  number of retries.
+  """

mapreduce/file_format_parser.py

+#!/usr/bin/env python
+# Copyright 2012 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define file format string Parser."""
+
+
+
+__all__ = ['parse']
+
+import re
+import tokenize
+
+from mapreduce import file_formats
+
+
+def parse(format_string):
+  """Parse format string.
+
+  Args:
+    format_string: format_string from MapReduce FileInputReader.
+
+  Returns:
+    a list of _FileFormat objects.
+  """
+  return _FileFormatParser(_FileFormatTokenizer(format_string)).parse()
+
+
+class _FileFormatParser(object):
+  """Parses a format string according to the following grammar.
+
+  In Python's modified BNF notation.
+  format_string ::= parameterized_format ( "[" parameterized_format "]" )*
+  parameterized_format ::= format [ format_parameters ]
+  format_parameters ::= "(" format_paramter ("," format_parameter )* ")"
+  format_parameter ::= format_specific_parameter "=" parameter_value
+  format ::= (<letter>|<number>)+
+  parameter_value ::= (<letter>|<number>|<punctuation>)+
+  format_specific_parameter ::= (<letter>|<number>)+
+  """
+
+  def __init__(self, tokenizer):
+    self._formats = []
+    self._tokenizer = tokenizer
+
+  def _add_format(self, format_name, arguments):
+    if format_name not in file_formats.FORMATS:
+      raise ValueError('Invalid format %s.' % format_name)
+    format_cls = file_formats.FORMATS[format_name]
+    for k in arguments:
+      if k not in format_cls.ARGUMENTS:
+        raise ValueError('Invalid argument %s for format %s' %
+                         (k, format_name))
+    self._formats.append(format_cls(**arguments))
+
+  def parse(self):
+    self._parse_format_string()
+
+    if self._tokenizer.remainder():
+      raise ValueError('Extra chars after index -%d' %
+                       self._tokenizer.remainder())
+    return self._formats
+
+  def _parse_format_string(self):
+    """Parse format_string."""
+    self._parse_parameterized_format()
+    if self._tokenizer.consume_if('['):
+      self._parse_format_string()
+      self._tokenizer.consume(']')
+
+  def _validate_string(self, text):
+    """Validate a string is composed of valid characters."""
+    if not re.match(tokenize.Name, text):
+      raise ValueError('%s should only contain ascii letters or digits.' %
+                       text)
+
+  def _parse_parameterized_format(self):
+    """Parse parameterized_format."""
+    if not self._tokenizer.remainder():
+      return
+
+    format_name = self._tokenizer.next()
+    self._validate_string(format_name)
+
+    arguments = {}
+
+    if self._tokenizer.consume_if('('):
+      arguments = self._parse_format_parameters()
+      self._tokenizer.consume(')')
+
+    self._add_format(format_name, arguments)
+
+  def _parse_format_parameters(self):
+    """Parse format_parameters."""
+    arguments = {}
+    comma_exist = True
+    while self._tokenizer.peek() not in ')]':
+      if not comma_exist:
+        raise ValueError('Arguments should be separated by comma at index %d.'
+                         % self._tokenizer.index())
+      key = self._tokenizer.next()
+      self._validate_string(key)
+      self._tokenizer.consume('=')
+      value = self._tokenizer.next()
+      comma_exist = self._tokenizer.consume_if(',')
+      if key in arguments:
+        raise ValueError('Argument %s defined more than once.' % key)
+      arguments[key] = value
+    return arguments
+
+
+class _FileFormatTokenizer(object):
+  """Tokenize a user supplied format string.
+
+  A token is either a special character or a group of characters between
+  two special characters or the beginning or the end of format string.
+  Escape character can be used to escape special characters and itself.
+  """
+
+  SPECIAL_CHARS = '[]()=,'
+  ESCAPE_CHAR = '\\'
+
+  def __init__(self, format_string):
+    self._index = 0
+    self._format_string = format_string
+
+  def next(self):
+    return self._next().strip()
+
+  def _next(self):
+    escaped = False
+    token = ''
+    while self.remainder():
+      char = self._format_string[self._index]
+      if char == self.ESCAPE_CHAR:
+        if escaped:
+          token += char
+          self._index += 1
+          escaped = False
+        else:
+          self._index += 1
+          escaped = True
+      elif char in self.SPECIAL_CHARS and not escaped:
+        if token:
+          return token
+        else:
+          self._index += 1
+          return char
+      else:
+        escaped = False
+        self._index += 1
+        token += char
+    return token
+
+  def consume(self, expected_token):
+    token = self.next()
+    if token != expected_token:
+      raise ValueError('Expect "%s" but got "%s" at offset %d' %
+                       (expected_token, token, self._index))
+
+  def consume_if(self, token):
+    if self.peek() == token:
+      self.consume(token)
+      return True
+    return False
+
+  def peek(self):
+    token = self._next()
+    self._index -= len(token)
+    return token.strip()
+
+  def remainder(self):
+    return len(self._format_string) - self._index
+
+  def index(self):
+    return self._index
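For reference, a usage sketch of the parser above: a format string lists formats outermost first, with nested formats in brackets and per-format arguments in parentheses.

from mapreduce import file_format_parser

# "Each input file is a zip archive; read every member file line by line,
# decoding it as utf-8." Unknown format names or arguments raise ValueError.
formats = file_format_parser.parse('zip[lines(encoding=utf-8)]')
# formats is [<_ZipFormat instance>, <_LinesFormat instance>], outermost first.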

mapreduce/file_format_root.py

+#!/usr/bin/env python
+# Copyright 2012 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define file format root."""
+
+from __future__ import with_statement
+
+
+
+
+__all__ = ['FileFormatRoot',
+           'split']
+
+import copy
+import mapreduce.file_format_parser as parser
+
+from mapreduce.lib.files import file as files
+from mapreduce import model
+from mapreduce import file_formats
+
+
+def split(filenames, format_string, shards):
+  """Get a FileFormatRoot for each shard.
+
+  This method creates a list of FileFormatRoot and assigns each root
+  some input files. The number of roots is less than or equal to shards.
+
+  Args:
+    filenames: input filenames
+    format_string: format string from user.
+    shards: number of shards to split inputs.
+
+  Returns:
+    A list of FileFormatRoot or None if all input files have zero bytes.
+  """
+  parsed_formats = parser.parse(format_string)
+
+  sizes = [files.stat(filename).st_size for filename in filenames]
+  # TODO(user): add min shard size protection if needed.
+  size_per_shard = float(sum(sizes)) / shards
+  if not size_per_shard:
+    return
+
+  if parsed_formats[0].can_split():
+    return _deep_split(filenames, size_per_shard, parsed_formats)
+  else:
+    return _shallow_split(filenames, size_per_shard, parsed_formats, sizes)
+
+
+def _shallow_split(filenames, size_per_shard, parsed_formats, sizes):
+  """Split files into roots only based on top level file sizes.
+
+  This split does not cross file boundary.
+  """
+  roots = []
+  inputs = []
+  shard_size = 0
+  for i, size in enumerate(sizes):
+    shard_size += size
+    inputs.append(_FileRange(filenames[i], None))
+    if shard_size > size_per_shard:
+      roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
+      inputs = []
+      shard_size = 0
+
+  if inputs:
+    roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
+
+  return roots
+
+
+def _deep_split(filenames, size_per_shard, parsed_formats):
+  """Split files into roots using the first FileFormat.
+
+  Deep split can split within a file. It tells the first format how big
+  a split it wants and the first format will do the actual splitting
+  because only the first format knows how to operate on this particular
+  format.
+
+  Args:
+    filenames: a list of input filenames.
+    size_per_shard: size per shard.
+    parsed_formats: the parsed FileFormats.
+
+  Returns:
+    A list of FileFormatRoot.
+  """
+  roots = []
+  inputs = []
+  size_left = size_per_shard
+
+  for filename in filenames:
+    index = 0
+    with files.open(filename) as f:
+      cache_for_split = {}
+      # Split a single file.
+      while True:
+        if size_left <= 0:
+          # Shard has been filled.
+          roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
+          size_left = size_per_shard
+          inputs = []
+        start_index = index
+        size_left, index = parsed_formats[0].split(size_left,
+                                                   start_index,
+                                                   f,
+                                                   cache_for_split)
+        # File has been entirely covered.
+        if start_index == index:
+          break
+        inputs.append(_FileRange(filename, (start_index, index)))
+
+  if inputs:
+    roots.append(FileFormatRoot(copy.deepcopy(parsed_formats), inputs))
+
+  return roots
+
+
+class _FileRange(model.JsonMixin):
+  """Describe a range of a file to read.
+
+  The split functions in this module create instances of this class and
+  feed them to different roots.
+  """
+
+  # Json Properties.
+  FILENAME = 'filename'
+  RANGE = 'range'
+
+  def __init__(self, filename, file_range=None):
+    """Init.
+
+    Args:
+      filename: filename in str.
+      file_range: [start_index, end_index) tuple. This only makes sense for
+        _FileFormats that support splitting within a file.
+        It specifies the range of this file to read.
+        None means reading the entire file. When defined, what it means
+        differs for each format. For example, if a file is of zip format,
+        index specifies the member files to read. If a file is of record
+        format, index specifies the records to read.
+    """
+    self.filename = filename
+    self.range = file_range
+
+  def to_json(self):
+    return {self.FILENAME: self.filename,
+            self.RANGE: self.range}
+
+  @classmethod
+  def from_json(cls, json):
+    return cls(json[cls.FILENAME], json[cls.RANGE])
+
+
+class FileFormatRoot(model.JsonMixin):
+  """FileFormatRoot.
+
+  FileFormatRoot takes a list of FileFormats as processing units and
+  a list of _FileRanges as inputs. It provides an interface to
+  iterate through all the inputs. All inputs will be processed by each
+  processing unit in a cascaded manner before being emitted.
+
+  The order of the list of FileFormats matters. The last
+  FileFormat's output is returned by FileFormatRoot.
+  Each FileFormat asks FileFormatRoot for inputs, which are either outputs
+  from its previous FileFormat or, in the case of the first FileFormat,
+  outputs directly from FileFormatRoot.
+
+  FileFormats don't know each other. FileFormatRoot coordinates all
+  their initializations, (de)serialization, and communications.
+  """
+
+  # Json Properties.
+  _INPUTS = 'inputs'
+  _FORMATS = 'formats'
+  _FILES_STREAMS = 'files_streams'
+
+  def __init__(self, formats, inputs, files_streams_json=None):
+    """Init.
+
+    Args:
+      formats: A list of _FileFormats.
+      inputs: A list of _FileRanges.
+      files_streams_json: serialized files streams from a previous to_json()
+        call, or None to initialize the streams to their default state.
+    """
+    self._inputs = inputs
+    self._formats = formats
+    for i, file_format in enumerate(self._formats):
+      stream_cls = _RootFilesStream if i == 0 else _FilesStream
+      if files_streams_json:
+        file_format._input_files_stream = stream_cls.from_json(
+            files_streams_json[i], self)
+      else:
+        file_format._input_files_stream = stream_cls(i, self)
+
+  def __repr__(self):
+    return str(self.to_json())
+
+  def __iter__(self):
+    return self
+
+  def to_json(self):
+    return  {self._INPUTS: [_.to_json() for _ in self._inputs],
+             self._FORMATS: [_.to_json() for _ in self._formats],
+             self._FILES_STREAMS:
+             [_._input_files_stream.to_json() for _ in self._formats]}
+
+  @classmethod
+  def from_json(cls, json):
+    formats = [file_formats.FORMATS[_json[file_formats.FileFormat._FORMAT]].
+        from_json(_json) for _json in json[cls._FORMATS]]
+
+    root = cls(formats,
+               [_FileRange.from_json(_) for _ in json[cls._INPUTS]],
+               json[cls._FILES_STREAMS])
+
+    return root
+
+  def next(self):
+    """Iterate over inputs."""
+    result = self._formats[-1].next()
+    self._formats[-1]._input_files_stream.checkpoint()
+    self._formats[-1].checkpoint()
+    return result
+
+
+class _FilesStream(object):
+  """Provide FileFormat with a stream of file-like objects as inputs.
+
+  Attributes:
+    current: the current file-like object to read from.
+  """
+
+  # Json Properties.
+  PREVIOUS_OFFSET = 'previous'
+  INDEX = 'index'
+
+  def __init__(self,
+               index,
+               file_format_root,
+               offset=0,
+               next_func=None):
+    """Init.
+
+    Args:
+      file_format_root: the FileFormatRoot this stream should talk to.
+      index: the index of this stream within the FileFormatRoot.
+      offset: the offset to start reading current file.
+      next_func: a function that gives back the next file from the stream.
+    """
+    self._next_file = next_func or file_format_root._formats[index-1].next
+    self._preprocess = file_format_root._formats[index].preprocess
+    # The offset at last checkpoint. Used to rewind.
+    self._previous_offset = offset
+    self._index = index
+    self._current = self._preprocess(self._next_file())
+    self._current.seek(offset)
+
+  def advance(self):
+    """Advance _current to the next file-like object.
+
+    The FileFormat should call this after it has consumed the current
+    file-like object.
+    """
+    self._previous_offset = 0
+    self._current.close()
+    self._current = self._preprocess(self._next_file())
+
+  @property
+  def current(self):
+    return self._current
+
+  def checkpoint(self):
+    self._previous_offset = self._current.tell()
+
+  def to_json(self):
+    return {self.PREVIOUS_OFFSET: self._previous_offset,
+            self.INDEX: self._index}
+
+  @classmethod
+  def from_json(cls, json, file_format_root):
+    return cls(json[cls.INDEX], file_format_root, json[cls.PREVIOUS_OFFSET])
+
+
+class _RootFilesStream(_FilesStream):
+  """Special FilesStream for the first FileFormat"""
+
+  PREVIOUS_INPUT_INDEX = 'input_index'
+
+  def __init__(self,
+               index,
+               file_format_root,
+               offset=0,
+               input_index=0):
+    """Init.
+
+    Args:
+      index: the index of this stream within the FileFormatRoot.
+      file_format_root: the FileFormatRoot this stream should talk to.
+      offset: the offset to start reading current file.
+      input_index: index of the next input file to read.
+    """
+    self.__inputs = file_format_root._inputs
+    self.__input_index = input_index
+    self.__previous_input_index = input_index
+    self.__file_format_root = file_format_root
+
+    super(_RootFilesStream, self).__init__(index,
+                                           file_format_root,
+                                           offset,
+                                           self.next_file)
+
+  def next_file(self):
+    if self.__input_index == len(self.__inputs):
+      raise StopIteration()
+    file_input = self.__inputs[self.__input_index]
+    if file_input.range:
+      first_format = self.__file_format_root._formats[0]
+      if not first_format.can_split():
+        raise ValueError('Input range specified for a non-splittable format %s'
+                         % first_format.NAME)
+      first_format._range = file_input.range
+    self.__previous_input_index = self.__input_index
+    self.__input_index += 1
+    return files.open(file_input.filename, 'r')
+
+  def to_json(self):
+    result = super(_RootFilesStream, self).to_json()
+    result[self.PREVIOUS_INPUT_INDEX] = self.__previous_input_index
+    return result
+
+  @classmethod
+  def from_json(cls, json, file_format_root):
+    return cls(json[cls.INDEX],
+               file_format_root,
+               json[cls.PREVIOUS_OFFSET],
+               json[cls.PREVIOUS_INPUT_INDEX])
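A small usage sketch for this module (the filenames below are placeholders): split() assigns whole files, or ranges within splittable formats, to at most the requested number of roots, and each root then iterates over the cascaded formats.

from mapreduce import file_format_root

# Split three Files API files into at most two roots, reading line by line.
roots = file_format_root.split(
    ['/blobstore/key1', '/blobstore/key2', '/blobstore/key3'],
    'lines',
    2)

for root in roots or []:     # split() returns None if all inputs are empty
    for chunk in root:       # FileFormatRoot is iterable
        line = chunk.read()  # each chunk is a file-like object yielding str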

mapreduce/file_formats.py

+#!/usr/bin/env python
+# Copyright 2012 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Define file formats."""
+
+
+
+__all__ = ['FileFormat',
+           'FORMATS']
+
+import StringIO
+import zipfile
+
+
+class FileFormat(object):
+  """FileFormat.
+
+  FileFormat knows how to operate on files of a specific format.
+  It should never be instantiated directly.
+
+  Life cycle of FileFormat:
+    1. Two ways that FileFormat is created: file_format_root.split creates
+       FileFormat from scratch. FileFormatRoot.from_json creates FileFormat
+       from serialized json str. Either way, it is associated with a
+       FileFormatRoot.
+    2. Root acts as a coordinator among FileFormats. Root initializes
+       its many fields so that FileFormat knows how to iterate over its inputs.
+    3. Its next() method is used to iterate. next() gets input from a
+       _FilesStream object associated with this FileFormat by the root.
+    4. It keeps iterating until either root calls its to_json() or root
+       sends it a StopIteration.
+
+  How to define a new format:
+    1. Subclass this.
+    2. Override NAME and ARGUMENTS. They are used during parsing.
+       See file_format_parser._FileFormatParser.
+    3. Optionally override preprocess(), which operates on
+       _input_files_stream.current before any next() is called. See method.
+    4. Override get_next(). Used by next() to fetch the next content to
+       return. See method.
+    5. Optionally override can_split() and split() if this format
+       supports them. See method.
+    6. Write unit tests. Tricky logic (to/from_json, advancing
+       _input_files_stream) is shared. Thus as long as you respect
+       get_next()'s pre/post conditions, tests are very simple.
+    7. Register your format at FORMATS.
+
+  Attributes:
+    ARGUMENTS: a list of acceptable arguments to this format. Used for parsing
+        this format.
+    NAME: the name of this format. Used for parsing this format.
+  """
+
+  ARGUMENTS = []
+  NAME = '_file'
+  # Default value for self._index.
+  DEFAULT_INDEX_VALUE = None
+
+  # Json Properties.
+  _KWARGS = 'kwargs'
+  _INDEX = 'index'
+  _RANGE = 'input_range'
+  _FORMAT = 'name'
+  _PREVIOUS_INDEX = 'previous_index'
+
+  def __init__(self, **kwargs):
+    for k in kwargs:
+      if k not in self.ARGUMENTS:
+        raise ValueError('Illegal argument %s' % k)
+    self._kwargs = kwargs
+
+    # A dict to save all the transient objects needed during iteration.
+    # If an object is expensive to initialize, put it here.
+    self._cache = {}
+
+    # These fields are directly set by FileFormatRoot.
+
+    # index of where to read _input_files_stream.current.
+    # This is NOT the actual offset into the file.
+    # It's interpreted by different _FileFormats differently.
+    # A format should override DEFAULT_INDEX_VALUE if it needs _index.
+    # See each FileFormat.DEFAULT_INDEX_VALUE for its semantic.
+    self._index = self.DEFAULT_INDEX_VALUE
+    self._previous_index = self.DEFAULT_INDEX_VALUE
+    # A _FilesStream object where _FileFormat should read inputs from.
+    self._input_files_stream = None
+    # A tuple [start_index, end_index) that, if defined, bounds
+    # self._index.
+    self._range = None
+
+  def __repr__(self):
+    return str(self.to_json())
+
+  def __str__(self):
+    result = self.NAME
+    sorted_keys = self._kwargs.keys()
+    sorted_keys.sort()
+
+    if self._kwargs:
+      result += (
+          '(' +
+          ','.join([key + '=' + self._kwargs[key] for key in sorted_keys]) +
+          ')')
+    return result
+
+  def checkpoint(self):
+    self._previous_index = self._index
+
+  def to_json(self):
+    return {self._KWARGS: self._kwargs,
+            self._INDEX: self._index,
+            self._RANGE: self._range,
+            self._FORMAT: self.NAME,
+            self._PREVIOUS_INDEX: self._previous_index}
+
+  @classmethod
+  def from_json(cls, json):
+    file_format = cls(**json[cls._KWARGS])
+    file_format._index = json[cls._PREVIOUS_INDEX]
+    file_format._previous_index = json[cls._PREVIOUS_INDEX]
+    file_format._range = json[cls._RANGE]
+    return file_format
+
+  @classmethod
+  def can_split(cls):
+    """Does this format support split.
+
+    Return True if a FileFormat allows its inputs to be splitted into
+    different shards. Must implement split method.
+    """
+    return False
+
+  @classmethod
+  # pylint: disable-msg=W0613
+  def split(cls, desired_size, start_index, input_file, cache):
+    """Split a single chunk of desired_size from file.
+
+    FileFormatRoot uses this method to ask FileFormat how to split
+    one file of its format.
+
+    This method takes an opened file and a start_index. If file
+    size is bigger than desired_size, the method determines a chunk of the
+    file whose size is close to desired_size. The chunk is indicated by
+    [start_index, end_index). If the file is smaller than desired_size,
+    the chunk will include the rest of the input_file.
+
+    This method also indicates how many bytes are consumed by this chunk
+    by returning size_left to the caller.
+
+    Args:
+      desired_size: desired number of bytes for this split. Positive int.
+      start_index: the index to start this split. The index is not necessarily
+        an offset. In zipfile, for example, it's the index of the member file
+        in the archive. Non-negative int.
+      input_file: opened Files API file to split. Do not close this file.
+      cache: a dict to cache any object over multiple calls if needed.
+
+    Returns:
+      Return a tuple of (size_left, end_index). If end_index equals start_index,
+      the file is fully split.
+    """
+    raise NotImplementedError('split is not implemented for %s.' %
+                              cls.__name__)
+
+  def __iter__(self):
+    return self
+
+  def preprocess(self, file_object):
+    """Does preprocessing on the file-like object and return another one.
+
+    Normally a FileFormat directly reads from the original
+    _input_files_stream.current, which is a File-like object containing str.
+    But some FileFormats need to decode the entire str before any iteration
+    is possible (e.g. lines). This method takes the original
+    _input_files_stream.current and returns another File-like object
+    that replaces the original one.
+
+    Args:
+      file_object: read from this object and process its content.
+
+    Returns:
+      a file-like object containing processed contents. If the returned object
+      is newly created, close the old one.
+    """
+    return file_object
+
+  def next(self):
+    """Return a file-like object containing next content."""
+    try:
+      # Limit _index by _range.
+      if self.DEFAULT_INDEX_VALUE is not None and self._range:
+        if self._index < self._range[0]:
+          self._index = self._range[0]
+        elif self._index >= self._range[1]:
+          raise EOFError()
+
+      self._input_files_stream.checkpoint()
+      self.checkpoint()
+      result = self.get_next()
+      if isinstance(result, str):
+        result = StringIO.StringIO(result)
+      if isinstance(result, unicode):
+        raise ValueError('%s can not return unicode object.' %
+                         self.__class__.__name__)
+      return result
+    except EOFError:
+      self._input_files_stream.advance()
+      self._index = self.DEFAULT_INDEX_VALUE
+      self._cache = {}
+      return self.next()
+
+  def get_next(self):
+    """Find the next content to return.
+
+    Expected steps of any implementation:
+      1. Read input from _input_files_stream.current. It is guaranteed
+         to be a file-like object ready to be read from. It returns
+         Python str.
+      2. If nothing is read, raise EOFError. Otherwise, process the
+         content that was read. _kwargs is guaranteed to be a dict
+         containing all arguments and values passed to this format.
+      3. If this format needs _index to keep track of where in
+         _input_files_stream.current to read next, the format is
+         responsible for updating _index correctly in each iteration.
+         _index and _input_files_stream.current are guaranteed to be
+         correctly (de)serialized. Thus the implementation doesn't need
+         to worry about (de)serialization at all.
+      4. Return the processed contents either as a file-like object or
+         Python str. NO UNICODE.
+
+    Returns:
+      The content as a str or file-like object, if there is any to return.
+
+    Raises:
+      EOFError if no content is found to return.
+    """
+    raise NotImplementedError('%s not implemented.' % self.__class__.__name__)
+
+
+# Binary formats.
+class _BinaryFormat(FileFormat):
+  """Base class for any binary formats.
+
+  This class just reads the entire file as raw str. All subclasses
+  should simply override NAME. That NAME will be passed to Python
+  to decode the bytes.
+  """
+
+  NAME = 'bytes'
+
+  def get_next(self):
+    result = self._input_files_stream.current.read()
+    if not result:
+      raise EOFError()
+    if self.NAME != _BinaryFormat.NAME:
+      return result.decode(self.NAME)
+    return result
+
+
+class _Base64Format(_BinaryFormat):
+  """Read entire file as base64 str."""
+
+  NAME = 'base64'
+
+
+# Archive formats.
+class _ZipFormat(FileFormat):
+  """Read member files of zipfile."""
+
+  NAME = 'zip'
+  # _index specifies the next member file to read.
+  DEFAULT_INDEX_VALUE = 0
+
+  def get_next(self):
+    if self._cache:
+      zip_file = self._cache['zip_file']
+      infolist = self._cache['infolist']
+    else:
+      zip_file = zipfile.ZipFile(self._input_files_stream.current)
+      infolist = zip_file.infolist()
+      self._cache['zip_file'] = zip_file
+      self._cache['infolist'] = infolist
+
+    if self._index == len(infolist):
+      raise EOFError()
+
+    result = zip_file.read(infolist[self._index])
+    self._index += 1
+    return result
+
+  @classmethod
+  def can_split(cls):
+    return True
+
+  @classmethod
+  def split(cls, desired_size, start_index, opened_file, cache):
+    if 'infolist' in cache:
+      infolist = cache['infolist']
+    else:
+      zip_file = zipfile.ZipFile(opened_file)
+      infolist = zip_file.infolist()
+      cache['infolist'] = infolist
+
+    index = start_index
+    while desired_size > 0 and index < len(infolist):
+      desired_size -= infolist[index].file_size
+      index += 1
+    return desired_size, index
+
+
+# Text formats.
+class _TextFormat(FileFormat):
+  """Base class for any text format.
+
+  Text formats are those that require decoding before iteration.
+  This class takes care of the preprocessing logic of decoding.
+  """
+
+  ARGUMENTS = ['encoding']
+  NAME = '_text'
+
+  def preprocess(self, file_object):
+    """Decode the entire file to read text."""
+    if 'encoding' in self._kwargs:
+      content = file_object.read()
+      content = content.decode(self._kwargs['encoding'])
+      file_object.close()
+      return StringIO.StringIO(content)
+    return file_object
+
+
+class _LinesFormat(_TextFormat):
+  """Read file line by line."""
+
+  NAME = 'lines'
+
+  def get_next(self):
+    result = self._input_files_stream.current.readline()
+    if not result:
+      raise EOFError()
+    if result and 'encoding' in self._kwargs:
+      result = result.encode(self._kwargs['encoding'])
+    return result
+
+
+class _CSVFormat(_TextFormat):
+  ARGUMENTS = _TextFormat.ARGUMENTS + ['delimiter']
+  NAME = 'csv'
+  # TODO(user) implement this. csv exists now only to test parser.
+
+
+FORMATS = {
+    # Binary formats.
+    'base64': _Base64Format,
+    'bytes': _BinaryFormat,
+    # Text format.
+    'csv': _CSVFormat,
+    'lines': _LinesFormat,
+    # Archive formats.
+    'zip': _ZipFormat}
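As an illustration of the "How to define a new format" steps in FileFormat's docstring, a hypothetical subclass (not part of the library) that returns each file's content upper-cased could be added to this module like so:

class _UpperFormat(FileFormat):
  """Hypothetical format: return each file's content upper-cased."""

  NAME = 'upper'

  def get_next(self):
    result = self._input_files_stream.current.read()
    if not result:
      raise EOFError()
    return result.upper()


# Step 7 of the docstring: register it so format strings like 'zip[upper]'
# can refer to it.
FORMATS['upper'] = _UpperFormat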

mapreduce/handlers.py

 #!/usr/bin/env python
-#
 # Copyright 2010 Google Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 from mapreduce import quota
 from mapreduce import util
 
+try:
+  from google.appengine.ext import ndb
+except ImportError:
+  ndb = None
+
 
 # TODO(user): Make this a product of the reader or in quotas.py
 _QUOTA_BATCH_SIZE = 20
 # Delay between consecutive controller callback invocations.
 _CONTROLLER_PERIOD_SEC = 2
 
+# How many times to cope with a RetrySliceError before totally
+# giving up and aborting the whole job.
+_RETRY_SLICE_ERROR_MAX_RETRIES = 10
+
 # Set of strings of various test-injected faults.
 _TEST_INJECTED_FAULTS = set()
 
 
-class Error(Exception):
-  """Base class for exceptions in this module."""
-
-
-class NotEnoughArgumentsError(Error):
-  """Required argument is missing."""
-
-
-class NoDataError(Error):
-  """There is no data present for a desired input."""
-
-
 def _run_task_hook(hooks, method, task, queue_name):
   """Invokes hooks.method(task, queue_name).
 
     if control and control.command == model.MapreduceControl.ABORT:
       logging.info("Abort command received by shard %d of job '%s'",
                    shard_state.shard_number, shard_state.mapreduce_id)
-      if tstate.output_writer:
-        tstate.output_writer.finalize(ctx, shard_state.shard_number)
-      # We recieved a command to abort. We don't care if we override
-      # some data.
+      # NOTE: When aborting, specifically do not finalize the output writer
+      # because it might be in a bad state.
       shard_state.active = False
       shard_state.result_status = model.ShardState.RESULT_ABORTED
       shard_state.put(config=util.create_datastore_write_config(spec))
     else:
       quota_consumer = None
 
+    # Tell NDB to never cache anything in memcache or in-process. This ensures
+    # that entities fetched from Datastore input_readers via NDB will not bloat
+    # up the request memory size and Datastore Puts will avoid doing calls
+    # to memcache. Without this you get soft memory limit exits, which hurts
+    # overall throughput.
+    if ndb is not None:
+      ndb_ctx = ndb.get_context()
+      ndb_ctx.set_cache_policy(lambda key: False)
+      ndb_ctx.set_memcache_policy(lambda key: False)
+
     context.Context._set(ctx)
+
     try:
-      # consume quota ahead, because we do not want to run a datastore
-      # query if there's not enough quota for the shard.
-      if not quota_consumer or quota_consumer.check():
-        scan_aborted = False
-        entity = None
-
-        # We shouldn't fetch an entity from the reader if there's not enough
-        # quota to process it. Perform all quota checks proactively.
-        if not quota_consumer or quota_consumer.consume():
-          for entity in input_reader:
-            if isinstance(entity, db.Model):
-              shard_state.last_work_item = repr(entity.key())
-            else:
-              shard_state.last_work_item = repr(entity)[:100]
-
-            scan_aborted = not self.process_data(
-                entity, input_reader, ctx, tstate)
-
-            # Check if we've got enough quota for the next entity.
-            if (quota_consumer and not scan_aborted and
-                not quota_consumer.consume()):
-              scan_aborted = True
-            if scan_aborted:
-              break
-        else:
-          scan_aborted = True
-
-
-        if not scan_aborted:
-          logging.info("Processing done for shard %d of job '%s'",
-                       shard_state.shard_number, shard_state.mapreduce_id)
-          # We consumed extra quota item at the end of for loop.
-          # Just be nice here and give it back :)
-          if quota_consumer:
-            quota_consumer.put(1)
-          shard_state.active = False
-          shard_state.result_status = model.ShardState.RESULT_SUCCESS
-
-      operation.counters.Increment(
-          context.COUNTER_MAPPER_WALLTIME_MS,
-          int((time.time() - self._start_time)*1000))(ctx)
-
-      # TODO(user): Mike said we don't want this happen in case of
-      # exception while scanning. Figure out when it's appropriate to skip.
-      ctx.flush()
+      self.process_inputs(
+          input_reader, shard_state, tstate, quota_consumer, ctx)
 
       if not shard_state.active:
-        # shard is going to stop. Finalize output writer if any.
-        if tstate.output_writer:
+        # shard is going to stop. Finalize output writer only when shard is
+        # successful because writer might be stuck in some bad state otherwise.
+        if (shard_state.result_status == model.ShardState.RESULT_SUCCESS and
+            tstate.output_writer):
           tstate.output_writer.finalize(ctx, shard_state.shard_number)
 
       config = util.create_datastore_write_config(spec)
       def tx():
         fresh_shard_state = db.get(
             model.ShardState.get_key_by_shard_id(shard_id))
+        if not fresh_shard_state:
+          raise db.Rollback()
         if (not fresh_shard_state.active or
             "worker_active_state_collision" in _TEST_INJECTED_FAULTS):
           shard_state.active = False
       self.reschedule(shard_state, tstate)
     gc.collect()
 
+  def process_inputs(self,
+                     input_reader,
+                     shard_state,
+                     transient_shard_state,
+                     quota_consumer,
+                     ctx):
+    """Read inputs, process them, and write out outputs.
+
+    This is the core logic of MapReduce. It reads inputs from input reader,
+    invokes user specified mapper function, and writes output with
+    output writer. It also updates shard_state accordingly.
+    e.g. if shard processing is done, set shard_state.active to False.
+
+    If errors.FailJobError is caught, it will fail this MR job.
+    All other exceptions will be logged and raised to taskqueue for retry
+    until the number of retries exceeds a limit.
+
+    Args:
+      input_reader: input reader.
+      shard_state: shard state.
+      transient_shard_state: transient shard state.
+      quota_consumer: quota consumer to limit processing rate.
+      ctx: mapreduce context.
+    """
+    # We shouldn't fetch an entity from the reader if there's not enough
+    # quota to process it. Perform all quota checks proactively.
+    if not quota_consumer or quota_consumer.consume():
+      finished_shard = True
+      try:
+        for entity in input_reader:
+          if isinstance(entity, db.Model):
+            shard_state.last_work_item = repr(entity.key())
+          else:
+            shard_state.last_work_item = repr(entity)[:100]
+
+          if not self.process_data(
+              entity, input_reader, ctx, transient_shard_state):
+            finished_shard = False
+            break
+          elif quota_consumer and not quota_consumer.consume():
+            # not enough quota to keep processing.
+            finished_shard = False
+            break
+
+        # Flush context and its pools.
+        operation.counters.Increment(
+            context.COUNTER_MAPPER_WALLTIME_MS,
+            int((time.time() - self._start_time)*1000))(ctx)
+        ctx.flush()
+
+      except errors.FailJobError, e:
+        logging.error("Job failed: %s", e)
+        shard_state.active = False
+        shard_state.result_status = model.ShardState.RESULT_FAILED
+        return
+      # Any other exception during this data processing period should
+      # be handled by mapreduce's retry instead of taskqueue's.
+      except Exception, e:
+        logging.error("Slice error: %s", e)
+        retry_count = int(
+            os.environ.get("HTTP_X_APPENGINE_TASKRETRYCOUNT") or 0)
+        if retry_count <= _RETRY_SLICE_ERROR_MAX_RETRIES:
+          raise
+        logging.error("Too many retries: %d, failing the job", retry_count)
+        shard_state.active = False
+        shard_state.result_status = model.ShardState.RESULT_FAILED
+        return
+
+      # Will not reach this point if any exception happened.
+      if finished_shard:
+        logging.info("Processing done for shard %d of job '%s'",
+                     shard_state.shard_number, shard_state.mapreduce_id)
+        # We consumed extra quota item at the end of the for loop.
+        # Just be nice here and give it back :)
+        if quota_consumer:
+          quota_consumer.put(1)
+        shard_state.active = False
+        shard_state.result_status = model.ShardState.RESULT_SUCCESS
+
   def process_data(self, data, input_reader, ctx, transient_shard_state):
     """Process a single data piece.
 
     Args:
       data: a datum to process.
       input_reader: input reader.
-      ctx: current execution context.
+      ctx: mapreduce context
+      transient_shard_state: transient shard state.
 
     Returns:
       True if scan should be continued, False if scan should be aborted.
               output_writer.write(output, ctx)
 
     if self._time() - self._start_time > _SLICE_DURATION_SEC:
-      logging.debug("Spent %s seconds. Rescheduling",
-                    self._time() - self._start_time)
       return False
     return True
 
     spec = model.MapreduceSpec.from_json_str(
         self.request.get("mapreduce_spec"))
 
-    # TODO(user): Make this logging prettier.
-    logging.debug("post: id=%s headers=%s spec=%s",
-                  spec.mapreduce_id, self.request.headers,
-                  self.request.get("mapreduce_spec"))
-
     state, control = db.get([
         model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
         model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
       state.active_shards = len(active_shards)
       state.failed_shards = len(failed_shards)
       state.aborted_shards = len(aborted_shards)
+      if not control and failed_shards:
+        model.MapreduceControl.abort(spec.mapreduce_id)
 
     if (not state.active and control and
         control.command == model.MapreduceControl.ABORT):
       base_path: handler base path.
     """
     config = util.create_datastore_write_config(mapreduce_spec)
+
+    # Only finalize the output writers if the job is successful.
+    if (mapreduce_spec.mapper.output_writer_class() and
+        mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
+      mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
+
     # Enqueue done_callback if needed.
-    if mapreduce_spec.mapper.output_writer_class():
-      mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
     def put_state(state):
       state.put(config=config)
       done_callback = mapreduce_spec.params.get(
       parameter value
 
     Raises:
-      NotEnoughArgumentsError: if parameter is not specified.
+      errors.NotEnoughArgumentsError: if parameter is not specified.
     """
     value = self.request.get(param_name)
     if not value:
-      raise NotEnoughArgumentsError(param_name + " not specified")
+      raise errors.NotEnoughArgumentsError(param_name + " not specified")
     return value
 
   @classmethod
       parameter value
 
     Raises:
-      NotEnoughArgumentsError: if parameter is not specified.
+      errors.NotEnoughArgumentsError: if parameter is not specified.
     """
     value = self.request.get(param_name)
     if not value:
-      raise NotEnoughArgumentsError(param_name + " not specified")
+      raise errors.NotEnoughArgumentsError(param_name + " not specified")
     return value
 
   @classmethod
       raise Exception("Parent shouldn't be specfied "
                       "for non-transactional starts.")
 
-    # Check that handler can be instantiated.
-    mapper_spec.get_handler()
-
     # Check that reader can be instantiated and is configured correctly
     mapper_input_reader_class = mapper_spec.input_reader_class()
     mapper_input_reader_class.validate(mapper_spec)
         mapreduce_params,
         hooks_class_name)
 
+    # Check that handler can be instantiated.
+    ctx = context.Context(mapreduce_spec, None)
+    context.Context._set(ctx)
+    try:
+      mapper_spec.get_handler()
+    finally:
+      context.Context._set(None)
+
     kickoff_params = {"mapreduce_spec": mapreduce_spec.to_json_str()}
     if _app:
       kickoff_params["app"] = _app

mapreduce/input_readers.py

     "DatastoreEntityInputReader",
     "DatastoreInputReader",
     "DatastoreKeyInputReader",
+    "FileInputReader",
+    "RandomStringInputReader",
     "Error",
     "InputReader",
+    "LogInputReader",
     "NamespaceInputReader",
     "RecordsReader",
     ]
 
 # pylint: disable-msg=C6409
 
+import base64
 import copy
+import logging
+import random
+import string
 import StringIO
 import time
 import zipfile
 
+from google.net.proto import ProtocolBuffer
 from google.appengine.api import datastore
 from mapreduce.lib import files
+from google.appengine.api import logservice
 from mapreduce.lib.files import records
+from google.appengine.api.logservice import log_service_pb
 from google.appengine.datastore import datastore_query
 from google.appengine.datastore import datastore_rpc
 from google.appengine.ext import blobstore
 from google.appengine.ext.db import metadata
 from mapreduce import context
 from mapreduce import errors
+from mapreduce import file_format_parser
+from mapreduce import file_format_root
 from mapreduce import model
 from mapreduce import namespace_range
 from mapreduce import operation
   def validate(cls, mapper_spec):
     """Validates mapper spec and all mapper parameters.
 
+    Input reader parameters are expected to be passed in the "input_reader"
+    subdictionary of mapper_spec.params. To be compatible with the previous
+    API, input readers are advised to check mapper_spec.params and issue
+    a warning if the "input_reader" subdictionary is not present.
+    The _get_params helper method can be used to simplify implementation.
+
     Args:
       mapper_spec: The MapperSpec for this InputReader.
 
     raise NotImplementedError("validate() not implemented in %s" % cls)
 
 
+def _get_params(mapper_spec, allowed_keys=None):
+  """Obtain input reader parameters.
+
+  Utility function for input reader implementations. Fetches parameters
+  from the mapreduce specification, issuing appropriate usage warnings.
+
+  Args:
+    mapper_spec: The MapperSpec for the job
+    allowed_keys: set of all allowed keys in parameters as strings. If it is not
+      None, then parameters are expected to be in a separate "input_reader"
+      subdictionary of mapper_spec parameters.
+
+  Returns:
+    mapper parameters as dict
+
+  Raises:
+    BadReaderParamsError: if parameters are invalid/missing or not allowed.
+  """
+  if "input_reader" not in mapper_spec.params:
+    message = ("Input reader's parameters should be specified in "
+        "input_reader subdictionary.")
+    if allowed_keys:
+      raise errors.BadReaderParamsError(message)
+    else:
+      logging.warning(message)
+    params = mapper_spec.params
+    params = dict((str(n), v) for n, v in params.iteritems())
+  else:
+    if not isinstance(mapper_spec.params.get("input_reader"), dict):
+      raise BadReaderParamsError(
+          "Input reader parameters should be a dictionary")
+    params = mapper_spec.params.get("input_reader")
+    params = dict((str(n), v) for n, v in params.iteritems())
+    if allowed_keys:
+      params_diff = set(params.keys()) - allowed_keys
+      if params_diff:
+        raise errors.BadReaderParamsError(
+            "Invalid input_reader parameters: %s" % ",".join(params_diff))
+  return params
+
+
+class FileInputReader(InputReader):
+
+  # Reader Parameters
+  FILES_PARAM = "files"
+  FORMAT_PARAM = "format"
+
+  def __init__(self, format_root):
+    """Initialize input reader.
+
+    Args:
+      format_root: a FileFormatRoot instance.
+    """
+    self._file_format_root = format_root
+
+  def __iter__(self):
+    """Inherit docs."""
+    return self
+
+  def next(self):
+    """Inherit docs."""
+    ctx = context.get()
+    start_time = time.time()
+
+    content = self._file_format_root.next().read()
+
+    if ctx:
+      operation.counters.Increment(
+          COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
+      operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
+
+    return content
+
+  @classmethod
+  def split_input(cls, mapper_spec):
+    """Inherit docs."""
+    params = _get_params(mapper_spec)
+
+    # Expand potential file patterns to a list of filenames.
+    filenames = []
+    for f in params[cls.FILES_PARAM]:
+      parsedName = files.gs.parseGlob(f)
+      if isinstance(parsedName, tuple):
+        filenames.extend(files.gs.listdir(parsedName[0],
+                                          {"prefix": parsedName[1]}))
+      else:
+        filenames.append(parsedName)
+
+    file_format_roots = file_format_root.split(filenames,
+                                               params[cls.FORMAT_PARAM],
+                                               mapper_spec.shard_count)
+
+    return [cls(root) for root in file_format_roots]
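+
+  # Illustrative example (hypothetical paths): a "files" entry such as
+  # "/gs/my-bucket/logs-*" is treated as a glob by files.gs.parseGlob() and
+  # expanded with files.gs.listdir(), while a non-glob name like
+  # "/gs/my-bucket/logs-001" is appended as a single filename before the
+  # format roots are split across mapper_spec.shard_count shards.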
+
+  @classmethod
+  def validate(cls, mapper_spec):
+    """Inherit docs."""
+    if mapper_spec.input_reader_class() != cls:
+      raise BadReaderParamsError("Mapper input reader class mismatch")
+
+    # Check parameters.
+    params = _get_params(mapper_spec)
+    if cls.FILES_PARAM not in params:
+      raise BadReaderParamsError("Must specify %s" % cls.FILES_PARAM)
+    if cls.FORMAT_PARAM not in params:
+      raise BadReaderParamsError("Must specify %s" % cls.FORMAT_PARAM)
+
+    format_string = params[cls.FORMAT_PARAM]
+    if not isinstance(format_string, basestring):
+      raise BadReaderParamsError("format should be string but is %s" %
+                                 cls.FORMAT_PARAM)
+    try:
+      file_format_parser.parse(format_string)
+    except ValueError, e:
+      raise BadReaderParamsError(e)
+
+    paths = params[cls.FILES_PARAM]
+    if not (paths and isinstance(paths, list)):
+      raise BadReaderParamsError("files should be a list of filenames.")
+
+    # Further validations are done by parseGlob().
+    try:
+      for path in paths:
+        files.gs.parseGlob(path)
+    except files.InvalidFileNameError:
+      raise BadReaderParamsError("Invalid filename %s." % path)
+
+  @classmethod
+  def from_json(cls, json):
+    """Inherit docs."""
+    return cls(
+        file_format_root.FileFormatRoot.from_json(json["file_format_root"]))
+
+  def to_json(self):
+    """Inherit docs."""
+    return {"file_format_root": self._file_format_root.to_json()}
+
+
 # TODO(user): This should probably be renamed something like
 # "DatastoreInputReader" and DatastoreInputReader should be called
 # "DatastoreModelReader".
   KEY_RANGE_PARAM = "key_range"
   NAMESPACE_RANGE_PARAM = "namespace_range"
   CURRENT_KEY_RANGE_PARAM = "current_key_range"
+  FILTERS_PARAM = "filters"
 
   # TODO(user): Add support for arbitrary queries. It's not possible to
   # support them without cursors since right now you can't even serialize query
                key_ranges=None,
                ns_range=None,
                batch_size=_BATCH_SIZE,
-               current_key_range=None):
+               current_key_range=None,
+               filters=None):
     """Create new AbstractDatastoreInputReader object.
 
     This is internal constructor. Use split_query in a concrete class instead.
           key_ranges or ns_range can be non-None.
       batch_size: size of read batch as int.
       current_key_range: the current key_range.KeyRange being processed.
+      filters: optional list of filters to apply to the query. Each filter is
+        a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
+        User filters are applied first.
     """
     assert key_ranges is not None or ns_range is not None, (
         "must specify one of 'key_ranges' or 'ns_range'")
     self._ns_range = ns_range
     self._batch_size = int(batch_size)
     self._current_key_range = current_key_range
+    self._filters = filters
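+    # Illustrative example (hypothetical values): filters=[("author", "=",
+    # "bob")] restricts each underlying query to entities whose "author"
+    # property equals "bob"; validate() only accepts equality filters.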
+
+  @classmethod
+  def _get_raw_entity_kind(cls, entity_kind):
+    if "." in entity_kind:
+      logging.warning(
+          ". detected in entity kind %s specified for reader %s."
+          "Assuming entity kind contains the dot.",
+          entity_kind, cls.__name__)
+    return entity_kind
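+
+  # Illustrative note: for a dotted kind such as "models.Graph" (hypothetical
+  # name), this base implementation uses the string verbatim as the datastore
+  # kind; DatastoreInputReader overrides _get_raw_entity_kind() below to
+  # resolve the model class and use its kind() instead.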
 
   def __iter__(self):
     """Iterates over the given KeyRanges or NamespaceRange.
   # TODO(user): use query splitting functionality when it becomes available
   # instead.
   @classmethod
-  def _split_input_from_namespace(cls, app, namespace, entity_kind_name,
+  def _split_input_from_namespace(cls, app, namespace, entity_kind,
                                   shard_count):
     """Return KeyRange objects. Helper for _split_input_from_params.
 
     end.
     """
 
-    raw_entity_kind = util.get_short_name(entity_kind_name)
-
+    raw_entity_kind = cls._get_raw_entity_kind(entity_kind)
     if shard_count == 1:
       # With one shard we don't need to calculate any splitpoints at all.
       return [key_range.KeyRange(namespace=namespace, _app=app)]
 
-    # we use datastore.Query instead of ext.db.Query here, because we can't
-    # erase ordering on db.Query once we set it.
     ds_query = datastore.Query(kind=raw_entity_kind,
                                namespace=namespace,
                                _app=app,
     """
     if mapper_spec.input_reader_class() != cls:
       raise BadReaderParamsError("Input reader class mismatch")
-    params = mapper_spec.params
+    params = _get_params(mapper_spec)
     if cls.ENTITY_KIND_PARAM not in params:
       raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
     if cls.BATCH_SIZE_PARAM in params:
             "Expected a single namespace string")
     if cls.NAMESPACES_PARAM in params:
       raise BadReaderParamsError("Multiple namespaces are no longer supported")
+    if cls.FILTERS_PARAM in params:
+      filters = params[cls.FILTERS_PARAM]
+      if not isinstance(filters, list):
+        raise BadReaderParamsError("Expected list for filters parameter")
+      for f in filters:
+        if not isinstance(f, (tuple, list)):
+          raise BadReaderParamsError(
+              "Filter should be a tuple or list: %s" % (f,))
+        if len(f) != 3:
+          raise BadReaderParamsError("Filter should be a 3-tuple: %s" % (f,))
+        if not isinstance(f[0], basestring):
+          raise BadReaderParamsError(
+              "First element should be string: %s" % (f,))
+        if f[1] != "=":
+          raise BadReaderParamsError(
+              "Only equality filters are supported: %s" % (f,))
 
   @classmethod
   def split_input(cls, mapper_spec):
       equal to number_of_shards but may be padded with Nones if there are too
       few results for effective sharding.
     """
-    params = mapper_spec.params
+    params = _get_params(mapper_spec)
     entity_kind_name = params[cls.ENTITY_KIND_PARAM]
     batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
     shard_count = mapper_spec.shard_count
     namespace = params.get(cls.NAMESPACE_PARAM)
     app = params.get(cls._APP_PARAM)
+    filters = params.get(cls.FILTERS_PARAM)
 
     if namespace is None:
       # It is difficult to efficiently shard large numbers of namespaces because
         return [cls(entity_kind_name,
                     key_ranges=None,
                     ns_range=ns_range,
-                    batch_size=batch_size)
+                    batch_size=batch_size,
+                    filters=filters)
                 for ns_range in ns_ranges]
       elif not namespace_keys:
         return [cls(entity_kind_name,
                     key_ranges=None,
                     ns_range=namespace_range.NamespaceRange(),
-                    batch_size=shard_count)]
+                    batch_size=shard_count,
+                    filters=filters)]
       else:
         namespaces = [namespace_key.name() or ""
                       for namespace_key in namespace_keys]
     else:
       namespaces = [namespace]
 
-    return cls._split_input_from_params(
+    readers = cls._split_input_from_params(
         app, namespaces, entity_kind_name, params, shard_count)
+    if filters:
+      for reader in readers:
+        reader._filters = filters
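+    # Note: readers built from namespace ranges receive their filters through
+    # the constructor above; readers produced by _split_input_from_params()
+    # get _filters assigned in the loop above instead.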
+    return readers
 
   def to_json(self):
     """Serializes all the data in this query range into json form.
                  self.NAMESPACE_RANGE_PARAM: namespace_range_json,
                  self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
                  self.ENTITY_KIND_PARAM: self._entity_kind,
-                 self.BATCH_SIZE_PARAM: self._batch_size}
+                 self.BATCH_SIZE_PARAM: self._batch_size,
+                 self.FILTERS_PARAM: self._filters}
     return json_dict
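+
+  # Note: FILTERS_PARAM is included so that filters survive shard
+  # serialization; from_json() below restores them when shard state is
+  # rebuilt.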
 
   @classmethod
         key_ranges,
         ns_range,
         json[cls.BATCH_SIZE_PARAM],
-        current_key_range)
+        current_key_range,
+        filters=json.get(cls.FILTERS_PARAM))
 
 
 class DatastoreInputReader(AbstractDatastoreInputReader):
     cursor = None
     while True:
       query = k_range.make_ascending_query(
-          util.for_name(self._entity_kind))
-      if cursor:
-        query.with_cursor(cursor)
+          util.for_name(self._entity_kind),
+          filters=self._filters)
+      if isinstance(query, db.Query):
+        # Old db version.
+        if cursor:
+          query.with_cursor(cursor)
 
-      results = query.fetch(limit=self._batch_size)
-      if not results:
-        break
+        results = query.fetch(limit=self._batch_size)
+        if not results:
+          break
 
-      for model_instance in results:
-        key = model_instance.key()
-        yield key, model_instance
-      cursor = query.cursor()
+        for model_instance in results:
+          key = model_instance.key()
+          yield key, model_instance
+        cursor = query.cursor()
+      else:
+        # NDB version using fetch_page().
+        results, cursor, more = query.fetch_page(self._batch_size,
+                                                 start_cursor=cursor)
+        for model_instance in results:
+          key = model_instance.key
+          yield key, model_instance
+        if not more:
+          break
 
   @classmethod
   def validate(cls, mapper_spec):
       BadReaderParamsError: required parameters are missing or invalid.
     """
     super(DatastoreInputReader, cls).validate(mapper_spec)
-    params = mapper_spec.params
+    params = _get_params(mapper_spec)
     keys_only = util.parse_bool(params.get(cls.KEYS_ONLY_PARAM, False))
     if keys_only:
       raise BadReaderParamsError("The keys_only parameter is obsolete. "
     except ImportError, e:
       raise BadReaderParamsError("Bad entity kind: %s" % e)
 
+  @classmethod
+  def _get_raw_entity_kind(cls, entity_kind):
+    """Returns an entity kind to use with datastore calls."""
+    entity_type = util.for_name(entity_kind)
+    if isinstance(entity_type, type) and issubclass(entity_type, db.Model):
+      return entity_type.kind()
+    else:
+      return util.get_short_name(entity_kind)
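+
+  # Illustrative example (hypothetical model): for entity_kind
+  # "myapp.models.Graph", util.for_name() resolves the Graph model class and
+  # its kind() (typically the bare class name "Graph") is used as the raw
+  # kind when building the splitting datastore.Query.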
+
 
 class DatastoreKeyInputReader(AbstractDatastoreInputReader):
   """An input reader which takes a Kind and yields Keys for that kind."""
 
   def _iter_key_range(self, k_range):
-    raw_entity_kind = util.get_short_name(self._entity_kind)
+    raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
     query = k_range.make_ascending_datastore_query(
-        raw_entity_kind, keys_only=True)
+        raw_entity_kind, keys_only=True, filters=self._filters)
     for key in query.Run(
         config=datastore_query.QueryOptions(batch_size=self._batch_size)):
       yield key, key
   """An input reader which yields low level datastore entities for a kind."""
 
   def _iter_key_range(self, k_range):
-    raw_entity_kind = util.get_short_name(self._entity_kind)
+    raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
     query = k_range.make_ascending_datastore_query(
-        raw_entity_kind)
+        raw_entity_kind, filters=self._filters)
     for entity in query.Run(
         config=datastore_query.QueryOptions(batch_size=self._batch_size)):
       yield entity.key(), entity
     """
     if mapper_spec.input_reader_class() != cls:
       raise BadReaderParamsError("Mapper input reader class mismatch")
-    params = mapper_spec.params
+    params = _get_params(mapper_spec)
     if cls.BLOB_KEYS_PARAM not in params:
       raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
     blob_keys = params[cls.BLOB_KEYS_PARAM]
     Returns:
       A list of BlobstoreInputReaders corresponding to the specified shards.
     """
-    params = mapper_spec.params
+    params = _get_params(mapper_spec)
     blob_keys = params[cls.BLOB_KEYS_PARAM]
     if isinstance(blob_keys, basestring):
       # This is a mechanism to allow multiple blob keys (which do not contain