Source

pylint_goatlog / reporters / goatlog.py

from __future__ import absolute_import

import sys
import json
import logging

from collections import defaultdict

from pylint.reporters import BaseReporter
from pylint.interfaces import IReporter

from logilab.common.ureports import TextWriter

from goatlog import LogManager
from goatlog.handlers import JsonHandler, ColorizedOutputHandler
from goatlog.utils import Statuses

class ComplexEncoder(json.JSONEncoder):
    '''JSON encoder that also serializes ``set`` and ``frozenset`` values.

    Sets have no JSON equivalent, so they are emitted as JSON arrays. The
    order of the resulting array follows the set's iteration order and is
    therefore not guaranteed to be stable.
    '''

    def default(self, obj):
        '''Return a JSON-serializable stand-in for ``obj``.

        ``set`` and ``frozenset`` instances are converted to lists; any other
        type is delegated to ``json.JSONEncoder.default``, which raises
        ``TypeError`` for objects it cannot serialize.
        '''
        if isinstance(obj, (set, frozenset)):
            return list(obj)

        return json.JSONEncoder.default(self, obj)

class GoatlogReporter(BaseReporter):
    '''Pylint reporter that forwards messages to a goatlog ``LogManager``.

    Messages are buffered for the file currently being reported, grouped by
    line number, and flushed inside nested ``'file'`` and ``'line'`` log
    contexts when a message for another file arrives or when the final
    layout is displayed.
    '''
    __implements__ = IReporter
    extension = 'json'

    def __init__(self, output_file=None):
        '''Create the reporter.

        ``output_file`` is the stream to report to; ``None`` means
        ``sys.stdout`` (see ``set_output``).
        '''
        self._current_path = None
        self._log_manager = None
        # Per-instance buffer mapping a line number to its pending messages.
        # It must not live on the class: a class-level dict would be shared
        # (and mutated) by every reporter instance until the first flush.
        self.lines_messages = defaultdict(list)

        # NOTE(review): attributes are set before the base initializer
        # because it presumably calls set_output() -- confirm against the
        # installed pylint version.
        BaseReporter.__init__(self, output_file)

    def set_output(self, output_file=None):
        '''Select the output stream and (re)build the logging machinery.

        Writing to ``sys.stdout`` uses a ``ColorizedOutputHandler``; any
        other stream gets a ``JsonHandler`` bound to it. The status table
        mapping pylint categories (I, C, R, W, E, F) to log levels is rebuilt
        and the per-file state is reset.
        '''
        if output_file is None:
            output_file = sys.stdout

        self.output = output_file

        if output_file is sys.stdout:
            self._log_manager = LogManager('pylint.stdout',
                [ColorizedOutputHandler()])
        else:
            # Not every file-like object (e.g. a StringIO) has a ``name``.
            stream_name = getattr(output_file, 'name', '<stream>')
            self._log_manager = LogManager('pylint.%s' % stream_name,
                [JsonHandler(output_file)])

        # Define statuses, ordered from least to most severe.
        manager = self._log_manager
        status_table = (
            ('I', 1, logging.INFO, manager.info, 'Info'),
            ('C', 2, logging.INFO, manager.info, 'Convention'),
            ('R', 3, logging.INFO, manager.info, 'Refactor'),
            ('W', 4, logging.WARNING, manager.warning, 'Warning'),
            ('E', 5, logging.ERROR, manager.error, 'Error'),
            ('F', 6, logging.CRITICAL, manager.critical, 'Fatal'),
        )
        self.statuses = Statuses()
        for sigle, rank, levelno, log_level, human in status_table:
            self.statuses.add_status(sigle, rank, levelno=levelno,
                log_level=log_level, human=human)

        self._current_path = None
        self._file_status = 'I'

    def add_message(self, msg_id, location, msg):
        '''Buffer one pylint message.

        ``location`` is a 5-tuple ``(path, module, obj, line, col_offset)``;
        the module entry is ignored. When ``path`` differs from the file
        currently being reported, the previous file context (if any) is
        flushed and closed and a new ``'file'`` context is opened.
        '''
        path, _, obj, line, col_offset = location
        if path != self._current_path:

            if self._current_path is not None:
                self.close_file_context()

            self._log_manager.open('file', path)
            self._current_path = path

        # Group messages by line
        self.lines_messages[line].append((line, obj, col_offset, msg_id, msg))

    def close_file_context(self):
        '''Print all messages grouped by line, reset storage and close the
        file context.

        Does nothing when no file context is open, so calling it twice in a
        row (e.g. through repeated ``_display`` calls) is harmless.
        '''
        if self._current_path is None:
            return

        self.print_messages_by_line()
        self.lines_messages = defaultdict(list)

        file_data = self.statuses[self._file_status]
        self._log_manager.close('file', self._current_path,
            file_data['human'], loglevel=file_data['levelno'])

        self._file_status = 'I'
        # Mark the context as closed so it cannot be closed a second time.
        self._current_path = None

    def print_messages_by_line(self):
        '''Print all buffered messages, opening and closing one ``'line'``
        context per line that has messages.

        Lines are visited in ascending order. ``self._file_status`` is raised
        to the most severe status seen. An empty buffer is a no-op.
        '''
        # Walk only the lines that actually hold messages, instead of every
        # line number from 1 up to the highest one (which also failed on an
        # empty buffer and skipped any key below 1).
        for line in sorted(self.lines_messages):

            datas = self.lines_messages[line]
            if not datas:
                continue

            self._log_manager.open('line', line)
            statuses = [self.print_a_message(data) for data in datas]
            line_status = self.statuses.max(*statuses)
            line_data = self.statuses[line_status]

            self._file_status = self.statuses.max(self._file_status,
                line_status)

            self._log_manager.close('line', line, line_data['human'],
                line_data['levelno'])

    def print_a_message(self, data):
        '''Log a single buffered message and return its category letter.

        ``data`` is a ``(line, obj, col_offset, msg_id, msg)`` tuple as
        stored by ``add_message``. The message is prefixed with the full
        message id when ``self.include_ids`` is true, and with the category
        letter (first character of the id) otherwise.
        '''
        _, obj, col_offset, msg_id, msg = data
        # Statuses are registered under the single category letter only, so
        # the lookup and the returned value must use that letter even when
        # the full id is displayed.
        category = msg_id[0]
        sigle = msg_id if self.include_ids else category
        obj = ':%s' % obj if obj else ''

        level = self.statuses[category]['log_level']
        level('%s,%s%s: %s' % (sigle, col_offset, obj, msg))
        return category

    def on_close(self, stats):
        '''Remember the statistics so ``_display`` can dump them.'''
        self.stats = stats

    def _display(self, layout):
        '''Launch layouts display.

        Flushes the pending file context first. On ``sys.stdout`` the layout
        is rendered as text after a blank line; on any other stream the
        collected statistics are dumped as JSON instead.
        '''

        if self._current_path is not None:
            self.close_file_context()

        if self.output is sys.stdout:
            # Blank separator line before the textual report.
            self.output.write('\n')
            TextWriter().format(layout, self.output)
        else:
            json.dump(self.stats, self.output, cls=ComplexEncoder)