Commits

Mark Roddy committed f28c3dc

Pulled the process of generating a relation's schema into its own function and added tests for it so that in the future a relation's schema can be asserted.

  • Participants
  • Parent commits 8d4e95a

Comments (0)

Files changed (3)

File squealer/pigproxy.py

         if alias in self.alias_overrides:
             del self.alias_overrides[alias]        
 
+    def schemaFor(self, alias):
+        """
+        Returns string containing the schema of the specified alias
+        """
+        self.register_script()
+        sb = StringBuilder()
+        Schema.stringifySchema(sb, self.pig.dumpSchema(alias), DataType.TUPLE)
+        return sb.toString()
+
     def override_to_data(self, alias, input_data):
         """
         Override a statement so that the alias results in having the
         specified set of data
         """
-        self.register_script()
-        sb = StringBuilder()
-        Schema.stringifySchema(sb, self.pig.dumpSchema(alias), DataType.TUPLE)
-        
+        schema = self.schemaFor(alias)
         destination = mktemp()
         cluster = Cluster(self.pig.getPigContext())
         cluster.copyContentFromLocalFile(input_data, destination, True)
-        self.override(alias, "%s = LOAD '%s' AS %s;" % (alias, destination, sb.toString()))
+        self.override(alias, "%s = LOAD '%s' AS %s;" % (alias, destination, schema))
 

File tests/test_pigproxy.py

         proxy = PigProxy.from_file(self.PIG_SCRIPT, args)
         self.assertEqual("queries_limit", proxy.last_stored_alias_name())
 
+    def testSchemaFor(self):
+        args = [
+            "n=3",
+            "reducers=1",
+            "input=" + self.INPUT_FILE,
+            "output=top_3_queries",
+            ]
+        proxy = PigProxy.from_file(self.PIG_SCRIPT, args)
+        schema = proxy.schemaFor('queries_sum')
+        self.assertEqual(schema, '(query: chararray,count: long)')
+
+    def testSchemaFor_ThroughJythonUDF(self):
+        script = '\n'.join([
+            "Register 'tests/udfs.py' using jython as udfs;",
+            "data = LOAD '%s' AS (query:CHARARRAY, count:INT);" % self.INPUT_FILE,
+            "queries = FOREACH data GENERATE query, udfs.concat(query,query) AS doublequery;",
+            "STORE queries INTO 'top_3_queries';",
+            ])
+        proxy = PigProxy(script);
+        schema = proxy.schemaFor('queries')
+        self.assertEqual(schema, '(query: chararray,doublequery: chararray)')
+
 if __name__ == '__main__':
     unittest.main()

File tests/udfs.py

+"""
+User Defined Functions needed for testing.
+
+Intended to be imported by Pig, do not run me
+directly.
+"""
+
+
+@outputSchema('newstring:chararray')
+def concat(string1, string2):
+    return string1 + string2