Commits

James Taylor  committed 6737901 Draft

tracing: first pass trace logging to fluentd

  • Participants
  • Parent commits 0445cd8

Comments (0)

Files changed (9)

 simplejson = 2.1.1
 threadframe = 0.2
 guppy = 0.1.8
+msgpack_python = 0.2.2
 
 [eggs:noplatform]
 amqplib = 0.6.1
 Babel = 0.9.4
 wchartype = 0.1
 Whoosh = 0.3.18
+fluent_logger = 0.3.3
 
 ; extra version information
 [tags]

File lib/galaxy/app.py

         self.config = config.Configuration( **kwargs )
         self.config.check()
         config.configure_logging( self.config )
+        self.configure_fluent_log()
         # Determine the database url
         if self.config.database_connection:
             db_url = self.config.database_connection
                                    db_url,
                                    self.config.database_engine_options,
                                    database_query_profiling_proxy = self.config.database_query_profiling_proxy,
-                                   object_store = self.object_store )
+                                   object_store = self.object_store,
+                                   trace_logger=self.trace_logger )
         # Manage installed tool shed repositories.
         self.installed_repository_manager = galaxy.tool_shed.InstalledRepositoryManager( self )
         # Create an empty datatypes registry.
         self.job_stop_queue = self.job_manager.job_stop_queue
         # Initialize the external service types
         self.external_service_types = external_service_types.ExternalServiceTypesCollection( self.config.external_service_type_config_file, self.config.external_service_type_path, self )
+
     def shutdown( self ):
         self.job_manager.shutdown()
         self.object_store.shutdown()
                 os.unlink( self.datatypes_registry.integrated_datatypes_configs )
         except:
             pass
+
+    def configure_fluent_log( self ):
+        if self.config.fluent_log:
+            from galaxy.util.log.fluent_log import FluentTraceLogger
+            self.trace_logger = FluentTraceLogger( 'galaxy', self.config.fluent_host, self.config.fluent_port ) 
+        else:
+            self.trace_logger = None

File lib/galaxy/config.py

         for k, v in amqp_config:
             self.amqp[k] = v
         self.running_functional_tests = string_as_bool( kwargs.get( 'running_functional_tests', False ) )
+        # Logging with fluentd
+        self.fluent_log = string_as_bool( kwargs.get( 'fluent_log', False ) )
+        self.fluent_host = kwargs.get( 'fluent_host', 'localhost' )
+        self.fluent_port = int( kwargs.get( 'fluent_port', 24224 ) )
+
     def __read_tool_job_config( self, global_conf_parser, section, key ):
         try:
             tool_runners_config = global_conf_parser.items( section )

File lib/galaxy/model/mapping.py

         # Let this go, it could possibly work with db's we don't support
         log.error( "database_connection contains an unknown SQLAlchemy database dialect: %s" % dialect )
 
-def init( file_path, url, engine_options={}, create_tables=False, database_query_profiling_proxy=False, object_store=None ):
+def init( file_path, url, engine_options={}, create_tables=False, database_query_profiling_proxy=False, object_store=None, trace_logger=None ):
     """Connect mappings to the database"""
     # Connect dataset to the file path
     Dataset.file_path = file_path
     Dataset.object_store = object_store
     # Load the appropriate db module
     load_egg_for_url( url )
-    # Should we use the logging proxy?
-    if database_query_profiling_proxy:
+    # If metlog is enabled, do micrologging
+    if trace_logger:
         import galaxy.model.orm.logging_connection_proxy as logging_connection_proxy
-        proxy = logging_connection_proxy.LoggingProxy()
+        proxy = logging_connection_proxy.TraceLoggerProxy( trace_logger )
     else:
         proxy = None
     # Create the database engine

File lib/galaxy/model/orm/logging_connection_proxy.py

     rval = []
     for frame, fname, line, funcname, _, _ in inspect.stack()[2:]:
         rval.append( "%s:%s@%d" % ( stripwd( fname ), funcname, line ) )
-    return " > ".join( rval ) 
+    return rval
 
 class LoggingProxy(ConnectionProxy):
+    """
+    Logs SQL statements using standard logging module
+    """
     def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
         start = time.clock()
         rval = execute(cursor, statement, parameters, context)
         duration = time.clock() - start
         log.debug( "statement: %r parameters: %r executemany: %r duration: %r stack: %r",
-                   statement, parameters, executemany, duration, pretty_stack() )
+                   statement, parameters, executemany, duration, " > ".join( pretty_stack() ) )
         return rval
