Frederic De Groef committed aa37361

full reprocessing can take advantage of multicore systems
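
With this change, the number of worker processes can be set from the command line; a hypothetical invocation (the paths are placeholders):

    python scripts/csxj_reprocess_entire_database.py --source_jsondb /path/to/json_db --dest_jsondb /path/to/json_db_reprocessed --processes 4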

Files changed (1)

scripts/csxj_reprocess_entire_database.py

 import os
 import os.path
 import json
+import traceback
+import multiprocessing as mp
 from datetime import datetime
-import time
-import traceback
-import random
 
 import csxj.db as csxjdb
 from csxj.datasources import lesoir, lalibre, dhnet, sudinfo, rtlinfo, lavenir, rtbfinfo, levif, septsursept, sudpresse
         json.dump(index, f)
 
 
-def reprocess_raw_html(p, dest_root_dir):
-    datasource = NAME_TO_SOURCE_MODULE_MAPPING[p.name]
+def reprocess_raw_html(args):
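+    # mp.Pool.map() passes a single argument to each worker, so the
+    # provider name and destination directory arrive packed in a tuple;
+    # 'source_root' is a module-level global inherited by the worker processes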
+    provider_name, dest_root_dir = args
+    p = csxjdb.Provider(source_root, provider_name)
+    datasource = NAME_TO_SOURCE_MODULE_MAPPING[provider_name]
     article_count = 0
 
-    all_reprocessing_errors = list()
-
     errors_by_day = dict()
     all_days = p.get_all_days()
-#    n_days = len(all_days)
-#    subset = int(float(n_days) * 0.05)
-#    print("    total days: {0}".format(n_days))
-#    print("    picking {0} random days".format(subset))
-#    random.shuffle(all_days)
     for day in all_days[:]:
         errors_by_batch = dict()
         for batch_hour in p.get_all_batch_hours(day):
 
             if batch_reprocessing_errors:
                 errors_by_batch[batch_hour] = batch_reprocessing_errors
-            #all_reprocessing_errors.extend(batch_reprocessing_errors)
             article_count += len(batch_reprocessed_articles)
 
             save_reprocessed_batch(dest_root_dir, p.name, day, batch_hour, batch_reprocessed_articles)
         if errors_by_batch:
             errors_by_day[day] = errors_by_batch
 
-    return article_count, errors_by_day
 
+    # key the errors by provider name so the parent process can merge
+    # per-worker results into a single dict
+    return article_count, {provider_name: errors_by_day}
 
-def main(source_path, dest_path):
-    """
 
-    Args:
-
-
-    Returns:
-
-    """
+def main(source_path, dest_path, processes):
     provider_names = csxjdb.get_all_provider_names(source_root)
     provider_names = NAME_TO_SOURCE_MODULE_MAPPING.keys()
 
     before = datetime.now()
     n_samples = 0
     errors_by_source = dict()
-    for name in provider_names[:]:
-        provider = csxjdb.Provider(source_root, name)
-        count, errors = reprocess_raw_html(provider, dest)
 
-        if errors:
-            errors_by_source[name] = errors
 
-        n_samples += count
+    # fan the providers out to a pool of worker processes; each worker
+    # returns (article_count, {provider_name: errors_by_day})
+    pool = mp.Pool(processes)
+    results = pool.map(reprocess_raw_html, [(name, dest_path) for name in provider_names])
+
+    n_samples = sum(count for count, _ in results)
+    for _, errors in results:
+        errors_by_source.update(errors)
 
     after = datetime.now()
     dt = after - before
     parser = argparse.ArgumentParser(description='Utility functions to troubleshoot queue management')
     parser.add_argument('--source_jsondb', type=str, dest='source_jsondb', required=True, help='source json db root directory')
     parser.add_argument('--dest_jsondb', type=str, dest='dest_jsondb', required=True, help='dest json db root directory')
+    parser.add_argument('--processes', type=int, dest='processes', required=False, default=1, help='Number of parallel processes to use (default=1)')
 
     args = parser.parse_args()
 
-#    source_root = "/Users/sevas/Documents/juliette/json_db_0_5"
-#    dest = "/Users/sevas/Documents/juliette/json_db_0_5_reprocess"
     source_root = args.source_jsondb
     dest = args.dest_jsondb
 
-    main(source_root, dest)
+    print "Using {0} processes".format(args.processes)
+    main(source_root, dest, args.processes)
 
-#    source='septsursept'
-#    p = csxjdb.Provider(source_root, source)
-#    batches_by_day = p.get_queued_batches_by_day()
-#    import random
-#    import itertools as it
-#    from pprint import pprint
-#    random.shuffle(batches_by_day)
-#
-#    urls = list()
-#    for day, batches in batches_by_day[:20]:
-#        day_urls = list(it.chain(*[b['articles'] for hour, b in batches]))
-#        random.shuffle(day_urls)
-#        urls.append((day, day_urls[:1]))
-#    pprint(urls)
-#
-#
-#    import json
-#    with open("/Users/sevas/Desktop/7sur7.json", 'w') as f:
-#        json.dump(urls, f)
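
For reference, a minimal, self-contained sketch (not project code) of the fan-out/merge pattern this commit adopts: Pool.map() passes exactly one argument to the worker, so parameters are packed into a tuple, and each worker keys its errors by name so the parent can merge them into a single dict.

    import multiprocessing as mp

    def worker(args):
        name, dest = args              # unpack the single tuple argument
        count = len(name)              # stand-in for the real per-provider work
        return count, {name: []}       # errors keyed by name for later merging

    if __name__ == '__main__':
        names = ['lesoir', 'dhnet', 'sudinfo']
        pool = mp.Pool(2)
        results = pool.map(worker, [(n, '/tmp/out') for n in names])
        total = sum(count for count, _ in results)
        merged = {}
        for _, errors in results:
            merged.update(errors)
        print(total, merged)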