Snippets

Eugene Ciurana New job payload validation tools

Created by Eugene Ciurana

File newJob().py Added

  • Ignore whitespace
  • Hide word diff
+def _validateProcessorsAndInputsIn(message):
+    validProcessorIRIs = [ processor[u'processorIRI'] for processor in message[u'processors'] ]
+
+    try:
+        for inputs in (processor[u'inputs'] for processor in message[u'processors']):
+            for inputEnrichment in inputs:
+                if inputEnrichment not in validProcessorIRIs:
+                    return False
+    except Exception as e:
+        log.warning(u'invalid or missing processor inputs list of IRIs; e = %s' % e)
+        return False
+
+    return True
+
+
+def validateInput(message):
+    """
+    Checks that message has the minimum initialized attributes to
+    create a new job, and that none of the automatic definitions are
+    pre-initialized (e.g. jobID).
+    """
+    validated = False
+
+    if message:
+        log.debug(u'validating input message = %s' % message)
+        try:
+            if message[u'encoding'] == u'UTF-8':
+                if message.get(u'completionDate', None):
+                    return False
+                if message[u'payload'].get(u'aggregatedResults', None):
+                    return False
+                if len(message['processors']):
+                    if message['payloadType']:
+                        validated = _validateProcessorsAndInputsIn(message)
+                    else:
+                        log.warning(u'message.payloadType missing or invalid')
+                else:
+                    log.warning(u'message.processors empty or null')
+        except Exception as e:
+            log.warning(u'message format error; e = %s' % e)
+
+    log.debug(u'%svalid input message' % (u'in' if not validated else u''))
+
+    return validated
HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.