# Source: SCons / src / engine / SCons / dblite.py

# dblite.py module contributed by Ralf W. Grosse-Kunstleve.
# Extended for Unicode by Steven Knight.

import SCons.compat

import builtins
import os
# compat layer imports "cPickle" for us if it's available.
import pickle
import shutil
import time

# When true, dblite.sync() additionally copies the freshly written
# database file to "<file name>_<integer timestamp>".
keep_all_files = 0

# How dblite.__init__() reacts when the database file cannot be unpickled:
#   0            re-raise the unpickling error
#   1            call corruption_warning() and start with an empty database
#   other values silently start with an empty database
ignore_corrupt_dbfiles = 0

def corruption_warning(filename):
    """Print a warning that the database file `filename` is being discarded.

    The message goes to standard output.
    """
    # A single parenthesized argument prints identically whether print is
    # a statement (Python 2) or a function (Python 3).
    print("Warning: Discarding corrupt database: %s" % (filename,))

# Choose the is_string() implementation once, at import time, depending
# on whether a name "unicode" is already defined.
try:
    unicode
except NameError:
    def is_string(s):
        """Return true if s is an instance of str."""
        return isinstance(s, str)
else:
    def is_string(s):
        """Return true if the type of s is exactly str or unicode."""
        kind = type(s)
        return kind in (str, unicode)

# If no name "unicode" is defined, provide an identity stand-in so that
# code in this module can call unicode(...) unconditionally.
try:
    unicode
except NameError:
    def unicode(s):
        """Return s unchanged."""
        return s

# Suffix of the database file itself; appended to the base name unless
# the caller already supplied it.
dblite_suffix = ".dblite"
# Suffix of the scratch file that sync() writes and then renames over
# the database file.
tmp_suffix = ".tmp"

