Commits

svevang committed d51bd0f

handle large requests

  • Parent commits c6b59bc

Files changed (1)

File pyft/fusiontables.py

 import datetime
 import csv
 import time
+from urllib2 import HTTPError
 
 from pyft import current_app
 from pyft.client.sql.sqlbuilder import SQL
       new_query = SQL().insert(self.table_id, insert_values)
       query_list.append(new_query)
 
-    result_batch = self.bulk_insert_query(query_list)
+    inserted_row_ids = self.bulk_insert_query(query_list)
 
-    new_row_ids = []
-    for result in result_batch:
-      rows = result.strip().split('\n')
-      # skip the header
-      for row in csv.reader(rows[1:]):
-        new_row_ids += row
+    logger.debug('return row ids for the inserted rows {0}'.format(inserted_row_ids))
+    return inserted_row_ids
 
-    logger.debug('return row ids for the inserted rows {0}'.format(row) )
-    return new_row_ids
 
   def bulk_insert_query(self, query_list):
 
     todo_query_list = [] 
     results = []
 
+    logger.debug('starting bulk insert of {0} queries'.format(len(query_list)))
+
+    def run_query_list(ql):
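+      # run one batched insert (the queries joined with ';') and parse the returned ROWIDs out of the CSV response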
+      new_row_ids = []
+      logger.debug('running query list insert of {0} queries'.format(len(ql)))
+
+      try:
+        result = self.run_query(";".join(ql))
+      except HTTPError as e:
+        # We are in an error state
+        logger.debug('Received error: {0}'.format(e))
+        if e.code == 500:
+          # did google fusion tables barf?
+          # check if our rows are present
+          logger.debug('500 Error')
+        # re-raise so the caller (e.g. insert_with_retry) can handle the failure;
+        # otherwise `result` below would be undefined
+        raise
+
+      rows = result.strip().split('\n')
+      # skip the header
+      for row in csv.reader(rows[1:]):
+        new_row_ids += row
+      return new_row_ids
+
+    row_ids = []
     for query in query_list:
-      if len(";".join(todo_query_list + [query])) > QUERY_SIZE_LIMIT or \
-          len(query_list) >= 500:
-        self.run_query(";".join(todo_query_list))
-        results.append(todo_query_list = [query])
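+      # flush the pending batch when adding this query would exceed the request size limit or the 500-statement cap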
+      if len(";".join(todo_query_list + [query])) > QUERY_SIZE_LIMIT or len(todo_query_list) >= 500:
+        row_ids += run_query_list(todo_query_list)
+        logger.debug('Hit critical limit, size: {0}, len: {1}'.format(len(";".join(todo_query_list + [query])), len(todo_query_list)))
+        todo_query_list = [query]
       else:
         todo_query_list.append(query)
+
     # clean up any remaining queries
     if todo_query_list:
-      results.append(self.run_query(";".join(todo_query_list)))
+      row_ids += run_query_list(todo_query_list)
 
-    return results
+    return row_ids
 
 
-  def select(self, column_name="ROWID", in_values=[]):
+  def insert_with_retry(self, unique_key, rows=[], num_retries=3):
+    "insert routine with error handling and checking"
+
+    i = 0
+    completed_rows = []
+
+    while i < num_retries:
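+      # each attempt re-runs the full insert; on a 500 some rows may already have landed (see the select/diff notes below)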
+
+      try:
+        res_rows = self.insert(rows)
+      except HTTPError as e:
+        logger.debug('Received error: {0}'.format(e))
+        if e.code == 500:
+          logger.debug('500 Error')
+          # select row ids that have been inserted
+          # diff remaining rows for insertion
+          # self.select()
+      else:
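+        # no exception was raised, so hand back the inserted row ids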
+        return res_rows
+
+      i += 1
+
+    # retries exhausted; return whatever rows completed successfully
+    return completed_rows
+
+
+  def select(self, unique_key, rows=[]):
     """
-    Push data locally back to google-hosted fusion table
-    `rows` is a list of Row objects
+    Based on a set of rows, get the remote fusion table row_ids
+    `rows` are a set of row objects
+    `unique_key` is the name of a column that acts as a primary key in our rows
     """
-    # TODO flesh this out
 
-    return self.run_query("select * from {0} limit 1".format(self.table_id))
+    # build a lookup mapping for key-column-name -> key-value -> row
+    key_field_map = {}
+    key_values = []
+    for row in rows:
+      for field in row:
+        if field.column_name == unique_key:
+          key_values.append(field.value)
+          key_field_map[field.value] = row 
+   
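+    # request ROWID along with each row's own columns so remote ids can be matched back to local rows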
+    select_columns = rows[0].column_names() + ['ROWID']
+    key_column_values = ",".join([r.field_lookup[unique_key].prepare_value() for r in rows])
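+    # the membership clause restricts the select to rows whose unique key appears in our local batch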
+    membership_clause = "'{0}' IN ({1})".format(unique_key,
+                                                key_column_values)
+
+    select_query = SQL().select(self.table_id, select_columns, membership_clause)
+    return self.run_query(select_query)
+
+  def parse_row_results(self, results):
+    pass
 
   def update(self, rows=[]):
     """
     logger.debug('BULK UPDATE result batch {0}'.format(result_batch))
     return result_batch
 
-  def get_row_ids(self, unique_key, rows=[]):
-    """
-    Based on a set of rows, get the remote fusion table row_ids
-    `rows` are a set of row objects
-    `unique_key` is the name of a column that acts as a primary key in our rows
-    """
-
-    # build a lookup mapping for key-column-name -> key-value -> row
-    key_field_map = {}
-    key_values = []
-    for row in rows:
-      for field in row:
-        if field.column_name == unique_key:
-          key_values.append(field.value)
-          key_field_map[field.value] = row 
-   
-    select_columns = rows[0].column_names() + ['ROWID']
-    key_column_values = ",".join([r.field_lookup[unique_key].prepare_value() for r in rows])
-    membership_clause = "'{0}' IN ({1})".format(unique_key,
-                                                key_column_values)
-
-    select_query = SQL().select(self.table_id, select_columns, membership_clause)
-    return self.run_query(select_query)
-
 
   def pull(self):
     """