#!/usr/bin/env python

'''
Web Log Cleaning class.

Contents:

- Cleaner: logfile cleaning class
  clean_log = weblog.clean.Cleaner(log object)
  methods:
	- clean_log.getlogent()
  variables:
	- clean_log.directory_index  [ index filenames to treat as equal to their directory ]
	- clean_log.cache_size  [ maximum number of entries in the url cache ]

- test: test function
'''


# (c) 1998 Copyright Mark Nottingham
# <mnot@pobox.com>
#
# This software may be freely distributed, modified and used,
# provided that this copyright notice remain intact.
#
# This software is provided 'as is' without warranty of any kind.


# Web Log Cleaning
#
# The log object fed to a Cleaner MUST be preprocessed with weblog.url.Parser.
#
# The Cleaner class will normalise the log entries as much as possible, 
# resulting in more accurate statistics for analysis. Although its efforts
# are not perfect (particularly with path collapsing, due to the differing
# natures of browsers and servers), it tries to 'do the right thing'.
# 
# This class will do the following to a log fed to it, for both request URL
# and referer (if available):
# 
# - collapse out ./ ../ // and other sequences where appropriate
# 
# - replace % escaped sequences in the path with their equivalents
# 
# Additionally, 
# 
# - if a request URL is for a page specified in the directory_index list,
#   that page is trimmed from the path
# 
# - referer hosts are normalised to lowercase (actually done by weblog.url.Parser)
# 
# - referer hosts that specify a port which is the default port for that
#   service will have it removed
#
# - referer news: URLs that specify a host have it removed
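#
# For example, with directory_index set to ['index.html'] (hypothetical
# entries, not taken from a real log):
#
#   request path:  /a/b/../index.html  ->  /a/
#   referer:       http://www.example.com:80/foo/./%7Ebar
#                  ->  http://www.example.com/foo/~bar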


__version__ = '1.0'



from string import join, rfind
from regsub import gsub
from urlparse import urlunparse
from urllib import unquote
import regex
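
# regsub and regex are the pattern-matching modules that predate 're';
# regex uses emacs-style syntax, so \( \| \) below mean grouping and
# alternation.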

# matches a path segment followed by '/../', a '/./', an empty ('//')
# segment, or a trailing '/.' or '/..'; each match collapses to '/'
_replace_pat = regex.compile('\(/[^/]+/\.\./?\|/\./\|//\|/\.$\|/\.\.$\)')

# default ports keep their leading colon so they can be compared
# directly against the tail of the referer host in _clean()
_default_port = {	'http': ':80',
					'https': ':443',
					'gopher': ':70',
					'news': ':119',
					'snews': ':563',
					'nntp': ':119',
					'snntp': ':563',
					'ftp': ':21',
					'telnet': ':23',
					'prospero': ':191',
				}

# schemes whose paths may contain relative segments, and so are safe
# to collapse
_relative_scheme = {	'http': 1,
						'https': 1,
						'news': 1,
						'snews': 1,
						'nntp': 1,
						'snntp': 1,
						'ftp': 1,
						'file': 1,
						'': 1
					}


class Cleaner:
	def __init__(self, log):
		self.log = log
		self.directory_index = []
		self.cache_size = 10000
		self._cache = {'url': {}, 'ref': {}}	# memo of cleaned URLs, per type
		self._referer_present = hasattr(self.log, 'referer')


	def __getattr__(self, attr):
		''' delegate unknown attributes to the underlying log '''
		try:
			return getattr(self.log, attr)
		except AttributeError:
			raise AttributeError, attr


	def __setattr__(self, attr, value):
		# assigning directory_index (re)compiles the pattern that trims
		# index pages from request paths; e.g. ['index.html', 'index.htm']
		# compiles to /\(index.html\|index.htm\)$
		if attr == 'directory_index':
			self._dir_pat = "/\(" + join(value, "\|") + "\)$"
			self._dir_comp = regex.compile(self._dir_pat)
		self.__dict__[attr] = value


	def getlogent(self):
		''' Increment position in the log and populate requested attributes '''

		if self.log.getlogent():
			### clean url
			if not self._cache['url'].has_key(self.log.url):
				self._cache['url'][self.log.url] = self._clean(self.log.url, 'url')
			clean_url = self._cache['url'][self.log.url]
			self.url = urlunparse(clean_url)
			self.url_scheme = clean_url[0]
			self.url_host = clean_url[1]
			self.url_path = clean_url[2]
			
			### clean referer
			if self._referer_present:
				if not self._cache['ref'].has_key(self.log.referer):
					self._cache['ref'][self.log.referer] = self._clean(self.log.referer, 'ref')
				clean_ref = self._cache['ref'][self.log.referer]
				self.referer = urlunparse(clean_ref)
				self.ref_scheme = clean_ref[0]
				self.ref_host = clean_ref[1]
				self.ref_path = clean_ref[2]
			return 1
		else:
			return 0
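
	# Typical use of a Cleaner (a sketch; assumes a log object already
	# wrapped by weblog.url.Parser, as required above):
	#
	#	cleaner = Cleaner(parsed_log)
	#	cleaner.directory_index = ['index.html']
	#	while cleaner.getlogent():
	#		print cleaner.url, cleaner.referer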


	def _clean(self, url, url_type):
		''' clean a single url; url_type is 'url' (request) or 'ref' (referer) '''

		# crude bound on memory use: flush the whole cache once it fills
		if len(self._cache[url_type]) > self.cache_size:
			self._cache[url_type] = {}

		scheme = getattr(self.log, url_type + "_scheme")
		path = getattr(self.log, url_type + "_path")

		if _relative_scheme.get(scheme, 0):
			# repeatedly collapse relative segments until the path
			# stops changing
			last_path = path
			while 1:
				path = gsub(_replace_pat, '/', path)
				if last_path == path:
					break
				last_path = path

		path = unquote(path)

		if url_type == 'url':
			# request URLs are local to the server: drop scheme and host,
			# then trim any directory_index page back to its directory
			scheme, host = '', ''
			if self._dir_comp:
				if self._dir_comp.search(path) > -1:
					slash_index = rfind(path, '/') + 1
					path = path[:slash_index]

		elif url_type == 'ref':
			host = self.log.ref_host
			# strip an explicit port if it's the default for the scheme;
			# rfind returns -1 when there is no colon
			colon_index = rfind(host, ':')
			if colon_index != -1:
				if host[colon_index:] == _default_port.get(scheme, '#'):
					host = host[:colon_index]
			# news: URLs keep only the newsgroup or article; urlparse
			# leaves any news host in the path, so cut everything up to
			# the last slash
			if scheme == 'news':
				slash_index = rfind(path, '/') + 1
				path = path[slash_index:]
			
		return (	scheme, 
					host, 
					path, 
					getattr(self.log, url_type + "_parameters"),
					getattr(self.log, url_type + "_query"),
					getattr(self.log, url_type + "_fragment")
				)




def test():
	''' basic test suite - modify at will to test full functionality '''

	import sys
	from weblog import combined, url

	file = sys.stdin
	log = combined.Parser(file)
	p_log = url.Parser(log)

	clean_log = Cleaner(p_log)
	clean_log.directory_index = ['index.html', 'index.htm']

	while clean_log.getlogent():
		print "%s\n%s %s" % (log.url, clean_log.url, clean_log.url_path)
		print "%s\n%s %s %s" % (log.referer, clean_log.referer, clean_log.ref_host, clean_log.ref_path)
		print



class Dummy:
	''' dummy log class for test_clean() '''
	pass


def test_clean():
	''' function to test _clean's operation '''
	
	from urlparse import urlparse

	dummy = Dummy()
	cleaner = Cleaner(dummy)
	cleaner.directory_index = ['index.html', 'index.htm']	

	u_tests = { '/foo/../bar':				'/bar',
				'/foo/index.html':			'/foo/',
				'/foo/index.html/':			'/foo/index.html/',
				'/index.html.':				'/index.html.',
				'/index.html':				'/',
				'/index.htm':				'/',
				'/index.ht':				'/index.ht'
			}

	r_tests = {	'/foo/bar/.':				'/foo/bar/', 
				'/foo/bar/./':				'/foo/bar/',
				'/foo/bar/..':				'/foo/',
				'/foo/bar/../': 			'/foo/',
				'/foo/bar/../baz': 			'/foo/baz',
				'/foo/bar/../..': 			'/',
				'/foo/bar/../../': 			'/',
				'/foo/bar/../../baz': 		'/baz',
				'/foo/bar/../../../baz':	'/../baz',
				'/foo/bar/../../../../baz':	'/baz',
				'/./foo':					'/foo',
				'/../foo':					'/../foo',
				'/foo.':					'/foo.',
				'/.foo':					'/.foo',
				'/foo..':					'/foo..',
				'/..foo':					'/..foo',
				'/./../foo':				'/foo',
				'/./foo/.':					'/foo/',
				'/foo/./bar':				'/foo/bar',
				'/foo/../bar':				'/bar',
				'/foo//':					'/foo/',
				'/foo///bar//':				'/foo/bar/',	
				'news:alt.this.group':		'news:alt.this.group',
				'news://foo.com/user-full.article.number@whatever.server.net':	'news:user-full.article.number@whatever.server.net',
				'news:user-full.article.number@whatever.server.net':	'news:user-full.article.number@whatever.server.net',
				'http://www.foo.com:80/foo':	'http://www.foo.com/foo',
				'http://www.foo.com:8000/foo':	'http://www.foo.com:8000/foo',
				'http://www.foo.com/%7ebar':	'http://www.foo.com/~bar',
				'http://www.foo.com/%7Ebar':	'http://www.foo.com/~bar',
				'-':						'-',
			}

	n_correct, n_fail = 0, 0
	test_types = { 	'url': u_tests,
					'ref': r_tests,
				}
	type_keys = test_types.keys()
	
	for ty in type_keys:
		test_keys = test_types[ty].keys()
		test_keys.sort()		
	
		for i in test_keys:
			print 'ORIGINAL:', i
			(	dummy.__dict__[ty + '_scheme'],
				dummy.__dict__[ty + '_host'],
				dummy.__dict__[ty + '_path'],
				dummy.__dict__[ty + '_parameters'],
				dummy.__dict__[ty + '_query'],
				dummy.__dict__[ty + '_fragment']	) = urlparse(i)
			
			cleaned = urlunparse(cleaner._clean(i, ty))
			answer = test_types[ty][i]
			print ty, 'CLEANED: ', cleaned
			print ty, 'CORRECT: ', answer
			if cleaned != answer:
				print "*** TEST FAILED"
				n_fail = n_fail + 1
			else:
				n_correct = n_correct + 1
			print
		
	print "TOTAL CORRECT:", n_correct
	print "TOTAL FAILURE:", n_fail




if __name__ == '__main__':
	test()