# Source
#
# IMDBReview_NaiveBayes / NaiveBayes.py
#
# Full commit
# NLP Programming Assignment #3
# NaiveBayes
# 2012

#
# The area for you to implement is marked with TODO!
# Generally, you should not need to touch things *not* marked TODO
#
# Remember that when you submit your code, it is not run from the command line 
# and your main() will *not* be run. To be safest, restrict your changes to
# addExample() and classify() and anything you further invoke from there.
#


import sys
import getopt
import os
import math

class NaiveBayes:
  """Word-count based 'pos'/'neg' document classifier.

  addExample() counts, separately for positive and for negative documents, how often each
  accepted word occurs. classify() turns those counts into add-one smoothed per-class word
  probabilities and sums the log odds ratio of every word in the document.

  The two count tables (positive_examples, negative_examples) are created per instance in
  __init__, so separate classifiers never share training data.
  """

  # A token containing any of these characters is rejected by isLegalWord().
  _ILLEGAL_CHARACTERS = '._=/\\`&-'

  class TrainSplit:
    """Represents a set of training/testing data. self.train is a list of Examples, as is self.test.
    """
    def __init__(self):
      self.train = []
      self.test = []

  class Example:
    """Represents a document with a label. klass is 'pos' or 'neg' by convention.
       words is a list of strings.
    """
    def __init__(self):
      self.klass = ''
      self.words = []


  def __init__(self):
    """NaiveBayes initialization.

    Reads the stop word list from '../data/english.stop' (relative to the current working
    directory) and starts with empty word-count tables.
    """
    self.FILTER_STOP_WORDS = False
    self.LOAD_TRAINED_DATA = False
    self.stopList = set(self.readFile('../data/english.stop'))
    self.numFolds = 10
    # Per-instance model state: word -> number of occurrences in documents of that class.
    self.positive_examples = {}
    self.negative_examples = {}

  #############################################################################
  # TODO TODO TODO TODO TODO

  def classify(self, words):
    """Return 'pos' or 'neg' for the document given as a list of words.

    Words rejected by isLegalWord() are skipped; the rest are passed through filterWord().
    For each remaining word the add-one smoothed probabilities
        p = (count + 1) / (total count of the class + number of distinct words of the class)
    are computed for both classes and log((p_pos * (1 - p_neg)) / (p_neg * (1 - p_pos))) is
    added to the score. A score >= 0 yields 'pos', otherwise 'neg'.

    The trained counts are only read, never modified, so the result does not depend on which
    documents were classified before.

    If one class has no training data the other class is returned; if neither has any,
    'pos' is returned (the same answer as a score of 0).
    """
    positives = self.positive_examples
    negatives = self.negative_examples

    if not positives or not negatives:
      # No per-class comparison is possible (the denominators below would be zero).
      if negatives:
        return 'neg'
      return 'pos'

    positive_total = float(sum(positives.values()) + len(positives))
    negative_total = float(sum(negatives.values()) + len(negatives))

    score = 0.0
    for word in words:
      if not self.isLegalWord(word):
        continue
      token = self.filterWord(word)

      p_pos = (positives.get(token, 0) + 1) / positive_total
      p_neg = (negatives.get(token, 0) + 1) / negative_total

      # The odds ratio is undefined when a probability reaches 0 or 1 (e.g. a class whose
      # whole vocabulary is this one word); such a word contributes nothing.
      if not (0.0 < p_pos < 1.0 and 0.0 < p_neg < 1.0):
        continue

      score += math.log((p_pos * (1 - p_neg)) / (p_neg * (1 - p_pos)))

    if score >= 0:
      return 'pos'
    return 'neg'

  def save_trained(self, file):
    """Write the word counts to the path `file`, one 'word count pos|neg' line per entry.

    Positive entries are written first, then negative ones. An existing file is overwritten.
    """
    with open(file, 'w+') as output:
      for word, count in self.positive_examples.items():
        output.write(word + ' ' + str(count) + ' pos\n')
      for word, count in self.negative_examples.items():
        output.write(word + ' ' + str(count) + ' neg\n')

  def read_trained(self, file):
    """Replace the word counts with the ones stored in the path `file` (see save_trained).

    Each non-blank line must hold at least three whitespace separated fields: word, integer
    count, label. A label of 'pos' goes to the positive table, anything else to the negative
    one. The tables are only replaced once the whole file has been parsed.
    """
    positives = {}
    negatives = {}
    with open(file, 'r') as source:
      for line in source:
        fields = line.split()
        if not fields:
          continue  # tolerate blank lines, e.g. a trailing newline
        target = positives if fields[2] == 'pos' else negatives
        target[fields[0]] = int(fields[1])

    self.positive_examples = positives
    self.negative_examples = negatives

  def isLegalWord(self, word):
    """Return True if `word` may be counted.

    Rejected are: the empty string, words starting with an apostrophe, and words containing
    any of . _ = / \\ ` & -
    """
    if not word or word.startswith('\''):
      return False
    for symbol in self._ILLEGAL_CHARACTERS:
      if symbol in word:
        return False
    return True

  def filterWord(self, word):
    """Return the token counted for `word`: words containing "'t" (and not starting with an
    apostrophe) get the prefix 'NOT_', every other word is returned unchanged.
    """
    if '\'t' in word and not word.startswith('\''):
      return 'NOT_' + word
    return word

  def addExample(self, klass, words):
    """
     * Train the model on an example document with label klass ('pos' or 'neg') and
     * words, a list of strings. Every word accepted by isLegalWord() is counted, after
     * filterWord(), in the table of its class. Any other klass is ignored.
     * Returns nothing
    """
    if klass == 'pos':
      counts = self.positive_examples
    elif klass == 'neg':
      counts = self.negative_examples
    else:
      return

    for word in words:
      if self.isLegalWord(word):
        token = self.filterWord(word)
        counts[token] = counts.get(token, 0) + 1

  # TODO TODO TODO TODO TODO
  #############################################################################


  def readFile(self, fileName):
    """
     * Code for reading a file.  you probably don't want to modify anything here,
     * unless you don't like the way we segment files.
     * Returns the list of words produced by segmentWords() for the file's text.
    """
    with open(fileName) as source:
      contents = list(source)
    return self.segmentWords('\n'.join(contents))


  def segmentWords(self, s):
    """
     * Splits lines on whitespace for file reading
    """
    return s.split()


  def _loadExamples(self, directory, klass):
    """Read every file in '<directory>/<klass>/' and return (fileName, Example) pairs.

    The pairs are in os.listdir() order; each Example is labelled with `klass`.
    """
    loaded = []
    for fileName in os.listdir('%s/%s/' % (directory, klass)):
      example = self.Example()
      example.words = self.readFile('%s/%s/%s' % (directory, klass, fileName))
      example.klass = klass
      loaded.append((fileName, example))
    return loaded

  def _selectWords(self, words):
    """Return `words`, with stop words removed when FILTER_STOP_WORDS is set."""
    if self.FILTER_STOP_WORDS:
      return self.filterStopWords(words)
    return words


  def trainSplit(self, trainDir):
    """Takes in a trainDir, returns one TrainSplit with train set.

    The positive examples (trainDir/pos/) come first, then the negative ones (trainDir/neg/).
    """
    split = self.TrainSplit()
    for klass in ('pos', 'neg'):
      for _, example in self._loadExamples(trainDir, klass):
        split.train.append(example)
    return split

  def train(self, split):
    """Feed every example of split.train to addExample(), honouring FILTER_STOP_WORDS."""
    for example in split.train:
      self.addExample(example.klass, self._selectWords(example.words))

  def crossValidationSplits(self, trainDir):
    """Returns a list of TrainSplits corresponding to the cross validation splits.

    One split is built per fold in range(numFolds). A file belongs to the test set of the
    fold whose number equals the third character of its file name, and to the train set of
    every other fold. Within a split positive examples precede negative ones.

    Every file is read once; the resulting Example objects are shared between the splits.
    """
    labelled = self._loadExamples(trainDir, 'pos') + self._loadExamples(trainDir, 'neg')

    splits = []
    for fold in range(0, self.numFolds):
      foldTag = str(fold)
      split = self.TrainSplit()
      for fileName, example in labelled:
        # Slicing instead of indexing: a name shorter than three characters simply never
        # matches a fold and therefore always stays in the train set.
        if fileName[2:3] == foldTag:
          split.test.append(example)
        else:
          split.train.append(example)
      splits.append(split)
    return splits


  def test(self, split):
    """Returns a list of labels for split.test."""
    return [self.classify(self._selectWords(example.words)) for example in split.test]

  def buildSplits(self, args):
    """Builds the splits for training/testing.

    args holds one or two positional command line arguments:
      * [trainDir]          -> the cross validation splits of trainDir
      * [trainDir, testDir] -> a single split tested on testDir. It is trained on trainDir,
                               unless LOAD_TRAINED_DATA is set: then trainDir is passed to
                               read_trained() and the split's train list stays empty.
    Any longer args list yields an empty list. An [INFO] line describing the run is printed.
    """
    trainDir = args[0]
    if len(args) == 1:
      print('[INFO]\tPerforming %d-fold cross-validation on data set:\t%s' % (self.numFolds, trainDir))
      return self.crossValidationSplits(trainDir)

    splits = []
    if len(args) == 2:
      split = self.TrainSplit()
      testDir = args[1]

      if not self.LOAD_TRAINED_DATA:
        print('[INFO]\tTraining on data set:\t%s testing on data set:\t%s' % (trainDir, testDir))
        split.train = self.trainSplit(trainDir).train
      else:
        print('[INFO]\tTraining on saved data set:\t%s testing on data set:\t%s' % (trainDir, testDir))
        self.read_trained(trainDir)

      for klass in ('pos', 'neg'):
        for _, example in self._loadExamples(testDir, klass):
          split.test.append(example)
      splits.append(split)
    return splits

  def filterStopWords(self, words):
    """Filters stop words: returns the words that are neither in stopList nor blank."""
    return [word for word in words if word not in self.stopList and word.strip() != '']



