#!/usr/bin/python -tt
# -*- coding: utf-8 -*-
# ganglia-logtailer / src / ganglia-logtailer

###  logtailer
###  tails a log,  reports stuff to ganglia using gmetric
###  arguments:
###    function to accumulate the log lines
###    log file to tail
###  copyright Linden Research, Inc. 2008
###  Released under the GPL v2 or later.
###  For a full description of the license, please visit
###  https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
###  $Id$

# System Libraries
import os
import sys
import threading
import time
import optparse
import stat
# Logging module
import logging.handlers
import fcntl
from math import floor

# Local dependencies
from tailnostate import LogTail
from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException, LockingError

## globals
# Moment the script was loaded; main() stamps the logtail state file with it
# after a cron run.
script_start_time = time.time()
# Default location of the logtail state file (see the --state_dir option).
logtail_statedir = '/var/lib/ganglia-logtailer/'
# External programs this script runs through the shell.
logtail = '/usr/sbin/logtail'
# Point gmetric at '/bin/echo' instead to print the command line rather than
# run it.
gmetric = '/usr/bin/gmetric'

## set up logging infrastructure for use throughout the script
logDir = '/var/log/ganglia'
# The rotating file handler below cannot open its file unless the directory
# already exists, so create it on first use.
if not os.path.isdir(logDir):
    os.makedirs(logDir)
logger = logging.getLogger('ganglia_logtailer')
# open the log file for append, rotate at 10 MiB, keep 10 old files
hdlr = logging.handlers.RotatingFileHandler('%s/ganglia_logtailer.log' % logDir, 'a', 10 * 1024 * 1024, 10)
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
# Without these two calls the handler and formatter above are never used.
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
# INFO lets the script's informational messages through while dropping the
# per-metric debug chatter.
# NOTE(review): the level line was missing from this copy - confirm INFO is
# the intended default.
logger.setLevel(logging.INFO)

## This provides a lineno() function to make it easy to grab the line
## number that we're on (for logging)
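## Credit: Danny Yoo.  The contact address and source URL were truncated in
## this copy (presumably the ActiveState Python Cookbook recipe for grabbing
## the current line number - confirm before citing).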
import inspect
def lineno():
    """Return the line number of the statement that called this function."""
    # f_back is the frame one level up, i.e. the caller's frame.
    caller_frame = inspect.currentframe().f_back
    return caller_frame.f_lineno

def submit_stats( parser, metric_prefix, gmetric_options, duration=None  ):
    """ Send every metric the parser currently holds to ganglia via gmetric.

    parser          -- plugin instance; get_state() must return an iterable of
                       objects with name, value, type, units, tmax and dmax
                       attributes.
    metric_prefix   -- when not "", each metric name becomes
                       "<metric_prefix>_<name>".
    gmetric_options -- option string inserted verbatim into the gmetric
                       command line.
    duration        -- length of the measured interval; only supplied in
                       cron mode.

    A LogtailerStateException raised while collecting or submitting metrics
    is logged as a warning and not propagated.
    """
    if duration is not None:
        # this only happens in cron mode
        # NOTE(review): the statement guarded by this test was missing from
        # this copy; set_check_duration() is assumed to be the plugin hook
        # that receives the interval - confirm against the plugin base class.
        parser.set_check_duration(duration)
    try:
        metrics = parser.get_state()
        for m in metrics:

            if metric_prefix != "":
                m.name = metric_prefix + "_" + m.name

            # The fields are interpolated unquoted into a shell command line,
            # so they must not contain spaces or shell metacharacters.
            command = ("%s %s --name %s --value %s --type %s --units %s --tmax %s --dmax %s" %
                       (gmetric, gmetric_options, m.name, m.value, m.type, m.units, m.tmax, m.dmax))
            logger.debug( "Submitting gmetric: " + command )
            os.system(command)
    except LogtailerStateException as e:
        logger.warning( "State exception caught (line %s): %s" % (lineno(), e) )

# function gmetric_manager
# takes a parser object - class instance
class GMetricManager(object):
    '''Callable used as the target of the thread that submits metrics to
    gmetric once per period.  The period and the data both come from the
    parser object handed to __call__.'''
    def __init__(self, metric_prefix, gmetric_options):
        # Both values are passed through unchanged to submit_stats().
        self.__metric_prefix = metric_prefix
        self.__gmetric_options = gmetric_options

    def __call__(self, parser):
        '''Submit the parser's stats every parser.period seconds, forever.

        If one submission takes longer than the period, the period is
        doubled (and written back to parser.period) until it fits.
        Raises ValueError when parser.period is not positive.'''
        period = parser.period
        if period <= 0:
            # With a non-positive period the doubling loop below could never
            # push sleep_time above zero and would spin forever.
            raise ValueError("parser.period must be positive, got %s" % period)

        # wait one period the first time (so we have something to report)
        time.sleep(period)

        while True:
            logger.debug("manager: starting")
            start = time.time()
            # submit the stats
            submit_stats(parser, self.__metric_prefix, self.__gmetric_options)
            finish = time.time()
            runtime = finish - start
            sleep_time = period - runtime
            while sleep_time <= 0:
                logger.info( "manager: calculation time is longer than period.  doubling period to %s." % (period * 2) )
                sleep_time += period
                period *= 2
                # tell the logtailer class that we're slowing period
                parser.period = period
            logger.debug( "manager: sleeping for %s" % sleep_time)
            time.sleep(sleep_time)

# function start_locking
def start_locking(lockfile_name):
    """ Acquire a lock via a provided lockfile filename.

    Opens (creating or truncating) lockfile_name and takes a non-blocking
    exclusive flock on it.  Returns the open file object, which the caller
    must keep and later pass to end_locking().  Raises LockingError when the
    lock cannot be taken.
    """

    # This lock can be improved by inserting the current PID into the lockfile
    # created.  This lets us check for stale lockfiles (if the PID mentioned in
    # the lockfile isn't running).

    f = open(lockfile_name, 'w')

    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        # Don't leak the handle when the lock could not be taken.
        f.close()
        raise LockingError("Cannot acquire ganglia-logtailer lock (%s)" % lockfile_name)

    logger.debug("Locking sucessful")

    return f

# function end_locking
def end_locking(lockfile_fd, lockfile_name):
    """ Release a lock via a provided file descriptor.

    lockfile_fd   -- the open file returned by start_locking().
    lockfile_name -- path of the lock file, removed once the lock is released.

    Raises LockingError when the flock cannot be released or when the lock
    file cannot be unlinked (for example because it is already gone).
    """

    try:
        fcntl.flock(lockfile_fd, fcntl.LOCK_UN | fcntl.LOCK_NB)
    except IOError:
        raise LockingError("Cannot release ganglia-logtailer lock (%s)" % lockfile_name)

    try:
        os.unlink(lockfile_name)
    except OSError:
        raise LockingError("Cannot unlink %s" % lockfile_name)

    logger.debug("Unlocking sucessful")


def main():
    """ Parse the command line, feed the log through the parser plugin and
    report the resulting metrics to ganglia.

    Cron mode (default): read the new log lines through the external logtail
    program, parse them, submit the stats once and exit.  On the first run
    (no state file yet) only the state file is written.
    Daemon mode: tail the log indefinitely with LogTail while a background
    thread (GMetricManager) submits stats every parser period.

    A lock file in the state dir prevents two instances for the same
    class/log pair from running at once; it is released on every exit path
    after it has been taken.  Exits with status 1 on failure, 0 after a
    first cron run.
    """

    cmdline = optparse.OptionParser()
    cmdline.add_option('--classname', '-c', action='store', help='The name of the plugin to use to parse the log file')
    cmdline.add_option('--log_file', '-l', action='store', help='The path to the file to tail and parse')
    cmdline.add_option('--metric_prefix', '-p', action='store', help='Add prefix to all published metrics. This is for people that may multiple instances of same service on same host. So if your metric is e.g. gc_time it becomes tomcat1_gc_time', default='' )
    cmdline.add_option('--gmetric_options', '-g', action='store', help='Options to pass to gmetric such as -c /etc/ganglia/gmond.conf (default). These are passed directly to gmetric',
                       default='-c /etc/ganglia/gmond.conf' )
    cmdline.add_option('--mode', '-m', action='store', type='choice',
                       choices=('daemon', 'cron'), default='cron',
                       help='MODE must be "cron" or "daemon".  Cron mode (default) is designed to be called every X minutes.  Daemon mode is a persistent process.')
    cmdline.add_option('--state_dir', '-s', action='store', default=logtail_statedir,
                       help='The state dir is used in cron mode, and is where to store the logtail state file.  Default location %s' % logtail_statedir)

    options, arguments = cmdline.parse_args()

    # Both values are needed to build the state and lock file names below;
    # without this check a missing --log_file fails on None.replace().
    if not options.classname:
        cmdline.error('--classname is required')
    if not options.log_file:
        cmdline.error('--log_file is required')

    class_name = options.classname
    log_file = options.log_file
    mode = options.mode
    metric_prefix = options.metric_prefix
    gmetric_options = options.gmetric_options
    state_dir = options.state_dir
    # flatten the log path so it can be embedded in a single file name
    dirsafe_logfile = log_file.replace('/','-')
    logtail_state_file = '%s/logtail-%s%s.state' % (state_dir, class_name, dirsafe_logfile)
    logtail_lock_file = '%s/logtail-%s%s.lock' % (state_dir, class_name, dirsafe_logfile)

    # only used in cron mode
    shell_tail = '%s -f %s -o %s' % (logtail, log_file, logtail_state_file)

    logger.debug( "ganglia-logtailer started with class %s, log file %s, mode %s" % (class_name, log_file, mode))

    # import and instantiate the class from the module passed in.  Files and Class names must be the same.
    try:
        module = __import__(class_name)
        parser = getattr(module, class_name)()
    except Exception as e:
        print("Failed to instantiate parser (line %s): %s" % (lineno(), e))
        sys.exit(1)

    # check for lock file so we don't run multiple copies of the same parser simultaneously
    # this will happen if the log parsing takes more time than the cron period
    # which is likely on first run when the logfile is huge
    try:
        lockfile = start_locking(logtail_lock_file)
    except LockingError:
        print("Failed to get lock.  Is another instance of ganglia-logtailer running?  Exiting.")
        sys.exit(1)
    # we now have a lock that we must clear anywhere we exit.

    # get input to parse
    if mode == 'daemon':
        # open the log file for tailing
        try:
            log_input = LogTail(log_file)
        except Exception as e:
            print("Failed to instantiate LogTail instance (line %s): %s" % (lineno(), e))
            end_locking(lockfile, logtail_lock_file)
            sys.exit(1)
    elif mode == 'cron':
        try:
            # find out how long it's been since we last ran.
            try:
                state_file_age = os.stat(logtail_state_file)[stat.ST_MTIME]
            except OSError:
                # this is our first run or our state file got nuked.
                # write out a new state file and exit
                logger.info('First run or state file got nuked.  Wrote new state file. Exiting.')
                log_input = os.popen(shell_tail)
                retval = log_input.close()
                if retval != 256:
                    logger.warning('%s returned bad exit code %s' %
                                   (shell_tail, retval))
                end_locking(lockfile, logtail_lock_file)
                sys.exit(0)
            log_input = os.popen(shell_tail)
        except SystemExit:
            # let the deliberate first-run exit above pass straight through
            raise
        except Exception as e:
            # note - there is no exception when shell-tailer doesn't exist.
            # I don't know when this exception will ever actually be triggered.
            print("Failed to run %s to get log data (line %s): %s" %
                  (shell_tail, lineno(), e))
            end_locking(lockfile, logtail_lock_file)
            sys.exit(1)
    else:
        raise Exception("mode (%s) misunderstood" % mode)

    # if we're a daemon, launch the other thread (cron mode runs after the parsing)
    if mode == 'daemon':
        # launch gmetric caller thread
        submitter = threading.Thread(target=GMetricManager(metric_prefix, gmetric_options), args=[parser])
        # the process should die when the main thread dies
        submitter.daemon = True
        submitter.start()

    # parse each line in turn
    try:
        for line in log_input:
            # this will never end in daemon mode, but will in cron mode
            try:
                # if in daemon mode, die if our submitter thread has failed
                if mode == 'daemon' and not submitter.is_alive():
                    raise Exception("submitter thread died")

                parser.parse_line(line)  # crunch each line in turn

            except LogtailerParsingException as e:
                # this should only catch recoverable exceptions (of which there aren't any at the moment)
                logger.warning( "Parsing exception caught at %s: %s" % (lineno(), e))
    except Exception as e:
        print("Exception caught at %s: %s" % (lineno(), e))
        end_locking(lockfile, logtail_lock_file)
        sys.exit(1)

    # if we're called from cron, crunch the stats
    if mode == 'cron':
        # calculate now() - state file age to determine check duration
        now = time.time()
        duration = now - state_file_age
        if duration <= 45:
            # something's borked.  cron's minimum is 60s
            logger.warning('duration (%s) less than 45s, despite being called from cron.  Shouldn\'t happen. (line: %s)' % (duration, lineno()))
        submit_stats(parser, metric_prefix, gmetric_options, duration=duration)
        # Reset mtime/atime on state file so duration isn't thrown off by long execution times.
        os.utime(logtail_state_file, (floor(script_start_time), floor(script_start_time)))
        end_locking(lockfile, logtail_lock_file)

    # try and remove the lockfile one last time, but it's a valid state that it's already been removed.
    try:
        end_locking(lockfile, logtail_lock_file)
    except Exception as e:
        # expected after the cron-mode release just above; nothing left to do
        logger.debug("Final lock cleanup skipped: %s" % e)

if __name__ == '__main__':