+
+class TraceLoggerProxy(ConnectionProxy):
+    """
+    Logs SQL statements using a metlog client
+    """
+    def __init__( self, trace_logger ):
+        self.trace_logger = trace_logger
+    def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
+        start = time.clock()
+        rval = execute(cursor, statement, parameters, context)
+        duration = time.clock() - start
+        self.trace_logger.log( "sqlalchemy_query", 
+            message="Query executed", statement=statement, parameters=parameters, 
+            executemany=executemany, duration=duration, stack=pretty_stack() ) 
+        return rval

File lib/galaxy/util/log/__init__.py

+class TraceLogger( object ):
+	def __init__( self, name ):
+		self.name = name
+	def log( **kwargs ):
+		raise TypeError( "Abstract Method" )

File lib/galaxy/util/log/fluent_log.py

+import time
+import threading
+
+import galaxy.eggs
+galaxy.eggs.require( "fluent-logger" )
+galaxy.eggs.require( "msgpack_python" )
+
+
+from fluent.sender import FluentSender
+
+
+class FluentTraceLogger( object ):
+	def __init__( self, name, host='localhost', port=24224 ):
+		self.lock = threading.Lock()
+		self.thread_local = threading.local()
+		self.name = name
+		self.sender = FluentSender( self.name, host=host, port=port )
+
+	def context_push( self, value ):
+		self.lock.acquire()
+		if not hasattr( self.thread_local, 'context' ):
+			self.thread_local.context = []
+		self.thread_local.context.append( value )
+		self.lock.release()
+
+	def context_pop( self ):
+		self.lock.acquire()
+		self.thread_local.context.pop()
+		self.lock.release()
+
+	def log( self, label, **kwargs ):
+		self.lock.acquire()
+		if not hasattr( self.thread_local, 'context' ):
+			self.thread_local.context = []
+		self.lock.release()
+		kwargs['log_context'] = self.thread_local.context
+		self.sender.emit_with_time( label, int(time.time()), kwargs )

File lib/galaxy/web/framework/base.py

 import os.path
 import sys
 import tarfile
+import threading
+import uuid
 
 from Cookie import SimpleCookie
 
         self.mapper.explicit = False
         self.api_mapper = routes.Mapper()
         self.transaction_factory = DefaultWebTransaction
+        # Each request will have a unique id. Since we are assuming
+        # a threaded model for the moment we can store that here
+        self.request_id = threading.local()
     def add_ui_controller( self, controller_name, controller ):
         """
         Add a controller class to this application. A controller class has
         # Create/compile the regular expressions for route mapping
         self.mapper.create_regs( self.controllers.keys() )
         self.api_mapper.create_regs( self.api_controllers.keys() )
+    def trace( self, **fields ):
+        if self.trace_logger:
+            self.trace_logger.log( "WebApplication", **fields )
     def __call__( self, environ, start_response ):
         """
         Call interface as specified by WSGI. Wraps the environment in user
         friendly objects, finds the appropriate method to handle the request
         and calls it.
         """
+        # Immediately create request_id which we will use for logging
+        self.request_id = request_id = uuid.uuid1().hex
+        if self.trace_logger:
+            self.trace_logger.context_push( dict( request_id = request_id ) )
+        self.trace( message= "Starting request" )
         # Map url using routes
         path_info = environ.get( 'PATH_INFO', '' )
         map = self.mapper.match( path_info, environ )
             controllers = self.controllers
         if map == None:
             raise httpexceptions.HTTPNotFound( "No route for " + path_info )
+        self.trace( path_info=path_info, map=map )
         # Setup routes
         rc = routes.request_config()
         rc.mapper = mapper

File lib/galaxy/webapps/galaxy/buildapp.py

     webapp.api_mapper.connect("import_workflow", "/api/workflows/upload", controller="workflows", action="import_new_workflow", conditions=dict(method=["POST"]))
     webapp.api_mapper.connect("workflow_dict", '/api/workflows/download/{workflow_id}', controller='workflows', action='workflow_dict', conditions=dict(method=['GET']))
 
+    # Connect logger from app
+    if app.trace_logger:
+        webapp.trace_logger = app.trace_logger
+
+    # Indicate that all configuration settings have been provided
     webapp.finalize_config()
+
     # Wrap the webapp in some useful middleware
     if kwargs.get( 'middleware', True ):
         webapp = wrap_in_middleware( webapp, global_conf, **kwargs )