woocode / py / sa / statistics / miniclient_statistics.py

#!/usr/bin/env python
# -*- encoding:utf-8 -*-
from __future__ import with_statement
import os
import sys
import re
import csv
import tarfile
import urlparse
import time

from log import get_logger
log = get_logger('mini')

KB = 1024
MB = KB ** 2
SIZE_THRESHOLD = 600 * MB # a user whose largest download is below this value is counted as lost (churned)

NGINX_LOG_FORMAT = '%{remote_addr}i %{-}i %{remote_user}i %{time_local}i %{request}i %{status}i %{body_bytes_sent}i %{http_referer}i %{http_user_agent}i'
log.debug('NGINX_LOG_FORMAT: %s' % NGINX_LOG_FORMAT)

# TODO: this mapping is a bit awkward :(
NGINX_LOG_DIRECTIVE_MAP = {
                 '%{remote_addr}i': 'remote_addr', # remote address
                 '%{remote_user}i': 'remote_user', # remote user
                 '%{time_local}i': 'time_local',  # local time
                 '%{request}i': 'request',     # request url
                 '%{status}i': 'status',     # http status
                 '%{body_bytes_sent}i': 'body_bytes_sent', # body_bytes_sent
                 '%{http_referer}i': 'http_referer',
                 '%{http_user_agent}i': 'http_user_agent',
                 '%{-}i': '-', # nginx weird format :(
}
log.debug('NGINX_LOG_DIRECTIVE_MAP: %s' % NGINX_LOG_DIRECTIVE_MAP)
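
# Illustrative helper (not used by the rest of this script): remap a row parsed by
# LogLineGenerator from its raw directive keys, e.g. '%{request}i', to the friendly
# names defined above.
def rename_directives(row, directive_map=NGINX_LOG_DIRECTIVE_MAP):
    return dict((directive_map.get(k, k), v) for k, v in row.iteritems())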

def dump_obj(fn, obj):
    import pickle
    with open(fn, 'wb') as fb:
        pickle.dump(obj, fb)
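
# Example (illustrative): objects written by dump_obj() are loaded back the way t()
# does at the bottom of this file, e.g.:
#   with open('statistics_results.pkl', 'rb') as fb:
#       statistics_results = pickle.load(fb)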

class LogLineGenerator(object):
    def __init__(self, fobj, log_format=NGINX_LOG_FORMAT):
        if isinstance(fobj, (file, tarfile.ExFileObject)):
            self.fobj = fobj
        elif isinstance(fobj, str):
            self.fobj = open(fobj)
        else:
            raise TypeError('fobj only support file object or file path. Got: %r' % type(fobj))
        self.log_format = log_format
        self.re_tsquote = re.compile(r'(\[|\])')

    def _quote_translator(self):
        for line in self.fobj:
            line = line.strip()
            # log.debug(line)
            yield self.re_tsquote.sub('"', line)

    def get_loglines(self):
        reader = csv.DictReader(self._quote_translator(),
                                fieldnames=self.log_format.split(' '),
                                delimiter=' ',
                                quotechar='"')
        for line in reader:
            yield line
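
# Usage sketch (the path and printed fields below are illustrative):
#   gen = LogLineGenerator('/var/log/nginx/mini_stat_access.run')
#   for row in gen.get_loglines():
#       # each row is a dict keyed by the raw format directives
#       print row['%{remote_addr}i'], row['%{request}i'], row['%{status}i']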

def get_logobj_from_tarfile(filename):
    '''Yield log-line generators for the access log contained in a tar archive.'''

    tf = tarfile.open(filename)
    for member in tf.getmembers():
        if member.name == 'mini_stat_access.run':
            yield LogLineGenerator(tf.extractfile(member))

def query_to_dict(query):
    '''
    >>> q = 'a=a&b=b'
    >>> query_to_dict(q)
    {'a': 'a', 'b': 'b'}
    >>> q = 'a=a&b=b&c='
    >>> query_to_dict(q)
    {'a': 'a', 'c': '', 'b': 'b'}
    >>> q = 'a=a%XX&b=b&c=c'
    >>> query_to_dict(q)
    {'a': 'a%XX', 'c': 'c', 'b': 'b'}
    '''
    # split only on the first '=' so values containing '=' are preserved
    return dict([k.split('=', 1) for k in query.split('&')])

def parse_request_url(url_str):
    method, url, protocol = url_str.split()
    o = urlparse.urlparse(url)
    ret = query_to_dict(o.query)
    # convert values that are all digits from str to int
    for k, v in ret.iteritems():
        if v.isdigit():
            ret[k] = int(v)
    return ret
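
# Example (illustrative request line, as stored in the '%{request}i' field):
#   parse_request_url('GET /stat?action=speed&id=abc&size=1024&speed=300 HTTP/1.1')
#   returns {'action': 'speed', 'id': 'abc', 'size': 1024, 'speed': 300}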

def parse_non_digit(s, keep_positive=True):
    '''
    >>> parse_non_digit('yu')
    0
    >>> parse_non_digit('-11')
    0
    >>> parse_non_digit('-11', False)
    -11
    >>> parse_non_digit('')
    0
    >>> parse_non_digit(0)
    0
    >>> parse_non_digit(1)
    1
    >>> parse_non_digit(32321)
    32321
    '''
    ret = 0
    if isinstance(s, int):
        ret = s
    elif isinstance(s, str):
        try:
            ret = int(s)
        except ValueError:
            pass
    if keep_positive:
        ret = max(ret, 0)
    return ret

class Statistics(object):
    def __init__(self, st):
        self.st = st
        self.sum_size = 0
        self.lost_users_traffic = 0
        self._parse()

    def _parse(self, convert_unit=True):
        self.lost_users = 0
        self._statistics = []
        for id, val in self.st.iteritems():
            max_size = max(val['size'])
            is_lost = max_size < SIZE_THRESHOLD
            speed_list = val['speed']
            aver_speed = float(sum(speed_list)) / len(speed_list)
            if convert_unit:
                max_size = max_size / MB
                aver_speed = '{:.2f}'.format(aver_speed / KB)
            self._statistics.append((id, max_size, aver_speed, is_lost))
            if is_lost: self.lost_users += 1

    def _sum_traffic(self):
        if self.sum_size > 0:
            return self.sum_size
        for id, val in self.st.iteritems():
            self.sum_size += max(val['size'])
        return self.sum_size

    # TODO: also summarise traffic for requests where action != speed
    def sum_traffic(self):
        return self._sum_traffic()

    def _sum_lost_users_traffic(self):
        if self.lost_users_traffic > 0:
            return self.lost_users_traffic
        for id, val in self.st.iteritems():
            max_size = max(val['size'])
            is_lost = max_size < SIZE_THRESHOLD
            if is_lost:
                self.lost_users_traffic += max_size
        return self.lost_users_traffic

    def sum_lost_users_traffic(self):
        return self._sum_lost_users_traffic()

    def to_str(self):
        for st in self._statistics:
            print '-' * 20
            print '%10s: %s' % ('guid', st[0])
            print '%10s: %s MB' % ('max_size', st[1])
            print '%10s: %s KB' % ('aver_speed', st[2])
            print '%10s: %s' % ('is_lost', st[3])
        print 'lost users: %d' % self.lost_users

    def to_csv(self, csvfile):
        with open(csvfile, 'wb') as fb:
            writer = csv.writer(fb, delimiter=",", quotechar='"')
            writer.writerow(['guid', 'max_size/MB', 'aver_speed/KB', 'is_lost'])
            writer.writerows(self._statistics)
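
# Usage sketch (hypothetical input, mirroring the shape that main() builds below):
#   st = {'guid-1': {'size': [100 * MB, 700 * MB], 'speed': [300 * KB, 450 * KB]},
#         'guid-2': {'size': [50 * MB], 'speed': [120 * KB]}}
#   s = Statistics(st)
#   s.to_str()                  # one block per guid plus the lost-user count
#   s.to_csv('statistics.csv')  # columns: guid, max_size/MB, aver_speed/KB, is_lost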

def get_files_by_ext(path, ext):
    '''Yield all files under a directory whose names end with the given extension.'''

    for fn in os.listdir(path):
        if fn.endswith(ext):
            yield os.path.join(path, fn)
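
# Example (directory and file names are illustrative):
#   list(get_files_by_ext('/data/tars', '.tar.gz'))
#   -> ['/data/tars/20120101.tar.gz', '/data/tars/20120102.tar.gz', ...]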

def main():
    if len(sys.argv) < 2:
        print >> sys.stderr, "usage: %s tar_directory" % sys.argv[0]
        sys.exit(2)

    tar_dir = sys.argv[1]
    # TODO: all results are kept in a single in-memory dict; a persistent store might scale better
    statistics_results = {}
    tar_files = get_files_by_ext(tar_dir, '.tar.gz')
    # log.info('found tar files %r' % tar_files)
    items_can_not_st = 0
    used_time = 0.0
    valid_ids = {}
    for tar_file in tar_files:
        start_time = time.time()
        log.info('starting parse tar file %s' % tar_file)
        # tar_file already includes tar_dir (see get_files_by_ext), so pass it as-is
        for logobj in get_logobj_from_tarfile(tar_file):
            for line in logobj.get_loglines():
                request_url = line['%{request}i']
                # skip lines that do not carry the action=speed parameter
                if 'action=speed' not in request_url:
                    continue
                d = parse_request_url(request_url)
                # treat missing, non-numeric, or negative values as 0
                size = parse_non_digit(d.get('size', 0))
                speed = parse_non_digit(d.get('speed', 0))

                # mark every guid as invalid by default; it stays invalid until a valid record is seen
                valid_ids.setdefault(d['id'], False)
                if (size == 0 or speed == 0):
                    items_can_not_st += 1
                    # log.debug('invalid guid %s' % d['id'])
                else:
                    valid_ids[d['id']] = True

                if d['id'] not in statistics_results:
                    statistics_results[d['id']] = d
                    statistics_results[d['id']]['size'] = []
                    statistics_results[d['id']]['speed'] = []
                statistics_results[d['id']]['size'].append(size)
                statistics_results[d['id']]['speed'].append(speed)


            duration_time = time.time() - start_time
            used_time += duration_time
            log.debug('used %.2f seconds to finish' % duration_time)
            log.info('finished file %s' % tar_file)

    # collect the invalid guids and drop them from the results
    invalid_guids = []
    for guid, is_valid in valid_ids.iteritems():
        if not is_valid:
            log.info('Invalid guid %s' % guid)
            del statistics_results[guid]
            invalid_guids.append(guid)

    log.debug('Used %.2f seconds' % used_time)

    statistics = Statistics(statistics_results)
    statistics.to_csv('statistics.csv')

    log.debug('Invalid guids: %r' % invalid_guids)
    log.info('Invalid guids: %d' % len(invalid_guids))
    log.info('Items that could not be counted: %d' % items_can_not_st)
    log.info('Lost users: %d' % statistics.lost_users)
    log.info('Lost users traffic: %d bytes' % statistics.sum_lost_users_traffic())
    log.info('Total traffic: %d bytes' % statistics.sum_traffic())

    # dump obj to file
    dump_obj('statistics_results.pkl', statistics_results)
    dump_obj('invalid_guid.pkl', invalid_guids)

def t():
    import pickle
    statistics_results = pickle.load(open('statistics_results.pkl', 'rb'))
    statistics = Statistics(statistics_results)
    # statistics.to_csv('statistics.csv')
    print statistics.sum_traffic()
    print statistics.sum_lost_users_traffic()

def test():
    import doctest
    doctest.testmod()

if __name__ == '__main__':
    main()