Source: whoosh/benchmark/enron.py

from __future__ import division
import os.path, tarfile
from email import message_from_string
from marshal import dump, load
from zlib import compress, decompress

try:
    import xappy
except ImportError:
    # xappy is optional; it's only needed for the Xapian benchmarks
    pass

from whoosh import analysis, fields
from whoosh.compat import urlretrieve, next
from whoosh.support.bench import Bench, Spec
from whoosh.util import now


# Benchmark class

class Enron(Spec):
    name = "enron"

    enron_archive_url = "http://www.cs.cmu.edu/~enron/enron_mail_082109.tar.gz"
    enron_archive_filename = "enron_mail_082109.tar.gz"
    cache_filename = "enron_cache.pickle"

    header_to_field = {"Date": "date", "From": "frm", "To": "to",
                   "Subject": "subject", "Cc": "cc", "Bcc": "bcc"}

    main_field = "body"
    headline_field = "subject"

    field_order = ("subject", "date", "from", "to", "cc", "bcc", "body")

    cachefile = None

    # Functions for downloading and then reading the email archive and caching
    # the messages in an easier-to-digest format

    def download_archive(self, archive):
        print("Downloading Enron email archive to %r..." % archive)
        t = now()
        urlretrieve(self.enron_archive_url, archive)
        print("Downloaded in ", now() - t, "seconds")

    @staticmethod
    def get_texts(archive):
        # Stream the raw text of each message out of the tar.gz archive
        # without extracting it to disk
        archive = tarfile.open(archive, "r:gz")
        while True:
            entry = next(archive)
            # Clear tarfile's member cache so memory use stays flat no
            # matter how many entries have been read
            archive.members = []
            if entry is None:
                break
            f = archive.extractfile(entry)
            if f is not None:
                text = f.read()
                yield text

    @staticmethod
    def get_messages(archive, headers=True):
        header_to_field = Enron.header_to_field
        for text in Enron.get_texts(archive):
            message = message_from_string(text)
            body = message.as_string().decode("latin_1")
            # Strip everything up to the first blank line, leaving just
            # the message body
            blank = body.find("\n\n")
            if blank > -1:
                body = body[blank + 2:]
            d = {"body": body}
            if headers:
                # Copy the interesting headers into the dictionary under
                # their field names
                for k in message.keys():
                    fn = header_to_field.get(k)
                    if not fn:
                        continue
                    v = message.get(k).strip()
                    if v:
                        d[fn] = v.decode("latin_1")
            yield d
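
    # Each yielded dictionary looks like the following (header keys only
    # appear when the corresponding header is present and non-empty):
    #
    #   {"body": u"...", "date": u"...", "frm": u"...", "to": u"...",
    #    "subject": u"...", "cc": u"...", "bcc": u"..."}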

    def cache_messages(self, archive, cache):
        print("Caching messages in %s..." % cache)

        if not os.path.exists(archive):
            raise Exception("Archive file %r does not exist" % archive)

        t = now()
        f = open(cache, "wb")
        c = 0
        # Marshal each message dictionary onto the end of the cache file
        for d in self.get_messages(archive):
            c += 1
            dump(d, f)
            # Progress indicator
            if not c % 1000:
                print(c)
        f.close()
        print("Cached messages in ", now() - t, "seconds")

    def setup(self):
        archive = os.path.abspath(os.path.join(self.options.dir, self.enron_archive_filename))
        cache = os.path.abspath(os.path.join(self.options.dir, self.cache_filename))

        if not os.path.exists(archive):
            self.download_archive(archive)
        else:
            print("Archive is OK")

        if not os.path.exists(cache):
            self.cache_messages(archive, cache)
        else:
            print("Cache is OK")

    def documents(self):
        if not os.path.exists(self.cache_filename):
            raise Exception("Message cache does not exist, use --setup")

        f = open(self.cache_filename, "rb")
        try:
            while True:
                # Remember where this document starts so the search code
                # can seek back to it later
                self.filepos = f.tell()
                d = load(f)
                yield d
        except EOFError:
            # marshal.load raises EOFError at the end of the stream
            pass
        f.close()

    def whoosh_schema(self):
        ana = analysis.StemmingAnalyzer(maxsize=40, cachesize=None)
        storebody = self.options.storebody
        schema = fields.Schema(body=fields.TEXT(analyzer=ana, stored=storebody),
                               filepos=fields.STORED,
                               date=fields.ID(stored=True),
                               frm=fields.ID(stored=True),
                               to=fields.IDLIST(stored=True),
                               subject=fields.TEXT(stored=True),
                               cc=fields.IDLIST,
                               bcc=fields.IDLIST)
        return schema
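
    # A minimal sketch of how this schema might be used outside the Bench
    # harness (illustrative only: "spec" stands for a configured Enron
    # instance and "enron_ix" is a made-up, pre-existing directory; the
    # harness itself drives indexing through whoosh.support.bench):
    #
    #   from whoosh import index
    #   ix = index.create_in("enron_ix", spec.whoosh_schema())
    #   w = ix.writer()
    #   for d in spec.documents():
    #       spec.process_document_whoosh(d)
    #       w.add_document(**d)
    #   w.commit()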

    def xappy_indexer_connection(self, path):
        conn = xappy.IndexerConnection(path)
        conn.add_field_action('body', xappy.FieldActions.INDEX_FREETEXT, language='en')
        if self.options.storebody:
            conn.add_field_action('body', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('date', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('date', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('frm', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('frm', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('to', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('to', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('subject', xappy.FieldActions.INDEX_FREETEXT, language='en')
        conn.add_field_action('subject', xappy.FieldActions.STORE_CONTENT)
        conn.add_field_action('cc', xappy.FieldActions.INDEX_EXACT)
        conn.add_field_action('bcc', xappy.FieldActions.INDEX_EXACT)
        return conn
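
    # Adding documents through the connection above would look roughly
    # like this (a sketch against xappy's UnprocessedDocument/Field API,
    # assuming the optional xappy import succeeded):
    #
    #   doc = xappy.UnprocessedDocument()
    #   for name, value in d.items():
    #       doc.fields.append(xappy.Field(name, value))
    #   conn.add(doc)
    #   conn.flush()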

    def zcatalog_setup(self, cat):
        from zcatalog import indexes
        for name in ("date", "frm"):
            cat[name] = indexes.FieldIndex(field_name=name)
        for name in ("to", "subject", "cc", "bcc", "body"):
            cat[name] = indexes.TextIndex(field_name=name)

    def process_document_whoosh(self, d):
        d["filepos"] = self.filepos
        if self.options.storebody:
            # Store the body zlib-compressed at maximum compression
            mf = self.main_field
            d["_stored_%s" % mf] = compress(d[mf], 9)

    def process_result_whoosh(self, d):
        mf = self.main_field
        if mf in d:
            # The body was stored in the index; just decompress it
            d.fields()[mf] = decompress(d[mf])
        else:
            # The body wasn't stored; retrieve it from the message cache
            # using the saved file position
            if not self.cachefile:
                self.cachefile = open(self.cache_filename, "rb")
            filepos = d["filepos"]
            self.cachefile.seek(filepos)
            dd = load(self.cachefile)
            d.fields()[mf] = dd[mf]
        return d
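
    # Design note: when the body isn't stored, the index carries only a
    # file offset; the full text is recovered on demand by seeking into
    # the marshal cache, trading an extra disk read per hit for a much
    # smaller index.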

    def process_document_xapian(self, d):
        # Concatenate all fields into the main field, in field_order
        d[self.main_field] = " ".join([d.get(name, "") for name
                                       in self.field_order])



if __name__=="__main__":
    Bench().run(Enron)
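
# Typical invocation: run this module once with --setup to download the
# archive and build the message cache, then again to benchmark. (Only
# --setup is referenced in this file; the remaining command-line options
# are defined by whoosh.support.bench.)
#
#   python enron.py --setup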