chango / inferno

Commits

t1m committed 7ac3801

added --process-map CLI option to run a rule on a job with map results

  • Parent commits 4bf304b
  • Branches default

Comments (0)
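In short, the new flag re-runs a rule against the map output of an earlier job: the map phase is skipped and the finished job's map results become the new job's inputs. As a rough illustration of how the supplied identifier gets taken apart, here is the split() logic from the diff below applied to a made-up value; it assumes the module.job_id convention from the existing --process-results help text, with the disco job id in its usual rule_name@timestamp shape:

    # Hypothetical identifier in module.job_id form (not taken from the repository).
    value = "my_rules.count_words@549:1b19e:d0291"

    rule_name = value.split('@')[0]   # "my_rules.count_words" -> looked up with get_rules_by_name()
    job_name = value.split('.')[1]    # "count_words@549:1b19e:d0291" -> the finished disco job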

Files changed (2)

File inferno/bin/run.py

         default=None,
         help="given a module.job_id, just run the result processor")
 
+    parser.add_argument(
+        "--process-map",
+        dest="process_map",
+        default=None,
+        help="resume a job using the mapresults of supplied module.job_id")
+
     options = parser.parse_args(argv)
 
     if options.source_tags:
         try:
             rule_name = options['process_results'].split('@')[0]
             job_name = options['process_results'].split('.')[1]
-            print '-->', rule_name, rules_dir
             rule = get_rules_by_name(rule_name, rules_dir, immediate=True)[0]
             job = InfernoJob(rule, settings)
             status, results = job.disco.results(job_name)
             log.error(trace)
             log.error("Error processing results for job: %s %s" % (options['process_results'], e))
             raise e
+    elif options['process_map']:
+        rules_dir = options.get('rules_directory')
+        if not rules_dir:
+            rules_dir = settings.get('rules_directory')
+        try:
+            rule_name = options['process_map'].split('@')[0]
+            job_name = options['process_map'].split('.')[1]
+            rule = get_rules_by_name(rule_name, rules_dir, immediate=True)[0]
+            rule.map_function = None
+            rule.source_tags = []
+            disco, ddfs = get_disco_handle(settings.get('server'))
+            rule.source_urls = disco.mapresults(job_name)
+            job = InfernoJob(rule, settings)
+            if job.start():
+                job.wait()
+        except Exception as e:
+            import traceback
+            trace = traceback.format_exc(15)
+            log.error(trace)
+            log.error("Error processing map results for job: %s %s" % (options['process_map'], e))
+            raise e
     elif options['immediate_rule']:
         # run inferno in 'immediate' mode
         settings['no_purge'] = True
         setproctitle('inferno - immediate.%s' % options['immediate_rule'])
         JobFactory.execute_immediate_rule(settings)
-
     else:
         # run inferno in 'daemon' mode
         from inferno.lib.daemon import InfernoDaemon

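Judging from how it is used here, disco.mapresults(job_name) presumably returns the URLs of the finished job's map output partitions, so the new branch amounts to: drop the map phase and point the resumed job's inputs straight at those URLs. A condensed sketch of that net effect, using only the names that appear in the diff above (with the error handling stripped; the comments are interpretation, not repository code):

    # rule, settings, job_name and the helper functions are all as in the diff above.
    disco, ddfs = get_disco_handle(settings.get('server'))

    rule.map_function = None                       # nothing left to map
    rule.source_tags = []                          # don't resolve inputs from DDFS tags
    rule.source_urls = disco.mapresults(job_name)  # presumably a list of URLs to the old job's map outputs

    job = InfernoJob(rule, settings)               # submit: effectively reduce-only over those results
    if job.start():
        job.wait()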
File inferno/lib/job.py

         # process the map-results option (ie. skip map phase and grab map results from job id/ddfs
         self.archiver = self._determine_job_blobs()
         job_blobs = self.archiver.job_blobs
-        map_function = self.rule.map_function
         self.start_time = time.time()
         if self.settings.get('just_query'):
             self.query()
         if self._enough_blobs(self.archiver.blob_count):
             if self.rule.rule_init_function:
                 self.rule.rule_init_function(self.params)
-            self.job.run(name=self.rule.name,
-                         input=job_blobs,
-                         map=map_function,
-                         reduce=self.rule.reduce_function,
-                         params=self.params,
-                         partitions=self.rule.partitions,
-                         map_input_stream=self.rule.map_input_stream,
-                         map_output_stream=self.rule.map_output_stream,
-                         map_init=self.rule.map_init_function,
-                         save=self.rule.save or self.rule.result_tag is not None,
-                         scheduler=self.rule.scheduler,
-                         combiner=self.rule.combiner_function,
-                         reduce_output_stream=self.rule.reduce_output_stream,
-                         sort=self.rule.sort,
-                         sort_buffer_size=self.rule.sort_buffer_size,
-                         profile=self.settings.get('profile'),
-                         partition=self.rule.partition_function,
-                         required_files=self.rule.required_files,
-                         required_modules=self.rule.required_modules)
-
+            self._run(job_blobs)
             # actual id is only assigned after starting the job
             self.full_job_id = self.job.name
             return self.job
                        'tag results': self.archiver.tag_map,
                        'total_blobs': self.archiver.blob_count})
 
+    def _run(self, sources):
+        return self.job.run(name=self.rule.name,
+                            input=sources,
+                            map=self.rule.map_function,
+                            reduce=self.rule.reduce_function,
+                            params=self.params,
+                            partitions=self.rule.partitions,
+                            map_input_stream=self.rule.map_input_stream,
+                            map_output_stream=self.rule.map_output_stream,
+                            map_init=self.rule.map_init_function,
+                            save=self.rule.save or self.rule.result_tag is not None,
+                            scheduler=self.rule.scheduler,
+                            combiner=self.rule.combiner_function,
+                            reduce_output_stream=self.rule.reduce_output_stream,
+                            sort=self.rule.sort,
+                            sort_buffer_size=self.rule.sort_buffer_size,
+                            profile=self.settings.get('profile'),
+                            partition=self.rule.partition_function,
+                            required_files=self.rule.required_files,
+                            required_modules=self.rule.required_modules)
+
+
     def _safe_str(self, value):
         try:
             return str(value)
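The inferno/lib/job.py side looks purely mechanical: the long job.run(...) call and its keyword arguments move into a _run(sources) helper, and the map_function local that used to be captured up front is now read as self.rule.map_function inside the helper. Only the single call site, self._run(job_blobs), appears in this diff, so the extraction presumably just keeps start() readable and gives the map-results path one place where the job's inputs, whatever their origin, are handed to disco.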