Ben Hartshorne / ganglia-logtailer

Commits

gr...@eniac32.lindenlab.com  committed e646788

initial import

Branch: default

Files changed (21)

File Makefile

+SRCDIR=src
+
+SCRIPTS=${SRCDIR}/ganglia-logtailer
+MODULES=${SRCDIR}/ganglia_logtailer_helper.py ${SRCDIR}/tailnostate.py ${SRCDIR}/ApacheLogtailer.py ${SRCDIR}/BindLogtailer.py ${SRCDIR}/DummyLogtailer.py ${SRCDIR}/PostfixLogtailer.py ${SRCDIR}/UnboundLogtailer.py
+
+
+all:
+
+install:
+	install -d ${DESTDIR}/var/lib/ganglia-logtailer
+	install -d ${DESTDIR}/var/log/ganglia-logtailer
+
+	install -d ${DESTDIR}/usr/bin
+	install -m 0755 ${SCRIPTS} ${DESTDIR}/usr/bin
+	
+	install -d ${DESTDIR}/usr/share/ganglia-logtailer
+	install -m 0644 ${MODULES} ${DESTDIR}/usr/share/ganglia-logtailer	
+
+clean:

File debian/README

+##################
+##
+##  ganglia-logtailer
+##
+##  ganglia-logtailer is a python script that will tail any log file,
+##  crunch the data collected, and report summary data to ganglia.
+##
+##  This directory contains the script ganglia-logtailer, its required
+##  helper classes, and two example classes, one generic and one for
+##  parsing Apache logs.
+##
+##  Copyright Linden Lab, 2008
+##  License to modify and redistribute granted under the GPL v2 or later
+##
+##################
+
+
+0. Table of Contents
+1. Overview
+2. Installation
+3. License
+
+1. Overview
+
+Many metrics associated with ganglia and gmetric plugins are rather easy to
+collect; you poll the relevant application for a value and report it.  Examples
+are asking MySQL for the number of questions and calculating queries per
+second, or asking iostat for the percentage disk I/O currently being used.
+However, there are a large number of applications out there that don't support
+being queried for interesting data, but do provide a log file which, when
+properly parsed, yields the interesting data we desire.  An example of the
+latter category is Apache, which does not furnish any interface for measuring
+queries per second, yet has a log file allowing you to count how many queries
+come in over a specific time period.
+
+ganglia-logtailer is designed to make it easy to parse any log file, pull out
+the information you desire, and plug it into ganglia to make pretty graphs.
+
+ganglia-logtailer consists of three parts:
+* ganglia-logtailer - the python script doing the real work
+* ganglia_logtailer_helper.py and tailnostate.py - supporting python classes
+* ApacheLogtailer.py, DummyLogtailer.py, etc. - user modifiable, log-file specific classes
+
+You must modify DummyLogtailer.py or write new FooLogtailer.py classes for
+each of the log files you wish to parse.  As every log file has a specific
+format, you must write the regular expression to properly pick out interesting
+information for your specific application.  As each application has different
+ways of expressing what might be interesting metrics to graph, you must write
+the functions to collect the information present in each line of the log file.
+
+DummyLogtailer.py is an example file; it does nothing other than count the
+number of lines present in the log file and report lines per second.  However,
+it does have extensive comments explaining the purpose of each of the functions
+present in the file.  I would recommend reading that file and modifying it to
+do more interesting things.
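+
+For illustration, here is a skeletal plugin showing the shape such a class
+takes.  The name FooLogtailer and its metric are placeholders;
+DummyLogtailer.py remains the authoritative, fully commented example.
+
+    import time
+    import threading
+
+    from ganglia_logtailer_helper import GangliaMetricObject
+
+    class FooLogtailer(object):
+        # seconds between get_state() calls (daemon mode only)
+        period = 30
+        def __init__(self):
+            self.lock = threading.RLock()
+            self.dur_override = False
+            self.reset_state()
+        def parse_line(self, line):
+            # a real plugin would apply a regex here; this one counts lines
+            self.lock.acquire()
+            self.num_lines += 1
+            self.lock.release()
+        def reset_state(self):
+            self.num_lines = 0
+            self.last_reset_time = time.time()
+        def set_check_duration(self, dur):
+            # cron mode: ganglia-logtailer passes in the elapsed time
+            self.duration = dur
+            self.dur_override = True
+        def get_check_duration(self):
+            if self.dur_override:
+                return self.duration
+            return time.time() - self.last_reset_time
+        def get_state(self):
+            self.lock.acquire()
+            count = self.num_lines
+            duration = self.get_check_duration()
+            self.reset_state()
+            self.lock.release()
+            return [ GangliaMetricObject('foo_lines', count / duration, units='lps') ]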
+
+ApacheLogtailer.py is a fully functional class parsing an Apache log.  At
+Linden Lab, we use a custom log format (and include the very interesting %D -
+time to execute the query in microseconds), so the regular expression included
+there will probably have to be changed for your environment.
+ApacheLogtailer.py defines and returns the number of Apache requests per
+second, also broken out by return code (200, 300, 400, 500), and the average,
+maximum, and 90th percentile run time of all queries caught during the sample
+period.
+
+The rest of the *Logtailer.py classes present are customized for different
+types of logs (postfix, bind, etc.).
+
+ganglia-logtailer can be invoked in two different modes: as a daemon (a
+persistent process), or from cron on a regular basis.  I recommend using
+daemon mode for testing, but invoking it from cron every 1-5 minutes for
+deployment.  I make this recommendation because (aside from minimizing the
+number of running daemons) there are no start scripts to invoke daemon mode
+on system boot, and there is no facility to relaunch the process if it
+crashes or raises an exception.
+
+ganglia-logtailer will log certain bits of information to
+/var/log/ganglia/ganglia_logtailer in case of error.  The log level can be
+changed by modifying ganglia-logtailer and editing the following line:
+logger.setLevel(logging.INFO)
+Look up the 'logging' python module for valid values (logging.DEBUG is a
+good bet).
+
+2. Installation
+
+ganglia-logtailer depends on the 'logtail' package
+(http://packages.debian.org/etch/logtail) when run in cron mode.
+
+i.   Copy ganglia-logtailer to /usr/local/bin/ (or wherever you store
+       unpackaged binaries)
+ii.  Copy ganglia_logtailer_helper.py and tailnostate.py to
+       /usr/local/share/ganglia-logtailer (or somewhere in your python search 
+       path)
+iii. Copy ApacheLogtailer.py and DummyLogtailer.py to /usr/local/share/ganglia-logtailer
+       (or somewhere in your python search path)
+
+Create the directory ganglia-logtailer uses to store state:
+/var/lib/ganglia-logtailer/
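+e.g.
+# mkdir -p /var/lib/ganglia-logtailer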
+
+Test the installation by invoking ganglia-logtailer with the DummyLogtailer
+class on a log file:
+# ganglia-logtailer --classname DummyLogtailer --log_file /var/log/daemon.log --mode daemon
+Wait 30s to 1m, then check whether your new metric is present in
+ganglia.
+
+If all goes well, try out one of the real modules:
+# ganglia-logtailer --classname PostfixLogtailer --log_file /var/log/mail.log --mode daemon
+
+If that works as well, deploy!  Add the following to
+/etc/cron.d/ganglia-logtailer:
+* * * * * root /usr/local/bin/ganglia-logtailer --classname PostfixLogtailer --log_file /var/log/mail.log --mode cron
+
+3. License
+
+These scripts are all released under the GPL v2 or later.  For a full
+description of the license, please visit http://www.gnu.org/licenses/gpl.txt
+
+
+

File debian/README.Debian

+For the debian package, ganglia_logtailer_helper.pyc and tailnostate.py are 
+stored in /usr/share/ganglia-logtailer, along with all base plugins that ship 
+with ganglia-logtailer.  Plugins you create should be put in 
+/usr/local/share/ganglia-logtailer/.  

File debian/changelog

+ganglia-logtailer (1.0-1) stable; urgency=low
+
+  * Initial release
+
+ -- Lex Linden <lex@lindenlab.com>  Wed, 21 Oct 2009 15:02:51 -0400

File debian/compat

+5

File debian/control

+Source: ganglia-logtailer
+Section: admin
+Priority: optional
+Maintainer: Lex Linden <lex@lindenlab.com>
+Build-Depends: debhelper (>= 5), python-support
+Standards-Version: 3.7.2
+
+Package: ganglia-logtailer
+Architecture: any
+Depends: ${shlibs:Depends}, ${misc:Depends}, python (>= 2.4), logtail, ganglia-monitor
+Description: framework to crunch data from logfiles and send using gmetric
+ Many metrics associated with ganglia and gmetric plugins are rather easy to
+ collect; you poll the relevant application for a value and report it.  Examples
+ are asking MySQL for the number of questions and calculating queries per
+ second, or asking iostat for the percentage disk I/O currently being used.
+ However, there are a large number of applications out there that don't support
+ being queried for interesting data, but do provide a log file which, when
+ properly parsed, yields the interesting data we desire.  An example of the
+ latter category is Apache, which does not furnish any interface for measuring
+ queries per second, yet has a log file allowing you to count how many queries
+ come in over a specific time period.
+ .
+ ganglia-logtailer is designed to make it easy to parse any log file, pull out
+ the information you desire, and plug it into ganglia to make pretty graphs.

File debian/copyright

+This package was debianized by Lex Linden <lex@lindenlab.com> on
+Wed, 21 Oct 2009 15:02:51 -0400.
+
+Copyright:
+
+    Copyright (C) 2008 Linden Lab
+
+License:
+    Released under the GPL v2 or later.
+    For a full description of the license, please examine:
+    /usr/share/common-licenses/GPL-2
+
+
+The Debian packaging is:
+
+    Copyright (C) 2009 Linden Lab
+
+and is licensed under the GPL v2 or later.
+
+

File debian/dirs

+usr/bin
+usr/share/ganglia-logtailer
+var/lib/ganglia-logtailer
+var/log/ganglia-logtailer
+

File debian/docs

+debian/README
+debian/README.Debian
+debian/ganglia-logtailer.cron.ex

File debian/ganglia-logtailer.cron.ex

+# This example cron job gathers metrics using DummyLogtailer every 5 minutes and
+# submits them to gmetric.  DummyLogtailer is an example plugin that just counts
+# the number of lines logged per second.
+
+*/5 * * * * root /usr/bin/ganglia-logtailer --classname DummyLogtailer --log_file /var/log/mail.log --mode cron
+

File debian/postinst

+#!/bin/sh
+# postinst script for ganglia-logtailer
+#
+# see: dh_installdeb(1)
+
+set -e
+
+case "$1" in
+    configure)
+	    # debian policy requires we use set -e above, so the || true ignores
+	    # a failure in case, for example, /usr/local isn't writable
+	    mkdir /usr/local/share/ganglia-logtailer || true
+    ;;
+
+    abort-upgrade|abort-remove|abort-deconfigure)
+    ;;
+
+    *)
+        echo "postinst called with unknown argument \`$1'" >&2
+        exit 1
+    ;;
+esac
+
+# dh_installdeb will replace this with shell code automatically
+# generated by other debhelper scripts.
+
+#DEBHELPER#
+
+exit 0

File debian/prerm

+#!/bin/sh
+# prerm script for ganglia-logtailer
+#
+# see: dh_installdeb(1)
+
+set -e
+
+case "$1" in
+    remove|upgrade|deconfigure)
+	rmdir /usr/local/share/ganglia-logtailer || true
+    ;;
+
+    failed-upgrade)
+    ;;
+
+    *)
+        echo "prerm called with unknown argument \`$1'" >&2
+        exit 1
+    ;;
+esac
+
+# dh_installdeb will replace this with shell code automatically
+# generated by other debhelper scripts.
+
+#DEBHELPER#
+
+exit 0

File debian/rules

+#!/usr/bin/make -f
+# -*- makefile -*-
+# Sample debian/rules that uses debhelper.
+# This file was originally written by Joey Hess and Craig Small.
+# As a special exception, when this file is copied by dh-make into a
+# dh-make output file, you may use that output file without restriction.
+# This special exception was added by Craig Small in version 0.37 of dh-make.
+
+# Uncomment this to turn on verbose mode.
+#export DH_VERBOSE=1
+
+configure: configure-stamp
+configure-stamp:
+	dh_testdir
+	# Add here commands to configure the package.
+
+	touch configure-stamp
+
+
+build: build-stamp
+
+build-stamp: configure-stamp  
+	dh_testdir
+
+	# Add here commands to compile the package.
+	$(MAKE)
+	#docbook-to-man debian/ganglia-logtailer.sgml > ganglia-logtailer.1
+
+	touch $@
+
+clean: 
+	dh_testdir
+	dh_testroot
+	rm -f build-stamp configure-stamp
+
+	# Add here commands to clean up after the build process.
+	$(MAKE) clean
+
+	dh_clean 
+
+install: build
+	dh_testdir
+	dh_testroot
+	#dh_prep  
+	dh_installdirs
+
+	# Add here commands to install the package into debian/ganglia-logtailer.
+	$(MAKE) DESTDIR=$(CURDIR)/debian/ganglia-logtailer install
+
+
+# Build architecture-independent files here.
+binary-indep: install
+# We have nothing to do by default.
+
+# Build architecture-dependent files here.
+binary-arch: install
+	dh_testdir
+	dh_testroot
+	dh_installchangelogs 
+	dh_installdocs
+	dh_installexamples
+#	dh_install
+#	dh_installmenu
+#	dh_installdebconf
+#	dh_installlogrotate
+#	dh_installemacsen
+#	dh_installpam
+#	dh_installmime
+	dh_pysupport
+#	dh_installinit
+#	dh_installcron
+#	dh_installinfo
+	dh_installman
+	dh_link
+	dh_strip
+	dh_compress
+	dh_fixperms
+#	dh_perl
+#	dh_makeshlibs
+	dh_installdeb
+	dh_shlibdeps
+	dh_gencontrol
+	dh_md5sums
+	dh_builddeb
+
+binary: binary-indep binary-arch
+.PHONY: build clean binary-indep binary-arch binary install configure

File src/ApacheLogtailer.py

+###
+###  This plugin for logtailer will crunch apache logs and produce these metrics:
+###    * hits per second
+###    * GETs per second
+###    * average query processing time
+###    * ninetieth percentile query processing time
+###    * number of HTTP 200, 300, 400, and 500 responses per second
+###
+###  Note that this plugin depends on a certain apache log format, documented
+###  in __init__.
+
+import time
+import threading
+import re
+
+# local dependencies
+from ganglia_logtailer_helper import GangliaMetricObject
+from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException
+
+class ApacheLogtailer(object):
+    # only used in daemon mode
+    period = 30
+    def __init__(self):
+        '''This function should initialize any data structures or variables
+        needed for the internal state of the line parser.'''
+        self.reset_state()
+        self.lock = threading.RLock()
+        # this is what will match the apache lines
+        # apache log format string:
+        # %v %A %a %u %{%Y-%m-%dT%H:%M:%S}t %c %s %>s %B %D %{cookie}n \"%{Referer}i\" \"%r\" \"%{User-Agent}i\" %P
+        # host.com 127.0.0.1 127.0.0.1 - 2008-05-08T07:34:44 - 200 200 371 103918 - "-" "GET /path HTTP/1.0" "-" 23794
+        # match keys: server_name, local_ip, remote_ip, date, conn_status, init_retcode, final_retcode, size,
+        #               req_time, cookie, referrer, request, user_agent, pid
+        self.reg = re.compile('^(?P<server_name>[^ ]+) (?P<local_ip>[^ ]+) (?P<remote_ip>[^ ]+) (?P<user>[^ ]+) (?P<date>[^ ]+) (?P<conn_status>[^ ]+) (?P<init_retcode>[^ ]+) (?P<final_retcode>[^ ]+) (?P<size>[^ ]+) (?P<req_time>[^ ]+) (?P<cookie>[^ ]+) "(?P<referrer>[^"]+)" "(?P<request>[^"]+)" "(?P<user_agent>[^"]+)" (?P<pid>[^ ]+)')
+
+        # assume we're in daemon mode unless set_check_duration gets called
+        self.dur_override = False
+
+
+    # example function for parse line
+    # takes one argument (text) line to be parsed
+    # returns nothing
+    def parse_line(self, line):
+        '''This function should digest the contents of one line at a time,
+        updating the internal state variables.'''
+        self.lock.acquire()
+        self.num_hits+=1
+        try:
+            regMatch = self.reg.match(line)
+            if regMatch:
+                linebits = regMatch.groupdict()
+                # capture GETs
+                if( 'GET' in linebits['request'] ):
+                    self.num_gets+=1
+                # capture HTTP response code
+                rescode = float(linebits['init_retcode'])
+                if( (rescode >= 200) and (rescode < 300) ):
+                    self.num_two+=1
+                elif( (rescode >= 300) and (rescode < 400) ):
+                    self.num_three+=1
+                elif( (rescode >= 400) and (rescode < 500) ):
+                    self.num_four+=1
+                elif( (rescode >= 500) and (rescode < 600) ):
+                    self.num_five+=1
+                # capture request duration
+                dur = float(linebits['req_time'])
+                # convert to seconds
+                dur = dur / 1000000
+                self.req_time += dur
+                # store for 90th % calculation
+                self.ninetieth.append(dur)
+            else:
+                raise LogtailerParsingException, "regmatch failed to match"
+        except Exception, e:
+            self.lock.release()
+            raise LogtailerParsingException, "regmatch or contents failed with %s" % e
+        self.lock.release()
+    # example function for deep copy
+    # takes no arguments
+    # returns one object
+    def deep_copy(self):
+        '''This function should return a copy of the data structure used to
+        maintain state.  This copy should be different from the object that is
+        currently being modified so that the other thread can deal with it
+        without fear of it changing out from under it.  The format of this
+        object is internal to the plugin.'''
+        myret = dict( num_hits=self.num_hits,
+                    num_gets=self.num_gets,
+                    req_time=self.req_time,
+                    num_two=self.num_two,
+                    num_three=self.num_three,
+                    num_four=self.num_four,
+                    num_five=self.num_five,
+                    ninetieth=self.ninetieth
+                    )
+        return myret
+    # example function for reset_state
+    # takes no arguments
+    # returns nothing
+    def reset_state(self):
+        '''This function resets the internal data structure to 0 (saving
+        whatever state it needs).  This function should be called
+        immediately after deep copy with a lock in place so the internal
+        data structures can't be modified in between the two calls.  If the
+        time between calls to get_state is necessary to calculate metrics,
+        reset_state should store now() each time it's called, and get_state
+        will use the time since that now() to do its calculations'''
+        self.num_hits = 0
+        self.num_gets = 0
+        self.req_time = 0
+        self.num_two = 0
+        self.num_three = 0
+        self.num_four = 0
+        self.num_five = 0
+        self.ninetieth = list()
+        self.last_reset_time = time.time()
+    # example for keeping track of runtimes
+    # takes no arguments
+    # returns float number of seconds for this run
+    def set_check_duration(self, dur):
+        '''This function is only used if logtailer is in cron mode.  If it is
+        invoked, get_check_duration should use this value instead of calculating
+        it.'''
+        self.duration = dur 
+        self.dur_override = True
+    def get_check_duration(self):
+        '''This function should return the time since the last check.  If called
+        from cron mode, this must be set using set_check_duration().  If in
+        daemon mode, it should be calculated internally.'''
+        if( self.dur_override ):
+            duration = self.duration
+        else:
+            cur_time = time.time()
+            duration = cur_time - self.last_reset_time
+            # the duration should be within 10% of period
+            acceptable_duration_min = self.period - (self.period / 10.0)
+            acceptable_duration_max = self.period + (self.period / 10.0)
+            if (duration < acceptable_duration_min or duration > acceptable_duration_max):
+                raise LogtailerStateException, "time calculation problem - duration (%s) > 10%% away from period (%s)" % (duration, self.period)
+        return duration
+    # example function for get_state
+    # takes no arguments
+    # returns a dictionary of (metric => metric_object) pairs
+    def get_state(self):
+        '''This function should acquire a lock, call deep copy, get the
+        current time if necessary, call reset_state, then do its
+        calculations.  It should return a list of metric objects.'''
+        # get the data to work with
+        self.lock.acquire()
+        try:
+            mydata = self.deep_copy()
+            check_time = self.get_check_duration()
+            self.reset_state()
+            self.lock.release()
+        except LogtailerStateException, e:
+            # if something went wrong with deep_copy or the duration, reset and continue
+            self.reset_state()
+            self.lock.release()
+            raise e
+
+        # crunch data to how you want to report it
+        hits_per_second = mydata['num_hits'] / check_time
+        gets_per_second = mydata['num_gets'] / check_time
+        if( mydata['num_hits'] != 0 ):
+            avg_req_time = mydata['req_time'] / mydata['num_hits']
+        else:
+            # no hits this period; report zero rather than divide by zero
+            avg_req_time = 0
+        two_per_second = mydata['num_two'] / check_time
+        three_per_second = mydata['num_three'] / check_time
+        four_per_second = mydata['num_four'] / check_time
+        five_per_second = mydata['num_five'] / check_time
+
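+        # nearest-rank percentile: sort all request durations and take the
+        # element 90% of the way through the sorted list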
+        # calculate 90th % request time
+        ninetieth_list = mydata['ninetieth']
+        ninetieth_list.sort()
+        num_entries = len(ninetieth_list)
+        if( num_entries != 0 ):
+            ninetieth_element = ninetieth_list[int(num_entries * 0.9)]
+        else:
+            # no timed requests this period; report zero rather than crash
+            ninetieth_element = 0
+
+        # package up the data you want to submit
+        hps_metric = GangliaMetricObject('apache_hits', hits_per_second, units='hps')
+        gps_metric = GangliaMetricObject('apache_gets', gets_per_second, units='hps')
+        avgdur_metric = GangliaMetricObject('apache_avg_dur', avg_req_time, units='sec')
+        ninetieth_metric = GangliaMetricObject('apache_90th_dur', ninetieth_element, units='sec')
+        twops_metric = GangliaMetricObject('apache_200', two_per_second, units='hps')
+        threeps_metric = GangliaMetricObject('apache_300', three_per_second, units='hps')
+        fourps_metric = GangliaMetricObject('apache_400', four_per_second, units='hps')
+        fiveps_metric = GangliaMetricObject('apache_500', five_per_second, units='hps')
+
+        # return a list of metric objects
+        return [ hps_metric, gps_metric, avgdur_metric, ninetieth_metric, twops_metric, threeps_metric, fourps_metric, fiveps_metric, ]
+
+
+

File src/BindLogtailer.py

+###
+###  This plugin for logtailer crunches bind's log and produces these metrics:
+###    * queries per second
+###    * number of unique clients seen in the sampling period, normalized over
+###      the sampling time
+###    * number of requests by the client that made the most requests
+###
+
+import time
+import threading
+import re
+
+# local dependencies
+from ganglia_logtailer_helper import GangliaMetricObject
+from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException
+
+class BindLogtailer(object):
+    # only used in daemon mode
+    period = 30.0
+    def __init__(self):
+        '''This function should initialize any data structures or variables
+        needed for the internal state of the line parser.'''
+        self.reset_state()
+        self.lock = threading.RLock()
+        # this is what will match the backbone lines
+        # backbone log example:
+        # Sep 11 09:03:05 ns0-sfo.lindenlab.com named[577]: client 80.189.94.233#49199: query: secondlife.com IN A
+        # match keys: client_ip
+        self.reg = re.compile('^.*named.*client (?P<client_ip>[0-9\.]+).*query')
+
+        # assume we're in daemon mode unless set_check_duration gets called
+        self.dur_override = False
+
+
+    # example function for parse line
+    # takes one argument (text) line to be parsed
+    # returns nothing
+    def parse_line(self, line):
+        '''This function should digest the contents of one line at a time,
+        updating the internal state variables.'''
+        self.lock.acquire()
+        try:
+            regMatch = self.reg.match(line)
+            if regMatch:
+                linebits = regMatch.groupdict()
+                self.num_hits+=1
+                self.client_ip_list.append(linebits['client_ip'])
+            else:
+                # this occurs for every non-named query line.  Ignore them.
+                #raise LogtailerParsingException, "regmatch failed to match line (%s)" % line
+                pass
+        except Exception, e:
+            self.lock.release()
+            raise LogtailerParsingException, "regmatch or contents failed with %s" % e
+        self.lock.release()
+    # example function for deep copy
+    # takes no arguments
+    # returns one object
+    def deep_copy(self):
+        '''This function should return a copy of the data structure used to
+        maintain state.  This copy should be different from the object that is
+        currently being modified so that the other thread can deal with it
+        without fear of it changing out from under it.  The format of this
+        object is internal to the plugin.'''
+        myret = dict( num_hits=self.num_hits,
+                      client_ip_list=self.client_ip_list,
+                    )
+        return myret
+    # example function for reset_state
+    # takes no arguments
+    # returns nothing
+    def reset_state(self):
+        '''This function resets the internal data structure to 0 (saving
+        whatever state it needs).  This function should be called
+        immediately after deep copy with a lock in place so the internal
+        data structures can't be modified in between the two calls.  If the
+        time between calls to get_state is necessary to calculate metrics,
+        reset_state should store now() each time it's called, and get_state
+        will use the time since that now() to do its calculations'''
+        self.num_hits = 0
+        self.last_reset_time = time.time()
+        self.client_ip_list = list()
+    # example for keeping track of runtimes
+    # takes no arguments
+    # returns float number of seconds for this run
+    def set_check_duration(self, dur):
+        '''This function is only used if logtailer is in cron mode.  If it is
+        invoked, get_check_duration should use this value instead of calculating
+        it.'''
+        self.duration = dur 
+        self.dur_override = True
+    def get_check_duration(self):
+        '''This function should return the time since the last check.  If called
+        from cron mode, this must be set using set_check_duration().  If in
+        daemon mode, it should be calculated internally.'''
+        if( self.dur_override ):
+            duration = self.duration
+        else:
+            cur_time = time.time()
+            duration = cur_time - self.last_reset_time
+            # the duration should be within 10% of period
+            acceptable_duration_min = self.period - (self.period / 10.0)
+            acceptable_duration_max = self.period + (self.period / 10.0)
+            if (duration < acceptable_duration_min or duration > acceptable_duration_max):
+                raise LogtailerStateException, "time calculation problem - duration (%s) > 10%% away from period (%s)" % (duration, self.period)
+        return duration
+    # example function for get_state
+    # takes no arguments
+    # returns a dictionary of (metric => metric_object) pairs
+    def get_state(self):
+        '''This function should acquire a lock, call deep copy, get the
+        current time if necessary, call reset_state, then do its
+        calculations.  It should return a list of metric objects.'''
+        # get the data to work with
+        self.lock.acquire()
+        try:
+            mydata = self.deep_copy()
+            check_time = self.get_check_duration()
+            self.reset_state()
+            self.lock.release()
+        except LogtailerStateException, e:
+            # if something went wrong with deep_copy or the duration, reset and continue
+            self.reset_state()
+            self.lock.release()
+            raise e
+
+        # crunch data to how you want to report it
+        queries_per_second = mydata['num_hits'] / check_time
+
+        # calculate number of querying IPs and maximum number of queries per IP
+        clist = mydata['client_ip_list']
+
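+        # build a frequency table mapping client IP -> number of queries seen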
+        cdict = dict()
+        for elem in clist:
+            cdict[elem] = cdict.get(elem,0) + 1
+
+        # number of unique clients seen, normalized over the sampling period
+        num_client_ips = len(cdict) / check_time
+        # number of requests issued by the client making the most
+        if( len(cdict) != 0 ):
+            max_client_ip_count = max(cdict.values()) / check_time
+        else:
+            # no queries this period; avoid calling max() on an empty sequence
+            max_client_ip_count = 0
+
+
+        # package up the data you want to submit
+        qps_metric = GangliaMetricObject('bind_queries', queries_per_second, units='qps')
+        clients_metric = GangliaMetricObject('bind_num_clients', num_client_ips, units='cps')
+        max_reqs_metric = GangliaMetricObject('bind_largest_volume_client', max_client_ip_count, units='qps')
+
+        # return a list of metric objects
+        return [ qps_metric, clients_metric, max_reqs_metric, ]
+
+
+

File src/DummyLogtailer.py

+###
+###   a 'metric object' is an instance of GangliaMetricObject
+###       { 'name' => 'name-of-metric',
+###         'value' => numerical-or-string-value,
+###         'type' => 'int32',    <--- see gmetric man page for valid types
+###         'units' => 'qps',     <--- label on the graph
+###         }
+###   This object should appear remarkably similar to the required arguments to gmetric.
+###
+###
+###   The logtailer class must define
+###     a class variable 'period'
+###     an instance method set_check_duration that sets the time since last invocation (used in cron mode)
+###     an instance method get_state() that returns a list of metric objects
+###     an instance method parse_line(line) that takes one line of the log file and does whatever internal accounting is necessary to record its metrics
+###   The logtailer class must be thread safe - a separate thread will be calling get_state() and parse_line(line)
+###   parse_line(line) may raise a LogtailerParsingException to log an error and discard the current line but keep going.  Any other exception will kill the process.
+###
+
+import time
+import threading
+
+# local dependencies
+from ganglia_logtailer_helper import GangliaMetricObject
+from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException
+
+class DummyLogtailer(object):
+    # period must be defined and indicates how often the gmetric thread should call get_state() (in seconds) (in daemon mode only)
+    # note that if period is shorter than it takes to run get_state() (if there's lots of complex calculation), the calling thread will automatically double period.
+    # period ought to be >=5.  It should probably be >=60 (to avoid excessive load).  120 to 300 is a good range (2-5 minutes).  Take into account the need for time resolution, as well as the number of hosts reporting (6000 hosts * 15s == lots of data).
+    period = 5
+    def __init__(self):
+        '''This function should initialize any data structures or variables
+        needed for the internal state of the line parser.'''
+        self.dur_override = False
+        self.reset_state()
+        self.lock = threading.RLock()
+
+    # example function for parse line
+    # takes one argument (text) line to be parsed
+    # returns nothing
+    def parse_line(self, line):
+        '''This function should digest the contents of one line at a time,
+        updating the internal state variables.'''
+        self.lock.acquire()
+        self.num_lines+=1
+        self.lock.release()
+    # example function for deep copy
+    # takes no arguments
+    # returns one object
+    def deep_copy(self):
+        '''This function should return a copy of the data structure used to
+        maintain state.  This copy should be different from the object that is
+        currently being modified so that the other thread can deal with it
+        without fear of it changing out from under it.  The format of this
+        object is internal to the plugin.'''
+        return [ self.num_lines, ]
+    # example function for reset_state
+    # takes no arguments
+    # returns nothing
+    def reset_state(self):
+        '''This function resets the internal data structure to 0 (saving
+        whatever state it needs).  This function should be called
+        immediately after deep copy with a lock in place so the internal
+        data structures can't be modified in between the two calls.  If the
+        time between calls to get_state is necessary to calculate metrics,
+        reset_state should store now() each time it's called, and get_state
+        will use the time since that now() to do its calculations'''
+        self.num_lines = 0
+        self.last_reset_time = time.time()
+    # example for keeping track of runtimes
+    # takes no arguments
+    # returns float number of seconds for this run
+    def set_check_duration(self, dur):
+        '''This function is only used if logtailer is in cron mode.  If it is
+        invoked, get_check_duration should use this value instead of calculating
+        it.'''
+        self.duration = dur
+        self.dur_override = True
+    def get_check_duration(self):
+        '''This function should return the time since the last check.  If called
+        from cron mode, this must be set using set_check_duration().  If in
+        daemon mode, it should be calculated internally.'''
+        if( self.dur_override ):
+            duration = self.duration
+        else:
+            cur_time = time.time()
+            duration = cur_time - self.last_reset_time
+            # the duration should be within 10% of period
+            acceptable_duration_min = self.period - (self.period / 10.0)
+            acceptable_duration_max = self.period + (self.period / 10.0)
+            if (duration < acceptable_duration_min or duration > acceptable_duration_max):
+                raise LogtailerStateException, "time calculation problem - duration (%s) > 10%% away from period (%s)" % (duration, self.period)
+        return duration
+    # example function for get_state
+    # takes no arguments
+    # returns a dictionary of (metric => metric_object) pairs
+    def get_state(self):
+        '''This function should acquire a lock, call deep copy, get the
+        current time if necessary, call reset_state, then do its
+        calculations.  It should return a list of metric objects.'''
+        # get the data to work with
+        self.lock.acquire()
+        try:
+            mydata = self.deep_copy()
+            check_time = self.get_check_duration()
+            self.reset_state()
+            self.lock.release()
+        except LogtailerStateException, e:
+            # if something went wrong with deep_copy or the duration, reset and continue
+            self.reset_state()
+            self.lock.release()
+            raise e
+
+        # crunch data to how you want to report it
+        lines_per_second = mydata[0] / check_time
+
+        # package up the data you want to submit
+        lps_metric = GangliaMetricObject('num_lines', lines_per_second, units='lps', type="float")
+        # return a list of metric objects
+        return [ lps_metric, ]
+
+
+

File src/PostfixLogtailer.py

+###
+###  This plugin for logtailer will crunch postfix logs and produce the
+###  following metrics:
+###    * number of connections per second
+###    * number of messages delivered per second
+###    * number of bounces per second
+###
+
+import time
+import threading
+import re
+
+# local dependencies
+from ganglia_logtailer_helper import GangliaMetricObject
+from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException
+
+class PostfixLogtailer(object):
+    # only used in daemon mode
+    period = 30.0
+    def __init__(self):
+        '''This function should initialize any data structures or variables
+        needed for the internal state of the line parser.'''
+        self.reset_state()
+        self.lock = threading.RLock()
+        # this is what will match the postfix lines
+        # postfix example log format string:
+        # connections:
+        # Sep 12 13:50:21 host postfix/smtpd[13334]: connect from unknown[1.2.3.4]
+        # deliveries:
+        # Sep 12 13:39:11 host postfix/local[11393]: E412470C2B8: to=<foo@host>, orig_to=<foo@bar.com>, relay=local, delay=5, delays=1.9/0/0/3.2, dsn=2.0.0, status=sent (delivered to command: /usr/local/bin/procmail)
+        # bounces:
+        # Sep 12 11:58:52 host postfix/local[18444]: 8D3C671C324: to=<invalid@host>, orig_to=<invalid@bar.com>, relay=local, delay=0.43, delays=0.41/0/0/0.02, dsn=5.1.1, status=bounced (unknown user: "invalid")
+        self.reg_connections = re.compile('^.*postfix/smtpd.*connect from unknown.*$')
+        self.reg_deliveries = re.compile('^.*postfix/local.* status=sent .*$')
+        self.reg_bounces = re.compile('^.*postfix/local.* status=bounced .*$')
+
+        # assume we're in daemon mode unless set_check_duration gets called
+        self.dur_override = False
+
+
+    # example function for parse line
+    # takes one argument (text) line to be parsed
+    # returns nothing
+    def parse_line(self, line):
+        '''This function should digest the contents of one line at a time,
+        updating the internal state variables.'''
+        self.lock.acquire()
+        try:
+            regMatch = self.reg_connections.match(line)
+            if regMatch:
+                self.num_connections+=1
+            regMatch = self.reg_deliveries.match(line)
+            if regMatch:
+                self.num_deliveries+=1
+            regMatch = self.reg_bounces.match(line)
+            if regMatch:
+                self.num_bounces+=1
+
+        except Exception, e:
+            self.lock.release()
+            raise LogtailerParsingException, "regmatch or contents failed with %s" % e
+        self.lock.release()
+    # example function for deep copy
+    # takes no arguments
+    # returns one object
+    def deep_copy(self):
+        '''This function should return a copy of the data structure used to
+        maintain state.  This copy should be different from the object that is
+        currently being modified so that the other thread can deal with it
+        without fear of it changing out from under it.  The format of this
+        object is internal to the plugin.'''
+        myret = dict( num_conns = self.num_connections,
+                    num_deliv = self.num_deliveries,
+                    num_bounc = self.num_bounces
+                    )
+        return myret
+    # example function for reset_state
+    # takes no arguments
+    # returns nothing
+    def reset_state(self):
+        '''This function resets the internal data structure to 0 (saving
+        whatever state it needs).  This function should be called
+        immediately after deep copy with a lock in place so the internal
+        data structures can't be modified in between the two calls.  If the
+        time between calls to get_state is necessary to calculate metrics,
+        reset_state should store now() each time it's called, and get_state
+        will use the time since that now() to do its calculations'''
+        self.num_connections = 0
+        self.num_deliveries = 0
+        self.num_bounces = 0
+        self.last_reset_time = time.time()
+    # example for keeping track of runtimes
+    # takes no arguments
+    # returns float number of seconds for this run
+    def set_check_duration(self, dur):
+        '''This function is only used if logtailer is in cron mode.  If it is
+        invoked, get_check_duration should use this value instead of calculating
+        it.'''
+        self.duration = dur 
+        self.dur_override = True
+    def get_check_duration(self):
+        '''This function should return the time since the last check.  If called
+        from cron mode, this must be set using set_check_duration().  If in
+        daemon mode, it should be calculated internally.'''
+        if( self.dur_override ):
+            duration = self.duration
+        else:
+            cur_time = time.time()
+            duration = cur_time - self.last_reset_time
+            # the duration should be within 10% of period
+            acceptable_duration_min = self.period - (self.period / 10.0)
+            acceptable_duration_max = self.period + (self.period / 10.0)
+            if (duration < acceptable_duration_min or duration > acceptable_duration_max):
+                raise LogtailerStateException, "time calculation problem - duration (%s) > 10%% away from period (%s)" % (duration, self.period)
+        return duration
+    # example function for get_state
+    # takes no arguments
+    # returns a dictionary of (metric => metric_object) pairs
+    def get_state(self):
+        '''This function should acquire a lock, call deep copy, get the
+        current time if necessary, call reset_state, then do its
+        calculations.  It should return a list of metric objects.'''
+        # get the data to work with
+        self.lock.acquire()
+        try:
+            mydata = self.deep_copy()
+            check_time = self.get_check_duration()
+            self.reset_state()
+            self.lock.release()
+        except LogtailerStateException, e:
+            # if something went wrong with deep_copy or the duration, reset and continue
+            self.reset_state()
+            self.lock.release()
+            raise e
+
+        # crunch data to how you want to report it
+        connections_per_second = mydata['num_conns'] / check_time
+        deliveries_per_second = mydata['num_deliv'] / check_time
+        bounces_per_second = mydata['num_bounc'] / check_time
+
+        # package up the data you want to submit
+        cps_metric = GangliaMetricObject('postfix_connections', connections_per_second, units='cps')
+        dps_metric = GangliaMetricObject('postfix_deliveries', deliveries_per_second, units='dps')
+        bps_metric = GangliaMetricObject('postfix_bounces', bounces_per_second, units='bps')
+
+        # return a list of metric objects
+        return [ cps_metric, dps_metric, bps_metric, ]
+
+
+

File src/UnboundLogtailer.py

+###
+### This logtailer plugin for ganglia-logtailer parses logs from Unbound and
+### produces the following metrics:
+###   * queries per second
+###   * recursion requests per second
+###   * cache hits per second
+###
+
+import time
+import threading
+import re
+
+# local dependencies
+from ganglia_logtailer_helper import GangliaMetricObject
+from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException
+
+class UnboundLogtailer(object):
+    # period must be defined and indicates how often the gmetric thread should call get_state() (in seconds) (in daemon mode only)
+    # note that if period is shorter than it takes to run get_state() (if there's lots of complex calculation), the calling thread will automatically double period.
+    # period ought to be >=5.  It should probably be >=60 (to avoid excessive load).  120 to 300 is a good range (2-5 minutes).  Take into account the need for time resolution, as well as the number of hosts reporting (6000 hosts * 15s == lots of data).
+    period = 5
+    def __init__(self):
+        '''This function should initialize any data structures or variables
+        needed for the internal state of the line parser.'''
+        self.dur_override = False
+        self.reset_state()
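+        # this matches unbound's periodic per-thread stats line, which looks
+        # roughly like this (illustrative sample, not a verbatim log line):
+        # Sep 13 12:00:00 host unbound: [1234:0] info: server stats for thread 0: 533 queries, 404 answers from cache, 129 recursions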
+        self.reg = re.compile('^(?P<month>\S+)\s+(?P<day>\S+)\s+(?P<time>\S+)\s+(?P<hostname>\S+)\s+(?P<program>\S+):\s+\[(?P<pid>\d+):\d+\]\s+(?P<facility>\S+):\s+server\sstats\sfor\sthread\s(?P<thread>\d+):\s+(?P<queries>\d+)\s+\S+\s+(?P<caches>\d+)\s+\S+\s+\S+\s+\S+\s+(?P<recursions>\d+)')
+        self.lock = threading.RLock()
+        self.queries = [0,0,0,0]
+        self.caches = [0,0,0,0]
+        self.recursions = [0,0,0,0]
+        # running per-timestamp tally used by parse_line; unlike the counters
+        # above, it is never reset between get_state() calls
+        self.runningcount = {}
+
+    # example function for parse line
+    # takes one argument (text) line to be parsed
+    # returns nothing
+    def parse_line(self, line):
+        '''This function should digest the contents of one line at a time,
+        updating the internal state variables.'''
+        self.lock.acquire()
+        regMatch = self.reg.match(line)
+        if regMatch:
+            self.num_lines+=1
+            bitsdict = regMatch.groupdict()
+            self.queries[int(bitsdict['thread'])] += int(bitsdict['queries'])
+            self.caches[int(bitsdict['thread'])] += int(bitsdict['caches'])
+            self.recursions[int(bitsdict['thread'])] += int(bitsdict['recursions'])
+            self.runningcount.setdefault(bitsdict['time'], {})[int(bitsdict['thread'])] = bitsdict['queries'], bitsdict['caches'], bitsdict['recursions']
+        self.lock.release()
+    # example function for deep copy
+    # takes no arguments
+    # returns one object
+    def deep_copy(self):
+        '''This function should return a copy of the data structure used to
+        maintain state.  This copy should be different from the object that is
+        currently being modified so that the other thread can deal with it
+        without fear of it changing out from under it.  The format of this
+        object is internal to the plugin.'''
+        return [ self.num_lines, self.queries, self.caches, self.recursions ]
+    # example function for reset_state
+    # takes no arguments
+    # returns nothing
+    def reset_state(self):
+        '''This function resets the internal data structure to 0 (saving
+        whatever state it needs).  This function should be called
+        immediately after deep copy with a lock in place so the internal
+        data structures can't be modified in between the two calls.  If the
+        time between calls to get_state is necessary to calculate metrics,
+        reset_state should store now() each time it's called, and get_state
+        will use the time since that now() to do its calculations'''
+        self.num_lines = 0
+        self.queries = [0,0,0,0]
+        self.caches = [0,0,0,0]
+        self.recursions = [0,0,0,0]
+        self.last_reset_time = time.time()
+    # example for keeping track of runtimes
+    # takes no arguments
+    # returns float number of seconds for this run
+    def set_check_duration(self, dur):
+        '''This function is only used if logtailer is in cron mode.  If it is
+        invoked, get_check_duration should use this value instead of calculating
+        it.'''
+        self.duration = dur
+        self.dur_override = True
+    def get_check_duration(self):
+        '''This function should return the time since the last check.  If called
+        from cron mode, this must be set using set_check_duration().  If in
+        daemon mode, it should be calculated internally.'''
+        if( self.dur_override ):
+            duration = self.duration
+        else:
+            cur_time = time.time()
+            duration = cur_time - self.last_reset_time
+            # the duration should be within 10% of period
+            acceptable_duration_min = self.period - (self.period / 10.0)
+            acceptable_duration_max = self.period + (self.period / 10.0)
+            if (duration < acceptable_duration_min or duration > acceptable_duration_max):
+                raise LogtailerStateException, "time calculation problem - duration (%s) > 10%% away from period (%s)" % (duration, self.period)
+        return duration
+    # example function for get_state
+    # takes no arguments
+    # returns a dictionary of (metric => metric_object) pairs
+    def get_state(self):
+        '''This function should acquire a lock, call deep copy, get the
+        current time if necessary, call reset_state, then do its
+        calculations.  It should return a list of metric objects.'''
+        # get the data to work with
+        self.lock.acquire()
+        try:
+            number_of_lines, queries, caches, recursions = self.deep_copy()
+            check_time = self.get_check_duration()
+            self.reset_state()
+            self.lock.release()
+        except LogtailerStateException, e:
+            # if something went wrong with deep_copy or the duration, reset and continue
+            self.reset_state()
+            self.lock.release()
+            raise e
+
+        # crunch data to how you want to report it
+        queries_per_second = sum(queries) / check_time
+        recursions_per_second = sum(recursions) / check_time
+        caches_per_second = sum(caches) / check_time
+
+        # package up the data you want to submit
+        qps_metric = GangliaMetricObject('queries', queries_per_second, units='qps')
+        rps_metric = GangliaMetricObject('recursions', recursions_per_second, units='rps')
+        cps_metric = GangliaMetricObject('caches', caches_per_second, units='cps')
+        # return a list of metric objects
+        return [ qps_metric, rps_metric, cps_metric ]
+
+
+

File src/ganglia-logtailer

+#!/usr/bin/python -tt
+
+###
+###  logtailer
+###
+###  tails a log,  reports stuff to ganglia using gmetric
+###
+###  arguments:
+###    function to accumulate the log lines
+###    log file to tail
+###
+###  copyright Linden Research, Inc. 2008
+###  Released under the GPL v2 or later.
+###  For a full description of the license, please visit http://www.gnu.org/licenses/gpl.txt
+###
+###  $Id$
+###
+
+# System Libraries
+import os
+import sys
+import threading
+import time
+import optparse
+import stat
+# Logging module
+import logging.handlers
+
+# Local dependencies
+sys.path.append("/usr/share/ganglia-logtailer")
+from tailnostate import LogTail
+from ganglia_logtailer_helper import LogtailerParsingException, LogtailerStateException
+
+## globals
+gmetric = '/usr/bin/gmetric'
+#gmetric = '/bin/echo'
+conffile = '/etc/ganglia/gmond.conf'
+logtail = '/usr/sbin/logtail'
+logtail_statedir = '/var/lib/ganglia-logtailer/'
+
+
+## set up logging infrastructure for use throughout the script
+logDir = '/var/log/ganglia'
+if(not os.path.isdir(logDir)):
+    os.mkdir(logDir)
+logger = logging.getLogger('ganglia_logtailer')
+# open the log file for append, rotate at 1GB, keep 10 of them
+#hdlr = logger.RotatingFileHandler('%s/ganglia_logtailer.log' % logDir, 'a', 1000000000, 10)
+hdlr = logging.handlers.RotatingFileHandler('%s/ganglia_logtailer.log' % logDir, 'a', 10 * 1024 * 1024, 10)
+formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
+hdlr.setFormatter(formatter)
+logger.addHandler(hdlr)
+logger.setLevel(logging.INFO)
+#logger.setLevel(logging.DEBUG)
+
+
+## This provides a lineno() function to make it easy to grab the line
+## number that we're on (for logging)
+## Danny Yoo (dyoo@hkn.eecs.berkeley.edu)
+## taken from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/145297
+import inspect
+def lineno():
+    """Returns the current line number in our program."""
+    return inspect.currentframe().f_back.f_lineno
+
+def submit_stats( parser, duration=None ):
+    if( duration != None ):
+        # this only happens in cron mode
+        parser.set_check_duration(duration)
+    try:
+        metrics = parser.get_state()
+        for m in metrics:
+            logger.debug( "Submitting gmetric: %s -c %s --name %s --value %s --type %s --units %s" %
+                 (gmetric, conffile, m.name, m.value, m.type, m.units) )
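+            # the assembled command looks like, e.g.:
+            #   /usr/bin/gmetric -c /etc/ganglia/gmond.conf --name apache_hits --value 12.3 --type float --units hps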
+            os.system("%s -c %s --name %s --value %s --type %s --units %s" %
+                (gmetric, conffile, m.name, m.value, m.type, m.units) )
+    except LogtailerStateException, e:
+        logger.warning( "State exception caught (line %s): %s" % (lineno(), e) )
+
+
+# function gmetric_manager
+# takes a parser object - class instance
+def gmetric_manager( parser ):
+    '''This process should be used to start the thread that calls
+    gmetric every so often.  It should get the period and data from the
+    parser object'''
+    period = parser.period
+
+    # wait one period the first time (so we have something to report)
+    time.sleep(period)
+
+    while True:
+        logger.debug("manager: starting")
+        start = time.time()
+        # submit the stats
+        submit_stats(parser)
+        finish = time.time()
+        runtime = finish - start
+        sleep_time = period - runtime
+        while( sleep_time <= 0 ):
+            logger.info( "manager: calculation time is longer than period.  doubling period to %s." % (period * 2) )
+            sleep_time += period
+            period *= 2
+            # tell the logtailer class that we're slowing period
+            parser.period = period
+        logger.debug( "manager: sleeping for %s" % sleep_time)
+        time.sleep(sleep_time)
+
+
+
+def main():
+
+    cmdline = optparse.OptionParser()
+    cmdline.add_option('--classname', '-c', action='store', help='The name of the plugin to use to parse the log file')
+    cmdline.add_option('--log_file', '-l', action='store', help='The path to the file to tail and parse')
+    cmdline.add_option('--mode', '-m', action='store', type='choice',
+                       choices=('daemon', 'cron'), default='cron',
+                       help='MODE must be "cron" or "daemon".  Cron mode (default) is designed to be called every X minutes.  Daemon mode is a persistent process.')
+    cmdline.add_option('--state_dir', '-s', action='store', default=logtail_statedir,
+                       help='The state dir is used in cron mode, and is where to store the logtail state file.  Default location %s' % logtail_statedir)
+
+    options, arguments = cmdline.parse_args()
+#    print ('classname = %s, log_file = %s, mode = %s, state_file = %s' % (options.classname, options.log_file, options.mode, options.state_dir) )
+
+    class_name = options.classname
+    log_file = options.log_file
+    mode = options.mode
+    state_dir = options.state_dir
+    dirsafe_logfile = log_file.replace('/','-')
+    logtail_state_file = '%s/logtail-%s%s.state' % (state_dir, class_name, dirsafe_logfile)
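+    # e.g. PostfixLogtailer tailing /var/log/mail.log keeps its state in
+    # /var/lib/ganglia-logtailer/logtail-PostfixLogtailer-var-log-mail.log.state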
+
+    # only used in cron mode
+    shell_tail = '%s -f %s -o %s' % (logtail, log_file, logtail_state_file)
+
+    logger.debug( "ganglia-logtailer started with class %s, log file %s, mode %s" % (class_name, log_file, mode))
+
+    # import and instantiate the class from the module passed in.  Files and Class names must be the same.
+    try:
+        sys.path.append("/usr/local/share/ganglia-logtailer")       
+        module = __import__(class_name)
+        parser = getattr(module, class_name)()
+    except Exception, e:
+        print "Failed to instantiate parser (line %s): %s" % (lineno(), e)
+        sys.exit(1)
+
+    # get input to parse
+    if ( mode == 'daemon' ):
+        # open the log file for tailing
+        try:
+            input = LogTail(log_file)
+        except Exception, e:
+            print "Failed to instantiate LogTail instance (line %s): %s" % (lineno(), e)
+            sys.exit(1)
+    elif ( mode == 'cron' ):
+        try:
+            # find out how long it's been since we last ran.
+            try:
+                state_file_age = os.stat(logtail_state_file)[stat.ST_MTIME]
+            except OSError, e:
+                # this is our first run or our state file got nuked.
+                # write out a new state file and exit
+                logger.info('First run or state file got nuked.  Wrote new state file. Exiting.')
+                input = os.popen(shell_tail)
+                retval = input.close()
+                if( retval != 256 ):
+                    logger.warning('%s returned bad exit code %s' %
+                                   (shell_tail, retval))
+                sys.exit(0)
+            input = os.popen(shell_tail)
+        except Exception, e:
+            # note - there is no exception when shell-tailer doesn't exist.
+            # I don't know when this exception will ever actually be triggered.
+            print ("Failed to run %s to get log data (line %s): %s" %
+                   (shell_tail, lineno(), e))
+            sys.exit(1)
+    else:
+        raise Exception, "mode (%s) misunderstood" % mode
+
+    # if we're a daemon, launch the other thread (cron mode runs after the parsing)
+    if ( mode == 'daemon' ):
+        #launch gmetric caller thread
+        submitter = threading.Thread(target=gmetric_manager, args=[parser])
+        # the process should die when the main thread dies
+        submitter.setDaemon( True )
+        submitter.start()
+
+    # parse each line in turn
+    try:
+        for line in input:
+            # this will never end in daemon mode, but will in cron mode
+            try:
+                # if in daemon mode, die if our submitter thread has failed
+                if( mode == 'daemon' and not submitter.isAlive() ):
+                    raise Exception, "submitter thread died"
+
+#                logger.info( "parsing line")
+                parser.parse_line(line)  # crunch each line in turn
+
+            except LogtailerParsingException, e:
+                # this should only catch recoverable exceptions (of which there aren't any at the moment)
+                logger.warning( "Parsing exception caught at %s: %s" % (lineno(), e))
+    except Exception, e:
+        print "Exception caught at %s: %s" % (lineno(), e)
+        sys.exit(1)
+
+    # if we're called from cron, crunch the stats
+    if ( mode == 'cron' ):
+        # calculate now() - state file age to determine check duration
+        now = time.time()
+        duration = now - state_file_age
+        if ( duration <= 45 ):
+            # something's borked.  cron's minimum is 60s
+            logger.warning('duration (%s) less than 45s, despite being called from cron.  Shouldn\'t happen. (line: %s)' % (duration, lineno()))
+        #print 'metric measure with duration: %s' % duration
+        submit_stats(parser, duration=duration)
+
+if __name__ == '__main__':
+    main()
+
+

File src/ganglia_logtailer_helper.py

+#!/usr/bin/python
+"""class for ganglia metric objects to be passed around"""
+
+class GangliaMetricObject(object):
+    def __init__(self, name, value, units='', type='float', tmax=60):
+        self.name = name
+        self.value = value
+        self.units = units
+        self.type = type
+        self.tmax = tmax
+
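+# illustrative usage: a plugin's get_state() builds and returns objects such as
+#   GangliaMetricObject('apache_hits', 12.3, units='hps')
+# and ganglia-logtailer turns each one into a single gmetric invocation.
+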
+class LogtailerParsingException(Exception):
+    """Raise this exception if the parse_line function wants to
+        throw a 'recoverable' exception - i.e. you want parsing
+        to continue but want to skip this line and log a failure."""
+    pass
+
+class LogtailerStateException(Exception):
+    """Raise this exception if the get_state function has failed.  Metrics from
+       this run will not be submitted (since the function did not properly
+       return), but reset_state() should have been called so that the metrics
+       are valid next time."""
+    pass

File src/tailnostate.py

+#!/usr/bin/python
+"""Tail a file, reopening it if it gets rotated"""
+
+import time, os, sys, glob
+
+
+class Tail(object):
+    def __init__(self, filename, start_pos=0):
+        self.fp = file(filename)
+        self.filename = filename
+
+        if start_pos < 0:
+            self.fp.seek(-start_pos-1, 2)
+            self.pos = self.fp.tell()
+        else:
+            self.fp.seek(start_pos)
+            self.pos = start_pos
+
+    def __iter__(self):
+        """Return next line.  This function will sleep until there *is* a
+        next line.  Works over log rotation."""
+        counter = 0
+        while True:
+            line = self.next()
+            if line is None:
+                counter += 1
+                if counter >= 5:
+                    counter = 0
+                    self.check_inode()
+                time.sleep(1.0)
+            else:
+                yield line
+
+    def check_inode(self):
+        """check to see if the filename we expect to tail has the same
+        inode as our currently open file.  This catches log rotation"""
+        inode = os.stat(self.filename).st_ino
+        old_inode = os.fstat(self.fp.fileno()).st_ino
+        if inode != old_inode:
+            self.fp = file(self.filename)
+            self.pos = 0
+
+    def next(self):
+        """Return the next line from the file.  Returns None if there are not
+        currently any lines available, at which point you should sleep before
+        calling again.  Does *not* handle log rotation.  If you use next(), you
+        must also use check_inode to handle log rotation"""
+        where = self.fp.tell()
+        line = self.fp.readline()
+        if line and line[-1] == '\n':
+            self.pos += len(line)
+            return line
+        else:
+            self.fp.seek(where)
+            return None
+
+    def close(self):
+        self.fp.close()
+
+
+
+class LogTail(Tail):
+    def __init__(self, filename):
+        self.base_filename = filename
+        super(LogTail, self).__init__(filename, -1)
+
+    def get_file(self, inode, next=False):
+        files = glob.glob('%s*' % self.base_filename)
+        files = [(os.stat(f).st_mtime, f) for f in files]
+        # Sort by modification time
+        files.sort()
+
+        flag = False
+        for mtime, f in files:
+            if flag:
+                return f
+            if os.stat(f).st_ino == inode:
+                if next:
+                    flag = True
+                else:
+                    return f
+        else:
+            return self.base_filename
+
+    def reset(self):
+        self.fp = file(self.filename)
+        self.pos = 0
+
+    def advance(self):
+        self.filename = self.get_file(os.fstat(self.fp.fileno()).st_ino, True)
+        self.reset()
+
+    def check_inode(self):
+        if self.filename != self.base_filename or os.stat(self.filename).st_ino != os.fstat(self.fp.fileno()).st_ino:
+            self.advance()
+
+
+def main():
+    import sys
+
+    t = Tail(sys.argv[1], -1)
+    for line in t:
+        print line
+
+
+if __name__ == '__main__':
+    main()
+
+