def main():
  """Command line entry point: train, evaluate and report accuracy.

  Options: -f filters stop words, -d loads a saved model (written by save_trained) instead
  of training. Positional arguments: one directory for cross-validation, or a training
  directory (with -d: a saved model file) followed by a test directory.

  For every split a fresh classifier is trained and evaluated, its accuracy is printed,
  and its word counts are written to 'trained.raw' (overwritten on each split). Finally
  the mean accuracy over all splits is printed.
  """
  nb = NaiveBayes()
  (options, args) = getopt.getopt(sys.argv[1:], 'fd')
  if ('-f','') in options:
    nb.FILTER_STOP_WORDS = True
  if ('-d','') in options:
    nb.LOAD_TRAINED_DATA = True

  # buildSplits() only understands one or two positional arguments; anything else used to
  # end in an IndexError or a division by zero further down.
  if len(args) not in (1, 2):
    sys.exit('usage: NaiveBayes.py [-f] [-d] <train dir or saved model> [<test dir>]')

  splits = nb.buildSplits(args)
  # A saved model is only read by buildSplits() when a test directory is given as well.
  useLoadedModel = nb.LOAD_TRAINED_DATA and len(args) == 2

  avgAccuracy = 0.0
  fold = 0
  for split in splits:
    classifier = NaiveBayes()
    for example in split.train:
      words = example.words
      if nb.FILTER_STOP_WORDS:
        words = classifier.filterStopWords(words)
      classifier.addExample(example.klass, words)

    if useLoadedModel:
      classifier.positive_examples = nb.positive_examples
      classifier.negative_examples = nb.negative_examples

    correct = 0
    for example in split.test:
      words = example.words
      if nb.FILTER_STOP_WORDS:
        words = classifier.filterStopWords(words)
      guess = classifier.classify(words)
      if example.klass == guess:
        correct += 1
      # The example is handed to nb only after it has been scored. Under -d the classifier
      # was given nb's dictionaries above, so this updates the model used for the remaining
      # examples.
      nb.addExample(example.klass, words)

    classifier.save_trained('trained.raw')

    # An empty test set counts as accuracy 0 instead of dividing by zero.
    accuracy = float(correct) / len(split.test) if split.test else 0.0
    avgAccuracy += accuracy
    print('[INFO]\tFold %d Accuracy: %f' % (fold, accuracy))
    fold += 1

  if fold:
    avgAccuracy = avgAccuracy / fold
  print('[INFO]\tAccuracy: %f' % avgAccuracy)

if __name__ == "__main__":
    main()