class dblite(object):
    """A small dictionary-like database stored as a single pickle file.

    All entries are held in an in-memory dict.  sync() pickles the whole
    dict into a temporary file and then moves that file over the database
    file.  Keys and values must both satisfy is_string().
    """

    # Squirrel away references to the functions in various modules
    # that we'll use when our __del__() method calls our sync() method
    # during shutdown.  We might get destroyed when Python is in the midst
    # of tearing down the different modules we import in an essentially
    # arbitrary order, and some of the various modules' global attributes
    # may already be wiped out from under us.
    #
    # See the discussion at:
    #   http://mail.python.org/pipermail/python-bugs-list/2003-March/016877.html
    #
    # Every reference is wrapped in staticmethod() so that looking it up
    # through an instance never binds `self` as the first argument.  Plain
    # Python functions such as shutil.copyfile would otherwise turn into
    # bound methods and be called with the wrong arguments.

    _open = staticmethod(builtins.open)
    _pickle_dump = staticmethod(pickle.dump)
    _os_chmod = staticmethod(os.chmod)
    try:
        _os_chown = staticmethod(os.chown)
    except AttributeError:
        # os.chown is not available on this platform.
        _os_chown = None
    _os_rename = staticmethod(os.rename)
    _os_unlink = staticmethod(os.unlink)
    _shutil_copyfile = staticmethod(shutil.copyfile)
    _time_time = staticmethod(time.time)

    def __init__(self, file_base_name, flag, mode):
        """Open (and possibly create) the database file.

        file_base_name -- path of the database; dblite_suffix is appended
                          unless the name already ends with it.
        flag           -- None or "r": read-only; "w": read-write, the file
                          must exist; "c": read-write, the file is created
                          if it cannot be opened; "n": always start from a
                          new, empty file.
        mode           -- stored and passed as the third argument of the
                          open() calls that create files.

        Raises IOError if the file cannot be opened and flag is not "c",
        and re-raises unpickling errors when ignore_corrupt_dbfiles is 0.
        """
        assert flag in (None, "r", "w", "c", "n")
        if flag is None:
            flag = "r"
        base, ext = os.path.splitext(file_base_name)
        if ext == dblite_suffix:
            # There's already a suffix on the file name, don't add one.
            self._file_name = file_base_name
            self._tmp_name = base + tmp_suffix
        else:
            self._file_name = file_base_name + dblite_suffix
            self._tmp_name = file_base_name + tmp_suffix
        self._flag = flag
        self._mode = mode
        self._dict = {}
        self._needs_sync = False

        # The os.geteuid()/os.getuid() calls are only reached when
        # os.chown exists.
        running_as_root = (self._os_chown is not None
                           and (os.geteuid() == 0 or os.getuid() == 0))
        if running_as_root:
            # running as root; chown back to current owner/group when done
            try:
                statinfo = os.stat(self._file_name)
                self._chown_to = statinfo.st_uid
                self._chgrp_to = statinfo.st_gid
            except OSError:
                # db file doesn't exist yet.
                # Check os.environ for SUDO_UID, use if set
                self._chown_to = int(os.environ.get('SUDO_UID', -1))
                self._chgrp_to = int(os.environ.get('SUDO_GID', -1))
        else:
            self._chown_to = -1        # don't chown
            self._chgrp_to = -1        # don't chgrp

        # NOTE(review): open()'s third positional argument is the buffer
        # size, not permission bits, so `mode` does not set the file's
        # permissions here -- confirm the intent before changing it.
        if self._flag == "n":
            # Create/truncate the file, closing the handle right away.
            self._open(self._file_name, "wb", self._mode).close()
        else:
            try:
                f = self._open(self._file_name, "rb")
            except IOError:
                if self._flag != "c":
                    # Bare raise keeps the original traceback.
                    raise
                self._open(self._file_name, "wb", self._mode).close()
            else:
                try:
                    p = f.read()
                finally:
                    f.close()
                if len(p) > 0:
                    # NOTE: unpickling can execute arbitrary code; the
                    # database file must come from a trusted source.
                    try:
                        self._dict = pickle.loads(p)
                    except (pickle.UnpicklingError, EOFError):
                        if ignore_corrupt_dbfiles == 0:
                            raise
                        if ignore_corrupt_dbfiles == 1:
                            corruption_warning(self._file_name)

    def close(self):
        """Write pending changes to disk, if there are any."""
        if self._needs_sync:
            self.sync()

    def __del__(self):
        self.close()

    def sync(self):
        """Pickle the in-memory dict and replace the database file with it.

        Raises IOError if the database was opened read-only.
        """
        self._check_writable()
        f = self._open(self._tmp_name, "wb", self._mode)
        try:
            self._pickle_dump(self._dict, f, 1)
        finally:
            # Close the temporary file even if pickling fails.
            f.close()
        # Windows doesn't allow renaming if the file exists, so unlink
        # it first, chmod'ing it to make sure we can do so.  On UNIX, we
        # may not be able to chmod the file if it's owned by someone else
        # (e.g. from a previous run as root).  We should still be able to
        # unlink() the file if the directory's writable, though, so ignore
        # any OSError exception thrown by the chmod() call.
        try:
            self._os_chmod(self._file_name, 0o777)
        except OSError:
            pass
        self._os_unlink(self._file_name)
        self._os_rename(self._tmp_name, self._file_name)
        # don't chown to root or -1
        if self._os_chown is not None and self._chown_to > 0:
            try:
                self._os_chown(self._file_name, self._chown_to, self._chgrp_to)
            except OSError:
                pass
        self._needs_sync = False
        if keep_all_files:
            self._shutil_copyfile(
                self._file_name,
                self._file_name + "_" + str(int(self._time_time())))

    def _check_writable(self):
        """Raise IOError if the database was opened with flag "r"."""
        if self._flag == "r":
            raise IOError("Read-only database: %s" % self._file_name)

    def __getitem__(self, key):
        return self._dict[key]

    def __setitem__(self, key, value):
        """Store value under key in memory and mark the database dirty.

        Raises IOError for a read-only database and TypeError if key or
        value does not satisfy is_string().
        """
        self._check_writable()
        if not is_string(key):
            raise TypeError("key `%s' must be a string but is %s" % (key, type(key)))
        if not is_string(value):
            raise TypeError("value `%s' must be a string but is %s" % (value, type(value)))
        self._dict[key] = value
        self._needs_sync = True

    def keys(self):
        """Return a new list of all keys."""
        return list(self._dict)

    def has_key(self, key):
        return key in self._dict

    def __contains__(self, key):
        return key in self._dict

    def iterkeys(self):
        """Return an iterator over the keys."""
        # iter() on a dict yields its keys on both Python 2 and Python 3,
        # whereas dict.iterkeys() exists only on Python 2.
        return iter(self._dict)

    __iter__ = iterkeys

    def __len__(self):
        return len(self._dict)

