Commits

Frederic De Groef committed 17e3aba

whole database reprocess: added error logging
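
The reprocessing errors are no longer accumulated in one flat list; they are grouped per source, per day and per batch hour, and the whole report is dumped as json next to the destination db. A rough sketch of the in-memory structure that gets dumped (the day/hour strings, url and file paths below are illustrative; only the nesting and the (url, raw_filepath, stacktrace) entries come from the code):

    errors_by_source = {
        "lesoir": {                          # provider name
            "2012-03-14": {                  # day directory (illustrative format)
                "14.05.00": [                # batch hour directory (illustrative format)
                    ("http://www.lesoir.be/some_article",
                     "/path/to/raw_data/some_page.html",
                     "Traceback (most recent call last): ..."),
                ]
            }
        }
    }
    # written by main() as <basename of dest>_errors.json, in the parent directory of 'dest'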

Comments (0)

Files changed (1)

scripts/csxj_reprocess_entire_database.py

 from datetime import datetime
 import time
 import traceback
+import random
 
 import csxj.db as csxjdb
 from csxj.datasources import lesoir, lalibre, dhnet, sudinfo, rtlinfo, lavenir, rtbfinfo, levif, septsursept, sudpresse
     'sudpresse': sudpresse
 }
 
+
 def write_dict_to_file(d, outdir, outfile):
     """
     """
         json.dump(d, outfile)
 
 
-
 def reprocess_single_batch(datasource_parser, raw_data_dir):
     raw_data_index_file = os.path.join(raw_data_dir, csxjdb.constants.RAW_DATA_INDEX_FILENAME)
 
                 print "    Reprocessing: {1} ({0})".format(url, raw_filepath)
                 #reprocessed_articles.append(raw_filepath)
                 with open(raw_filepath, 'r') as raw_html:
-                   article_data, html = datasource_parser.extract_article_data(raw_html)
-                   article_data.url = url
-                   reprocessed_articles.append((article_data, html))
+                    article_data, html = datasource_parser.extract_article_data(raw_html)
+                    article_data.url = url
+                    reprocessed_articles.append((article_data, html))
             except Exception as e:
                 stacktrace = traceback.format_exc()
                 print "!!! FAIL", e.message
-                errors_encountered.append((raw_filepath, stacktrace))
+                errors_encountered.append((url, raw_filepath, stacktrace))
 
     return reprocessed_articles, errors_encountered
 
 
-
 def save_reprocessed_batch(dest_root_dir, source_name, day_string, batch_hour_string, articles):
     batch_root_dir = os.path.join(dest_root_dir, source_name, day_string, batch_hour_string)
     if not os.path.exists(batch_root_dir):
 
     print "+++ Saving {0} articles to {1}".format(len(articles), batch_root_dir)
     articles_json_data = {"articles": [article.to_json() for article, raw_html in articles],
-                            "errors": []}
+                          "errors": []}
     articles_filepath = os.path.join(batch_root_dir, csxjdb.constants.ARTICLES_FILENAME)
-    write_dict_to_file(articles_json_data, batch_root_dir,  articles_filepath)
-
+    write_dict_to_file(articles_json_data, batch_root_dir, articles_filepath)
 
     raw_data_dir = os.path.join(batch_root_dir, csxjdb.constants.RAW_DATA_DIR)
     if not os.path.exists(raw_data_dir):
         json.dump(index, f)
 
 
-
-
 def reprocess_raw_html(p, dest_root_dir):
     datasource = NAME_TO_SOURCE_MODULE_MAPPING[p.name]
     article_count = 0
 
     all_reprocessing_errors = list()
 
-    for day in p.get_all_days():
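+    # collect parsing errors grouped by day, then by batch hour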
+    errors_by_day = dict()
+    all_days = p.get_all_days()
+    n_days = len(all_days)
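+    # reprocess only a random 5% sample of all the days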
+    subset = int(float(n_days) * 0.05)
+    print("    total days: {0}".format(n_days))
+    print("    picking {0} random days".format(subset))
+    random.shuffle(all_days)
+    for day in all_days[:subset]:
+        errors_by_batch = dict()
         for batch_hour in p.get_all_batch_hours(day):
             batch_root_dir = os.path.join(p.directory, day, batch_hour)
-            raw_data_dir =  os.path.join(batch_root_dir, csxjdb.constants.RAW_DATA_DIR)
+            raw_data_dir = os.path.join(batch_root_dir, csxjdb.constants.RAW_DATA_DIR)
             batch_reprocessed_articles, batch_reprocessing_errors = reprocess_single_batch(datasource, raw_data_dir)
 
             if p.has_reprocessed_content(day, batch_hour):
                 reprocessed_dates = p.get_reprocessed_dates(day, batch_hour)
                 for day_string, batch_time_string in reprocessed_dates:
-                    old_reprocessed_raw_data_dir = os.path.join(batch_root_dir, "{0}_{1}_{2}".format(csxjdb.constants.REPROCESSED_DIR_PREFIX, day_string, batch_time_string),  csxjdb.constants.RAW_DATA_DIR)
+                    old_reprocessed_raw_data_dir = os.path.join(batch_root_dir, "{0}_{1}_{2}".format(csxjdb.constants.REPROCESSED_DIR_PREFIX, day_string, batch_time_string), csxjdb.constants.RAW_DATA_DIR)
                     reprocessed_articles, errors = reprocess_single_batch(datasource, old_reprocessed_raw_data_dir)
                     batch_reprocessed_articles.extend(reprocessed_articles)
                     batch_reprocessing_errors.extend(errors)
 
-            all_reprocessing_errors.extend(batch_reprocessing_errors)
+            if batch_reprocessing_errors:
+                errors_by_batch[batch_hour] = batch_reprocessing_errors
+            #all_reprocessing_errors.extend(batch_reprocessing_errors)
             article_count += len(batch_reprocessed_articles)
 
             save_reprocessed_batch(dest_root_dir, p.name, day, batch_hour, batch_reprocessed_articles)
-            # new batch to save at 'ROOT/p.name/day/batch_hour'
-            # copy reprocessed articles and html to the new db
-            # copy old errors to the new db
 
-    return article_count, all_reprocessing_errors
+        if errors_by_batch:
+            errors_by_day[day] = errors_by_batch
 
+    return article_count, errors_by_day
 
 
+def main(source_root, dest):
+    """Reprocess the raw html of every known provider into a new json db.
+
+    Args:
+        source_root: root directory of the source json database
+        dest: root directory of the destination (reprocessed) json database
+
+    Returns:
+        None. The collected parsing errors are dumped as a json file next to 'dest'.
-if __name__ == "__main__":
-    source_root = "/Users/sevas/Documents/juliette/json_db_allfeeds"
-    dest = "/Users/sevas/Documents/juliette/json_db_allfeeds_reprocess"
+    """
     provider_names = csxjdb.get_all_provider_names(source_root)
-    print provider_names
+    provider_names = NAME_TO_SOURCE_MODULE_MAPPING.keys()
 
     before = datetime.now()
     n_samples = 0
-    for name in provider_names:
+    errors_by_source = dict()
+    for name in provider_names[:]:
         provider = csxjdb.Provider(source_root, name)
         count, errors = reprocess_raw_html(provider, dest)
+
+        if errors:
+            errors_by_source[name] = errors
+
         n_samples += count
 
     after = datetime.now()
 
     print "Projection for {0} articles:".format(projected_article_count), time.strftime("%H:%M:%S", time.gmtime(projected_time))
 
-#    import argxparse
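+    # dump the full error report (source -> day -> batch hour -> failures) as json, next to the destination db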
+    write_dict_to_file(errors_by_source, os.path.join(dest, os.path.pardir), os.path.basename(dest) + "_errors.json")
+
+
+if __name__ == "__main__":
+#    import argparse
 #    parser = argparse.ArgumentParser(description='Utility functions to troubleshoot queue management')
 #    parser.add_argument('--source_jsondb', type=str, dest='source_jsondb', required=True, help='source json db root directory')
 #    parser.add_argument('--dest_jsondb', type=str, dest='dest_jsondb', required=True, help='dest json db root directory')
 #
-#
 #    args = parser.parse_args()
 
+    source_root = "/Users/sevas/Documents/juliette/json_db_0_5"
+    dest = "/Users/sevas/Documents/juliette/json_db_0_5_reprocess"
+
+    main(source_root, dest)
+
+#    source='septsursept'
+#    p = csxjdb.Provider(source_root, source)
+#    batches_by_day = p.get_queued_batches_by_day()
+#    import random
+#    import itertools as it
+#    from pprint import pprint
+#    random.shuffle(batches_by_day)
+#
+#    urls = list()
+#    for day, batches in batches_by_day[:20]:
+#        day_urls = list(it.chain(*[b['articles'] for hour, b in batches]))
+#        random.shuffle(day_urls)
+#        urls.append((day, day_urls[:1]))
+#    pprint(urls)
+#
+#
+#    import json
+#    with open("/Users/sevas/Desktop/7sur7.json", 'w') as f:
+#        json.dump(urls, f)