Commits

Peter Nixon committed b77e81b

Use the multiprocessing module to add support for multiple log files via per-file worker processes

  • Parent commit: d117c86

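At a high level, the change fans work out to one producer process per log file, with every producer feeding a single multiprocessing.Queue that the parent process drains and prints. A minimal sketch of that producer/consumer pattern (the worker body and file names here are illustrative stand-ins, not the commit's code):

    import multiprocessing

    def worker(name, queue):
        # Producer: the real commit tails a log file here;
        # this stand-in just queues a few dummy entries.
        for i in range(3):
            queue.put("%s: line %d" % (name, i))

    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        procs = []
        for name in ('access.log', 'error.log'):    # stand-ins for real log paths
            p = multiprocessing.Process(target=worker, args=(name, queue))
            p.daemon = True     # workers die with the parent
            p.start()
            procs.append(p)
        for _ in range(6):      # fan-in: 2 workers x 3 entries each
            print queue.get()
        for p in procs:
            p.join()
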
Files changed (3)

 #!/usr/bin/env python
+import logging
+import multiprocessing
+import sys
 
-import sys
-try:
-    from tailer import Tailer
-except:
-    from bmutils.tailer import Tailer
-
-from bmutils import UnisonWebLogParser
+from bmutils import logWorker
 
 __author__ = "Peter Nixon"
 __copyright__ = "Copyright (C) 2012 Peter Nixon"
 __license__ = "Public Domain"
-__version__ = "1.0"
+__version__ = "1.2"
 
 DEBUG = False
         
-def _main(filepath, options):
-    tailer = Tailer(open(filepath, 'rb'))
-    log = UnisonWebLogParser()
+def _main(files, options):
+    logQueue = multiprocessing.Queue()
+
+    procs = []
+    for filepath in files:
+        # Start a logWorker process for each file, all sharing one queue
+        p = multiprocessing.Process(target=logWorker, args=(filepath, logQueue, options.code))
+        p.daemon = True
+        p.start()
+        procs.append(p)
+
     try:
-        try:
-            tailer.seek_end()   # Jump to the bottom of the file
-            for line in tailer.follow(delay=options.sleep):
-                if (log.run_regex(line, process_hostnames=True)):
-                    if options.code:
-                        if (log.status == options.code):
-                            print log.timestamp, log.msisdn, log.client, log.server, log.method, log.status, log.url
-                    else:
-                        print log.timestamp, log.msisdn, log.client, log.server, log.method, log.status, log.url
-
-        except KeyboardInterrupt:
-            # Escape silently
-            pass
+        while True:
+            # Block on the shared queue and print the relevant fields
+            # from each log entry as the workers produce them
+            log = logQueue.get()
+            print log.timestamp, log.msisdn, log.client, log.server, log.method, log.status, log.url
+
+    except KeyboardInterrupt:
+        pass    # Escape silently
     finally:
-        tailer.close()
+        # Cleanup on the way out: stop and reap every worker we started
+        for p in procs:
+            p.terminate()
+            p.join()
         
 def main():
     from optparse import OptionParser
 
     (options, args) = parser.parse_args()
 
-    if not len(args) == 1:
+    if not args:
         parser.print_help()
         sys.exit(1)
     else:
-        _main(args[0], options)
+        _main(args, options)
         
 if __name__ == '__main__':
-	main()
+    if DEBUG: multiprocessing.log_to_stderr(logging.DEBUG)
+    main()

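The finally block in the script above tears the workers down with terminate(). A gentler alternative, sketched below (not what this commit does), shares a multiprocessing.Event that each worker polls so the parent can ask workers to exit on their own; for a loop blocked inside tailer.follow() that polling point is hard to reach, which is why outright termination of daemon workers is the pragmatic choice here.

    import multiprocessing
    import time

    def worker(stop, queue):
        # Check the shared Event between iterations instead of
        # relying on the parent calling terminate()
        while not stop.is_set():
            queue.put('tick')
            time.sleep(0.1)

    if __name__ == '__main__':
        stop = multiprocessing.Event()
        queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(stop, queue))
        p.start()
        print queue.get()   # consume at least one entry
        stop.set()          # ask the worker to leave its loop...
        p.join()            # ...then wait for it to exit cleanly
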
File bmutils/__init__.py

-from bmutils import UnisonWebLogCounter, UnisonWebLogParser
+from bmutils import UnisonWebLogCounter, UnisonWebLogParser, logWorker
 
 __author__ = "Peter Nixon"
 __copyright__ = "Copyright (C) 2012 Peter Nixon"
 __license__ = "Public Domain"
-__version__ = "1.1"
+__version__ = "1.2"

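A side note on the re-export above: from bmutils import ... inside the package depends on Python 2's implicit relative imports. The explicit form, which would also work under Python 3, is:

    from .bmutils import UnisonWebLogCounter, UnisonWebLogParser, logWorker
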
File bmutils/bmutils.py

 __author__ = "Peter Nixon"
 __copyright__ = "Copyright (C) 2012 Peter Nixon"
 __license__ = "Public Domain"
-__version__ = "1.1"
+__version__ = "1.2"
 
+import logging
+import multiprocessing
 import os
 import re
-import time
-import logging
+import sys
+#import time
+import tldextract
+from tailer import Tailer
 from urlparse import urlparse
-import tldextract
-
 try:
     from collections import defaultdict, Counter
 except:
         r'\s*(?P<method>.*?) '          # This field specifies the HTTP method used in the request coming from the client.
                                         # The field is of variable length and delimited with the next element by a space.
         
-        r'\s*(?P<url>.*?)'            # The URL in the HTTP request received from the client. For pre-fetch requests generated by the Web Proxy in the context of MOP optimization, this is the pre-fetch request URL.
+        r'\s*(?P<url>.*)'               # The URL in the HTTP request received from the client. For pre-fetch requests generated by the Web Proxy in the context of MOP optimization, this is the pre-fetch request URL.
                                         # The URL by default is logged up to 512 bytes in length.
                                         # The query string (starting with delimited '?') can also be left out if so configured.
                                         # This field is of variable length and is terminated by a new line character.
         self.bytes = 0
         self.client = ''
         self.device = ''
+        self.uagroup = ''
         self.domainname = ''
         self.httphost = ''
         self.method = ''
                 print "MATCH:", m.groups()
                 pass
         self.timestamp = m.group('timestamp')
-        self.utime = time.mktime(time.strptime(self.timestamp))
+        #self.utime = time.mktime(time.strptime(self.timestamp))
         self.client = m.group('ClientIP')
         self.server = m.group('ServerIP')
         self.ident = m.group('SubscriberID')
         self.url = m.group('url')
         self.agent = m.group('UserAgent')
         self.device = m.group('DeviceID')
+        self.uagroup = m.group('UAGroup')
         self.status = int(m.group('HTTPStatusCode'))
         self.bytes = int(m.group('bytes'))
         self.msisdn = m.group('MSISDN')
         
         return True
 
+    def output_to_combined(self):
+        # Placeholder: combined log format output is not implemented yet
+        return False
+
+
 class UnisonWebLogCounter:
     ''' ByteMobile Unison Web Access Logfile Parser '''
 
             #for (key2, num2) in self.status_dict[key].most_common(1):
             #    print key2
             n += 1
+
+def logWorker(filepath, logQueue, code=None, sleep=1):
+    psname = multiprocessing.current_process().name
+    if DEBUG: print psname, "is monitoring log file:", filepath
+    tailer = Tailer(open(filepath, 'rb'))
+    log = UnisonWebLogParser()
+    try:
+        tailer.seek_end()   # Jump to the bottom of the file
+        for line in tailer.follow(delay=sleep):
+            if (log.run_regex(line, process_hostnames=True)):
+                if code:
+                    if (log.status != code):
+                        continue    # We are not tracking this status code so continue with the next line
+                logQueue.put(log)  # Queue the log to the parent process
+    except KeyboardInterrupt:
+        pass    # Escape silently
+    finally:
+        # Cleanup on the way out
+        tailer.close()
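
One caveat on logQueue.put(log): multiprocessing.Queue pickles objects in a background feeder thread, so re-using a single mutable UnisonWebLogParser instance across puts can, in principle, race with the next run_regex() call. A defensive variant (a sketch of one possible hardening, not part of the commit) queues an immutable snapshot of just the fields the parent prints:

    def snapshot(log):
        # Copy the printed fields into a tuple before queueing so a
        # later run_regex() call cannot mutate the entry mid-pickle
        return (log.timestamp, log.msisdn, log.client, log.server,
                log.method, log.status, log.url)

Inside logWorker, the call would then become logQueue.put(snapshot(log)), and the parent would unpack the tuple instead of reading attributes.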