def open(file, flag=None, mode=0o666):
    """Return a dblite database object for `file`.

    file -- base name or full name of the database file.
    flag -- None, "r", "w", "c" or "n"; None is treated as "r".
    mode -- passed through unchanged to the dblite constructor.
    """
    # 0o666 is the octal spelling accepted by Python 2.6+ and Python 3;
    # the value is the same as the old-style literal 0666.
    return dblite(file, flag, mode)

def _exercise():
    """Self-test: exercise every open flag and the main error paths.

    Creates, rewrites and finally removes "tmp.dblite" in the current
    directory, and prints "OK" when every check has passed.  The global
    ignore_corrupt_dbfiles is changed temporarily and restored afterwards.
    """
    global ignore_corrupt_dbfiles

    # "n": start from an empty database and store str and unicode entries.
    db = open("tmp", "n")
    assert len(db) == 0
    db["foo"] = "bar"
    assert db["foo"] == "bar"
    db[unicode("ufoo")] = unicode("ubar")
    assert db[unicode("ufoo")] == unicode("ubar")
    db.sync()

    # "c": reopen the existing file and add more entries.
    db = open("tmp", "c")
    assert len(db) == 2, len(db)
    assert db["foo"] == "bar"
    db["bar"] = "foo"
    assert db["bar"] == "foo"
    db[unicode("ubar")] = unicode("ufoo")
    assert db[unicode("ubar")] == unicode("ufoo")
    db.sync()

    # "r": everything is readable, but sync() must be refused.
    db = open("tmp", "r")
    assert len(db) == 4, len(db)
    assert db["foo"] == "bar"
    assert db["bar"] == "foo"
    assert db[unicode("ufoo")] == unicode("ubar")
    assert db[unicode("ubar")] == unicode("ufoo")
    try:
        db.sync()
    except IOError as e:
        assert str(e) == "Read-only database: tmp.dblite"
    else:
        raise RuntimeError("IOError expected.")

    # "w": writable; non-string keys and values are rejected.
    db = open("tmp", "w")
    assert len(db) == 4
    db["ping"] = "pong"
    db.sync()
    try:
        db[(1, 2)] = "tuple"
    except TypeError as e:
        # Build the expected text from type() so the check does not depend
        # on how this interpreter spells the repr of a type.
        expected = "key `(1, 2)' must be a string but is %s" % (type((1, 2)),)
        assert str(e) == expected, str(e)
    else:
        raise RuntimeError("TypeError exception expected")
    try:
        db["list"] = [1, 2]
    except TypeError as e:
        expected = "value `[1, 2]' must be a string but is %s" % (type([1, 2]),)
        assert str(e) == expected, str(e)
    else:
        raise RuntimeError("TypeError exception expected")
    db = open("tmp", "r")
    assert len(db) == 5

    db = open("tmp", "n")
    assert len(db) == 0

    # An empty file is accepted as an empty database.
    dblite._open("tmp.dblite", "w").close()
    db = open("tmp", "r")

    # A corrupt file raises by default ...
    f = dblite._open("tmp.dblite", "w")
    try:
        f.write("x")
    finally:
        # Close explicitly so the byte is on disk before it is read back.
        f.close()
    try:
        db = open("tmp", "r")
    except pickle.UnpicklingError:
        pass
    else:
        raise RuntimeError("pickle exception expected.")

    # ... and is silently discarded when ignore_corrupt_dbfiles is 2.
    saved_setting = ignore_corrupt_dbfiles
    ignore_corrupt_dbfiles = 2
    try:
        db = open("tmp", "r")
        assert len(db) == 0
    finally:
        ignore_corrupt_dbfiles = saved_setting

    # "w" on a missing file must fail rather than create it.
    os.unlink("tmp.dblite")
    try:
        db = open("tmp", "w")
    except IOError as e:
        assert str(e) == "[Errno 2] No such file or directory: 'tmp.dblite'", str(e)
    else:
        raise RuntimeError("IOError expected.")
    print("OK")

# Run the self-test when this file is executed directly as a script.
if __name__ == "__main__":
    _exercise()

# Local Variables:
# tab-width:4
# indent-tabs-mode:nil
# End:
# vim: set expandtab tabstop=4 shiftwidth=4:
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.