Commits

Mechiel Lukkien committed 73d3a57 Draft

first release. import from tarball.

Comments (0)

Files changed (11)

+CC = cc
+LD = cc
+CFLAGS = -g -pthread -Wall
+LDFLAGS = -static -g -pthread -Wall
+LIBS = -lcrypto
+NROFF = nroff -mandoc -Tutf8
+# to remove escape characters, run through col -b
+
+ofiles = pack.o util.o proto.o
+
+.SUFFIXES: .c .o
+.c.o:
+	$(CC) $(CFLAGS) -c $<
+
+all: memventi memventi.0
+
+memventi: $(ofiles) memventi.o
+	$(LD) $(LDFLAGS) -o $@ $(ofiles) memventi.o $(LIBS)
+
+memventi.0: memventi.8
+	$(NROFF) memventi.8 > memventi.0
+
+clean:
+	-rm -f memventi *.o memventi.0
+#!/usr/bin/env python
+
+import sys
+import math
+from types import *
+from pprint import pprint
+
+
+Tplain, Tpow2range, Tpow10range, Tinterval = range(4)
+keys = {
+	"maxdatafile":		Tpow2range,
+	"blocksize":		Tpow2range,
+	"collisioninterval":	Tpow10range,
+
+	"maxinitmem":		Tplain,
+	"maxtotalmem":		Tplain,
+	"minchainentries":	Tpow2range,
+	"minmaxblocksperhead":	Tinterval,
+}
+
+def log2(n):
+	return math.log(n)/math.log(2)
+
+def fromintsuffix(s):
+	suffix = ['', 'k', 'm', 'g', 't']
+	num = ""
+	while s and s[0:1].isdigit():
+		num += s[0:1]
+		s = s[1:]
+	i = int(num)
+	if not s in suffix:
+		raise Exception("invalid integer suffix")
+	i = int(i * math.pow(1024, suffix.index(s)))
+	return i
+
+def toroundsuffix(num):
+	suffix = ['', 'k', 'm', 'g', 't']
+	i = 0
+	while num >= 1024*10:
+		num /= 1024
+		i += 1
+	return str(num)+suffix[i]
+
+
+def mkrange(rangestr, f):
+	val = []
+	for startendstr in rangestr.split(","):
+		if not "-" in startendstr:
+			val.append(fromintsuffix(startendstr))
+		else:
+			startstr, endstr = startendstr.split("-")
+			start = fromintsuffix(startstr)
+			end = fromintsuffix(endstr)
+			while start <= end:
+				val.append(start)
+				start = f(start)
+	return val
+
+def mkpow2range(startendstr):
+	return mkrange(startendstr, lambda x: x*2)
+	
+def mkpow10range(startendstr):
+	return mkrange(startendstr, lambda x: x*10)
+
+def mkinterval(startendstr):
+	if startendstr.startswith("-"):
+		return (0, fromintsuffix(startendstr[1:]))
+	startstr, endstr = startendstr.split("-")
+	return fromintsuffix(startstr), fromintsuffix(endstr)
+
+def mkcfgs(config):
+	cfgs = [{}]
+	for key, value in config.items():
+		if type(value) == NoneType or type(value) == TupleType or type(value) == IntType:
+			for c in cfgs:
+				c[key] = value
+		elif type(value) == ListType:
+			newcfgs = []
+			for elem in value:
+				for c in cfgs:
+					c = c.copy()
+					c[key] = elem
+					newcfgs.append(c)
+			cfgs = newcfgs
+		else:
+			raise Exception("invalid type of value: %r" % type(value))
+	return cfgs
+
+
+def warn(s):
+	print >>sys.stderr, s
+
+def usage(prog):
+	usagestr = "usage: %s maxdatafile start-end blocksize start-end collisioninterval start-end" + \
+		" [maxinitmem size] [maxtotalmem size] [minchainentries start-end] [minmaxblocksperhead start-end]"
+	print >>sys.stderr, usagestr % prog
+	sys.exit(1)
+
+
+def main(prog, *args):
+	required = ["maxdatafile", "blocksize", "collisioninterval"]
+	config = {}
+
+	if len(args) % 2 != 0:
+		usage(prog)
+	while len(args) != 0:
+		key, value, args = args[0], args[1], args[2:]
+		if key not in keys:
+			usage(prog)
+		ktype = keys[key]
+		if ktype == Tplain:
+			value = fromintsuffix(value)
+		elif ktype == Tpow2range:
+			value = mkpow2range(value)
+		elif ktype == Tpow10range:
+			value = mkpow10range(value)
+		elif ktype == Tinterval:
+			value = mkinterval(value)
+		else:
+			raise Exception("internal error")
+
+		config[key] = value
+
+	missing = list(set(required) - set(config.keys()))
+	if len(missing) != 0:
+		warn("missing keys: %s" % ", ".join(missing))
+		usage(prog)
+
+	if not "minchainentries" in config:
+		config["minchainentries"] = [4, 8]
+
+	for k in keys.keys():
+		if not k in config:
+			config[k] = None
+
+	cfgs = mkcfgs(config)
+
+	chainsize = 9
+	results = []
+	for cfg in cfgs:
+		maxdatafile =		cfg["maxdatafile"]
+		blocksize =		cfg["blocksize"]
+		collisioninterval =	cfg["collisioninterval"]
+		maxinitmem =		cfg["maxinitmem"]
+		maxtotalmem =		cfg["maxtotalmem"]
+		minchainentries =	cfg["minchainentries"]
+		minmaxblocksperhead =	cfg["minmaxblocksperhead"]
+		
+		totalblocks = maxdatafile / blocksize
+		addrwidth = int(0.9 + round(log2(maxdatafile), 1))
+		totalscorewidth = int(0.9 + log2(totalblocks)) + int(0.9 + round(abs(log2(1.0/collisioninterval)), 1))
+
+		result = {
+		"config":		cfg,
+		"totalblocks":		toroundsuffix(totalblocks),
+		"addrwidth":		addrwidth,
+		"totalscorewidth":	totalscorewidth,
+		}
+
+		for i in range(1, totalscorewidth+1):
+			headscorewidth = i
+			entryscorewidth = totalscorewidth - headscorewidth
+			if headscorewidth <= 0 or entryscorewidth <= 0:
+				continue
+			nheads = 2**headscorewidth
+			minheadmem = nheads*chainsize
+			entrywidth = entryscorewidth+addrwidth+8
+			chaindatabytes = (entrywidth*minchainentries+ 8 - 1) / 8
+			headunused = int(1.0+(chainsize+chaindatabytes)/2.0)
+			blocksperhead = int(1.0 + float(totalblocks)/nheads)
+			chainsperhead = int(1.0 + float(blocksperhead)/minchainentries)
+			maxmemused = int(minheadmem + nheads*(0.5+chainsperhead)*(chainsize+chaindatabytes))
+			memperblock = float(maxmemused) / totalblocks
+
+			n = totalblocks
+			d = 2**totalscorewidth
+			collisions = int(round(n - d + d*math.pow((float(d)-1)/float(d), n)))
+			# xxx also calculate 2nd order and further collisions
+
+			if maxinitmem != None and minheadmem > maxinitmem:
+				continue
+			if maxtotalmem != None and maxmemused > maxtotalmem:
+				continue
+			if minmaxblocksperhead != None and (blocksperhead < minmaxblocksperhead[0] or blocksperhead > minmaxblocksperhead[1]):
+				continue
+
+			newresult = result.copy()
+			newresult.update({
+			"headscorewidth":	headscorewidth,
+			"entryscorewidth":	entryscorewidth,
+			"headcount":		toroundsuffix(nheads),
+			"blocksperhead":	toroundsuffix(blocksperhead),
+			"entrywidth":		entrywidth,
+			"chaindatasize":	toroundsuffix(chaindatabytes),
+
+			"minmemused":		toroundsuffix(minheadmem),
+			"maxmemused":		toroundsuffix(maxmemused),
+			"unusedmem":		toroundsuffix(nheads*headunused),
+			"collisions":		collisions,
+			"memperblock":		memperblock,
+			"sort":			maxmemused,
+			})
+			results.append(newresult)
+
+	print "results:"
+	results.sort(lambda x, y: cmp(x['sort'], y['sort']))
+	pprint(results)
+		
+
+if __name__ == "__main__":
+	main(*sys.argv)
+/* memventi.c */
+
+typedef unsigned char uchar;
+typedef unsigned long long uvlong;
+typedef ushort uint16;
+typedef uint uint32;
+typedef uvlong uint64;
+
+
+enum {
+	Magicsize	= 4,
+	Tablemagicsize	= 4,
+	Scoresize	= 20,
+	Indexscoresize	= 8,
+	Diskiheadersize	= Indexscoresize+1+6,
+	Diskdheadersize	= Magicsize+Scoresize+1+2,
+
+	Chainentriesmin	= 8,
+	Bufallocsize	= 1*1024*1024,
+	Chainallocn	= 8*1024,
+
+	Datamax		= 56 * 1024,
+	Stringmax	= 1024,
+
+	Headermagic	= 0x2f9d81e5,
+	Indexmagic	= 0x119dab59,
+	Tablemagic	= 0x5dc31acd,
+};
+/*	Tableheadersize = Tablemagisize+1+ */
+
+
+typedef struct DHeader DHeader;
+typedef struct IHeader IHeader;
+
+struct DHeader {
+	uchar score[Scoresize];
+	uchar type;
+	ushort size;
+};
+
+
+struct IHeader {
+	uchar indexscore[Indexscoresize];
+	uchar type;
+	uvlong offset;
+};
+
+
+/* proto.c */
+enum {
+	Rerror		= 1,
+	Tping,
+	Rping,
+	Thello,
+	Rhello,
+	Tgoodbye,
+	Rgoodbye,
+	Tread		= 12,
+	Rread,
+	Twrite,
+	Rwrite,
+	Tsync,
+	Rsync,
+};
+
+typedef struct Vmsg Vmsg;
+
+struct Vmsg {
+	ushort msize;
+	uchar op;
+	uchar tag;
+	uchar score[Scoresize];
+	uchar type;
+	ushort count;
+	char *msg;
+	uchar *data;
+	ushort dsize;
+};
+
+
+/* util.c */
+typedef struct Lock Lock;
+typedef struct RWLock RWLock;
+
+struct Lock {
+	pthread_mutex_t lock;
+};
+
+struct RWLock {
+	pthread_rwlock_t rwlock;
+};
+
+extern int debugflag;
+#define MAX(a, b)	((a) > (b) ? (a) : (b))
+#define MIN(a, b)	((a) < (b) ? (a) : (b))
+#define nil NULL
+#define nelem(p)	((sizeof (p))/sizeof (p)[0])
+#define GET8(p)		((p)[0])
+#define GET16(p)        ((((uint16)((p)[0]))<<8)+((p)[1]))
+#define GET24(p)        ((((uint32)GET8(p))<<16)+GET16((p)+1))
+#define GET32(p)        ((((uint32)GET16(p))<<16)+GET16((p)+2))
+#define GET48(p)        ((((uint64)GET16(p))<<32)+GET32((p)+2))
+#define GET64(p)        ((((uint64)GET32(p))<<32)+GET32((p)+4))
+#define PUT8(p, v)	((p)[0] = v)
+#define PUT16(p, v)     (((p)[0] = (uchar)((v)>>8)), ((p)[1] = (uchar)(v)))
+#define PUT32(p, v)     (PUT16((p), (uint16)((v)>>16)), PUT16((p)+2, (uint16)(v)))
+#define PUT48(p, v)     (PUT16((p), (uint16)((v)>>32)), PUT32((p)+2, (uint32)(v)))
+#define PUT64(p, v)     (PUT32((p), (uint32)((v)>>32)), PUT32((p)+4, (uint32)(v)))
+
+/* pack.c */
+void	unpackiheader(uchar *, IHeader *);
+void	packiheader(uchar *, IHeader *);
+void	toiheader(IHeader *, DHeader *, uvlong);
+char	*unpackdheader(uchar *, DHeader *);
+void	packdheader(uchar *, DHeader *);
+uvlong	getuvlong(uchar *, uint, uint);
+void	putuvlong(uchar *, uvlong, uint, uint);
+
+/* util.c */
+void	sha1(uchar *, uchar *, uint);
+void	*lockedmalloc(ulong);
+void	errsyslog(int, const char *, ...);
+void	errxsyslog(int, const char *, ...);
+void	debug(int, char *, ...);
+uvlong	filesize(int);
+uvlong	roundup(uvlong, uint);
+char *scorestr(uchar *score);
+char *dheaderfmt(DHeader *dh);
+void	*emalloc(ulong);
+void	*trymalloc(ulong);
+void	*erealloc(void *, ulong);
+ssize_t	preadn(int, void *, size_t, off_t);
+ssize_t	writen(int, char *, size_t);
+uvlong	msec(void);
+int	lockinit(Lock *l);
+void	lock(Lock *l);
+void	unlock(Lock *l);
+int	rwlockinit(RWLock *l);
+void	rlock(RWLock *l);
+void	wlock(RWLock *l);
+void	runlock(RWLock *l);
+void	wunlock(RWLock *l);
+
+/* proto.c */
+int	readvmsg(FILE *, Vmsg *, uchar *);
+int	writevmsg(int, Vmsg *, uchar *);
+.\" public domain, by mechiel lukkien, 2007-02-23
+.Dd February 23, 2007
+.Dt memventi 8
+.Os memventi
+.Sh NAME
+.Nm memventi
+.Nd venti daemon with in-memory index
+.Sh SYNOPSIS
+.Nm
+.Op Fl fvD
+.Op Fl r Ar host!port
+.Op Fl w Ar host!port
+.Op Fl i Ar indexfile
+.Op Fl d Ar datafile
+.Ar headscorewidth entryscorewidth addrwidth
+.Sh DESCRIPTION
+.Nm Memventi
+is a small venti daemon that runs on unix.  A venti daemon allows for storing and retrieving data (in blocks of up to 56 KB) with a numeric type and its SHA1 hash (called score) as address, all with a simple protocol on top of TCP.  Once written, data cannot be removed.  If data needs to be stored and has a score that is already present, the data is not written again.  Authentication is not needed, anyone can read any block.  The idea is that the address, the score, is just a shorter version of the "content":  only if you know the content, you can determine the score and if you know the score, you know the content.  A more advanced venti is used by Plan 9 for filesystem block storage and backup storage.
+.Pp
+The data blocks are written sequentially to the specified
+.Ar datafile
+or 
+.Pa data
+by default.  This file is opened append-only, therefore previously written data cannot be modified or removed by memventi.  Each block has a block header prepended to it, the header contains the following information: score of the block and the type of the block (both part of the address) and the length of the data.
+In the
+.Ar indexfile ,
+.Pa index
+by default, a part of the score, the type and the offset of the header of the corresponding data block in the data file, is stored.  This file is loaded into main memory (and verified or fixed to match the data file) at startup and remains in memory throughout operation for fast score lookups.  This contrasts with the Plan 9 venti which reads scores from a big hashtable on index disks (and with a cache) to do lookups of scores.  The scheme used by
+.Nm memventi
+cannot store as many data blocks, because main memory is limited.  It is however, easier to setup and maintain.
+.Pp
+Main memory usage is reduced as much as possible.  For each block, the numeric type is kept in main memory and only just enough of the score, which is 160 bits in total.  For reading, a "hit" of the in-memory score-part often means the data block is found with one disk read (the header in the data file is checked to see whether the entire score matches).  Sometimes, a hit was false and the block in the data file was different (thus, the score might not be present at all).  At other times multiple "hits" are found when reading a score.  In this case, the data blocks for the hits have to be read from data file until the full score is encountered.
+The
+.Ar headscorewidth ,
+.Ar entryscorewidth and
+.Ar addrwidth
+can be specified such that a double hit only occurs once every 1000 scores (when the venti is full), triple hits will occur much less often.
+For writes, if the in-memory index has one or more hits, the disk has to be checked.  If the data is already present, it is not written again.  Otherwise, it is simply appended to the data and index file and an entry put in memory.
+.Pp
+.Ar Headscorewidth
+is the number of bits of the score used for the number of buckets in the lookup table.  For example, 9 bits means there will be 512 buckets (heads) in the lookup table.
+.Ar Entryscorewidth
+is the number of bits of the score used for each entry (one for each data block) in the buckets.
+.Ar Addrwidth
+is the number of bits to use for addressing in the datafile.  Fewer bits results in less memory used, but also reduces the maximum memventi storage capacity.  Appropriate values for these variables can be determined using the program
+.Nm calc.py .
+It returns reasonable values when given a maximum data file size, average block size and collisioninterval (1000 means one of every 1000 scores may have a collision).  More parameters may be specified to further narrow down the right values.
+.Ss Options
+.Bl -tag -width Fl
+.It Fl f
+Do not daemonize, stay in foreground.
+.It Fl v
+Be more verbose (to syslog).
+.It Fl D
+Print debugging information to standard error.
+.It Fl r Ar host!port
+Listen on the specified TCP port, on the specified host.  The port-part (including the exclamation mark) is optional and defaults to 17034.  The connection does not allow writes, only reads.  This can be used to prevent public memventi's to be filled up.
+.It Fl w Ar host!port
+Same as the
+.Fl r
+option, but does allow writing.  If no
+.Fl r
+or
+.Fl w
+options are specified, memventi listens read/write on localhost!17034.
+.It Fl i Ar indexfile
+File to write index entries to,
+.Ar index
+by default.
+.It Fl d Ar datafile
+File to write data blocks to,
+.Ar data
+by default.
+.El
+.Pp
+When a SIGUSR1 is received, a histogram of the lengths of the heads (the number of blocks in the heads) is printed to standard out.  When a SIGUSR2 is received, a histogram of the number of disk accesses needed to fulfill operations is printed to standard out.  The only other statistics are written to syslog at startup, they contain the number of bytes read, whether the index was synchronized and how long startup up took.
+.Pp
+Whenever a problem is encountered, such as an error when writing a block to a file, memventi puts itself in degraded mode.  In this mode, only read operations are handled.  When it goes in degraded mode, a message is written to syslog clearly explaining the problem.
+.Sh SEE ALSO
+From Plan 9 from User Space:
+.Xr vac 1 ,
+.Xr venti 1 ,
+.Xr venti 3 ,
+.Xr vacfs 4 ,
+.Xr venti 7 ,
+.Xr venti 8 ,
+.Pp
+The paper titled
+"Venti: a new approach to archival storage", by Sean Quinlan and Sean Dorward.
+.Pp
+The
+.Nm memventi
+website, http://www.xs4all.nl/~mechiel/projects/memventi/.
+.Sh AUTHORS
+Mechiel Lukkien, <mechiel@xs4all.nl> or <mechiel@ueber.net>.  All files are in the public domain.
+.Sh CAVEATS
+The memory used for the lookup table buckets and entries is mlock-ed so lookups are always fast.  Some systems, notably OpenBSD/i386 do not allow non-root users to mlock memory.
+.Pp
+Data blocks are not compressed.
+.Pp
+Memventi is not optimized for speed.
+.Pp
+There are no tools to aid in data recovery, e.g. checking the data file for consistency and fixing problems.  For now,
+.Nm dd
+should be the biggest help to remove trailing partially written blocks.
+.Pp
+Starting up is slow since the entire table has to be read in memory.
+.Pp
+Calc.py should be explained better.  Some important things:  start-end ranges can be specified as numbers with a prefix such as k, m, g, etc.  Multiple ranges can be specified, each separated by a comma.
+.Pp
+Memventi first binds to addresses, then initializes (involving a fork) and only then starts accepting connections.  On OpenBSD, this does not work:  new connections are not "seen" by the accept.  The soluation is to first initialize and then bind and accept.
+#include "memventi.h"
+
+
+typedef struct Args Args;
+typedef struct Chain Chain;
+typedef struct Netaddr Netaddr;
+
+enum {
+	Listenmax	= 32,
+	Addressesmax	= 16,
+};
+
+enum {
+	Srunning,	/* running normally */
+	Sdegraded,	/* a write error has occured, disallow writes */
+	Sclosing,	/* shutting down, stop handling requests */
+};
+
+
+struct Args {
+	int fd;
+	int allowwrite;
+	uchar *buf;
+};
+
+struct Chain {
+	Chain *next;
+	uchar *data;
+	uchar n;
+} __attribute__((__packed__));
+
+struct Netaddr {
+	char *host;
+	char *port;
+};
+
+static int fflag;
+static int vflag;
+
+static int datafd;
+static int indexfd;
+static uvlong datafilesize;
+static uvlong indexfilesize;
+
+static char *datafile = "data";
+static char *indexfile = "index";
+
+static Chain *heads;
+static ulong nheads;
+static uchar *mem;
+static uchar *memend;
+static uvlong nblocks;
+
+static int headscorewidth;
+static int entryscorewidth;
+static int addrwidth;
+static uvlong endaddr;
+static int mementrysize;
+static uint initheadlen;
+
+static char *defaultport= "17034";
+
+static RWLock htlock[256];
+static Lock disklock;
+static Lock statelock;
+static int state;
+
+static pthread_t readlistenthread[Listenmax];
+static pthread_t writelistenthread[Listenmax];
+static pthread_t syncprocthread;
+static int nreadlistens, nwritelistens;
+
+static uvlong nlookups;
+static uvlong diskhisto[Addressesmax];
+
+
+static uchar zeroscore[Scoresize] = {
+	0xda, 0x39, 0xa3, 0xee, 0x5e, 0x6b, 0x4b, 0xd, 0x32, 0x55,
+	0xbf, 0xef, 0x95, 0x60, 0x18, 0x90, 0xaf, 0xd8, 0x7, 0x9
+};
+
+
+static uvlong
+getaddr(Chain *c, int i)
+{
+	return getuvlong(c->data, 8*c->n+(entryscorewidth+addrwidth)*i+entryscorewidth, addrwidth);
+}
+
+
+static int
+isscore(uchar *score, Chain *c, int i)
+{
+	uvlong n1, n2;
+
+	n1 = getuvlong(score, headscorewidth, entryscorewidth);
+	n2 = getuvlong(c->data, c->n*8+(entryscorewidth+addrwidth)*i, entryscorewidth);
+	return n1 == n2;
+}
+
+
+static int
+istype(uchar type, Chain *c, int i)
+{
+	return c->data[i] == type;
+}
+
+
+static int
+isend(Chain *c, int i)
+{
+	return getaddr(c, i) == endaddr;
+}
+
+
+static void
+disklookuphisto(void)
+{
+	int i;
+
+	printf("disk lookup histogram:\n");
+	printf("count   frequency:\n");
+	for(i = 0; i < nelem(diskhisto); i++) {
+		if(diskhisto[i] != 0)
+			printf("%7d  %llu\n", i, diskhisto[i]);
+	}
+	printf("total memory lookups: %llu\n", nlookups);
+}
+
+
+static ulong
+headcount(Chain *c)
+{
+	ulong n;
+	int i;
+	uchar *p;
+
+	n = 0;
+	for(; c->next != nil; c = c->next)
+		n += c->n;
+
+	p = c->data;
+	for(i = 0; i < c->n; i++)
+		if(isend(c, i))
+			break;
+		else
+			n++;
+	return n;
+}
+
+
+static void
+headhisto(void)
+{
+	ulong *freqs;
+	ulong i, j;
+	ulong lastindex;
+	ulong index;
+
+	freqs = emalloc(sizeof freqs[0]);
+	lastindex = 0;
+	freqs[0] = 0;
+	for(i = 0; i < nelem(htlock); i++)
+		rlock(&htlock[i]);
+	for(i = 0; i < nheads; i++) {
+		index = headcount(&heads[i]);
+		if(index > lastindex)
+			freqs = erealloc(freqs, sizeof freqs[0] * (index+1));
+		for(j = lastindex+1; j <= index; j++)
+			freqs[j] = 0;
+		freqs[index] += 1;
+		if(index > lastindex)
+			lastindex = index;
+	}
+	for(i = 0; i < nelem(htlock); i++)
+		runlock(&htlock[i]);
+
+	printf("head length histogram:\n");
+	printf("count    frequency\n");
+	for(i = 0; i <= lastindex; i++) {
+		if(freqs[i] != 0)
+			printf("%7lu  %10lu\n", i, freqs[i]);
+	}
+	printf("nblocks: %llu\n", nblocks);
+	free(freqs);
+}
+
+
+static int
+lookup(uchar *score, uchar type, uvlong *addr)
+{
+	ulong index;
+	Chain *c;
+	int i, n;
+	uvlong entryaddr;
+
+	nlookups++;
+
+	index = getuvlong(score, 0, headscorewidth);
+
+	n = 0;
+	for(c = &heads[index]; c != nil; c = c->next) {
+		for(i = 0; i < c->n; i++) {
+			entryaddr = getaddr(c, i);
+			if(entryaddr == endaddr)
+				break;
+			if(!istype(type, c, i) || !isscore(score, c, i))
+				continue;
+			if(n >= Addressesmax)
+				return -1;
+			addr[n++] = entryaddr;
+		}
+	}
+	return n;
+}
+
+
+static uchar *
+bufalloc(int n)
+{
+	uchar *p;
+
+	if(n == 0)
+		return nil;
+
+	if(mem == nil || mem+n >= memend) {
+		mem = lockedmalloc(Bufallocsize);
+		if(mem == nil)
+			return nil;
+		memend = mem+Bufallocsize;
+		memset(mem, (uchar)0xff, Bufallocsize);
+	}
+	p = mem;
+	mem += n;
+	return p;
+}
+
+
+static Chain *
+chainalloc(int n)
+{
+	static Chain *b = nil;
+	static Chain *e = nil;
+	Chain *newb;
+	uchar *p;
+
+	if(b == nil || b >= e) {
+		newb = lockedmalloc(sizeof newb[0] * Chainallocn);
+		if(newb == nil)
+			return nil;
+		b = newb;
+		e = &b[Chainallocn];
+	}
+	n = MAX(n, Chainentriesmin);
+	p = bufalloc(roundup(n*mementrysize, 8)/8);
+	if(p == nil)
+		return nil;
+	b->data = p;
+	b->next = nil;
+	b->n = n;
+	return b++;
+}
+
+
+static void
+putentry(Chain *c, int i, uchar *score, uchar type, uvlong addr)
+{
+	int bitoffset;
+	uvlong n;
+
+	c->data[i] = type;
+	bitoffset = 8*c->n+(entryscorewidth+addrwidth)*i;
+	n = getuvlong(score, headscorewidth, entryscorewidth);
+	putuvlong(c->data, n, bitoffset, entryscorewidth);
+	putuvlong(c->data, addr, bitoffset+entryscorewidth, addrwidth);
+}
+
+
+static int
+insert(uchar *score, uchar type, uvlong addr)
+{
+	ulong index;
+	int i;
+	Chain *c;
+	int headlen;
+	int nalloc;
+
+	index = getuvlong(score, 0, headscorewidth);
+
+	c = &heads[index];
+	while(c->next != nil)
+		c = c->next;
+
+	for(i = 0; i < c->n; i++) {
+		if(!isend(c, i))
+			continue;
+		putentry(c, i, score, type, addr);
+		return 1;
+	}
+
+	if(c == &heads[index] && c->n == 0) {
+		c->data = bufalloc(roundup(Chainentriesmin*mementrysize, 8)/8);
+		if(c->data == nil)
+			return 0;
+		c->n = Chainentriesmin;
+	} else {
+		headlen = headcount(&heads[index]);
+		nalloc = MIN(255, MAX(0, (int)initheadlen - headlen));
+		nalloc = 0;
+		c->next = chainalloc(nalloc);
+		if(c->next == nil)
+			return 0;
+		c = c->next;
+	}
+	putentry(c, 0, score, type, addr);
+	return 1;
+}
+
+
+static uvlong
+disklookup(uvlong *addr, int naddr, uchar *score, uchar type, int readdata, uchar *data, DHeader *dh, char **errmsg)
+{
+	int i;
+	char *err;
+	uchar diskscore[Scoresize];
+	uvlong offset;
+	int n;
+	int want;
+
+	diskhisto[naddr] += 1;
+
+	want = Diskdheadersize;
+	if(readdata)
+		want += 8*1024;
+
+	*errmsg = nil;
+	for(i = 0; i < naddr; i++) {
+		offset = addr[i];
+		n = preadn(datafd, data, want, offset);
+		if(n <= 0) {
+			*errmsg = "error reading header";
+			syslog(LOG_WARNING, "disklookup: error reading header for block at offset=%llu, score=%s type=%d: %s",
+				offset, scorestr(score), (int)type, (n < 0) ? strerror(errno) : "end of file");
+			continue;
+		}
+		if(n < Diskdheadersize) {
+			*errmsg = "short read for header";
+			syslog(LOG_WARNING, "disklookup: short read for header for block at offset=%llu, have=%d, score=%s type=%d",
+				offset, n, scorestr(score), (int)type);
+			continue;
+		}
+
+		err = unpackdheader(data, dh);
+		if(err != nil) {
+			*errmsg = err;
+			syslog(LOG_WARNING, "disklookup: unpacking header for block at offset=%llu, score=%s type=%d: %s",
+				offset, scorestr(score), (int)type, *errmsg);
+			continue;
+		}
+
+		if(memcmp(score, dh->score, Scoresize) != 0 || dh->type != type)
+			continue;
+
+		if(readdata) {
+			if(dh->size > n-Diskdheadersize) {
+				n = preadn(datafd, data, dh->size, offset+Diskdheadersize);
+				if(n <= 0) {
+					*errmsg = "disklookup: error reading data";
+					syslog(LOG_WARNING, "error reading data for block at offset=%llu, score=%s type=%d: %s",
+						offset, scorestr(score), (int)type, (n < 0) ? strerror(errno) : "end of file");
+				}
+				if(n != dh->size) {
+					*errmsg = "disklookup: short read for data";
+					syslog(LOG_WARNING, "short read for data for block at offset=%llu, have=%d, score=%s type=%d",
+						offset, n, scorestr(score), (int)type);
+				}
+			} else {
+				memmove(data, data+Diskdheadersize, dh->size);
+			}
+			sha1(diskscore, data, dh->size);
+			if(memcmp(diskscore, score, Scoresize) != 0) {
+				*errmsg = "score on disk invalid";
+				syslog(LOG_ALERT, "disklookup: datafile %s has wrong score (has %s, claims %s) in block at offset=%llu size=%d type=%d",
+					datafile, scorestr(diskscore), scorestr(dh->score), offset, (int)dh->size, (int)dh->type);
+				return ~0ULL;
+			}
+		}
+		return offset;
+	}
+	return ~0ULL;
+}
+
+
+static char *
+indexstore(IHeader *ih)
+{
+	uchar ihbuf[Diskiheadersize];
+	int n;
+	static char errmsg[512];
+
+	packiheader(ihbuf, ih);
+	n = pwrite(indexfd, ihbuf, sizeof ihbuf, indexfilesize);
+	if(n <= 0) {
+		snprintf(errmsg, sizeof errmsg,
+			"indexstore: writing header to indexfile %s at offset=%llu for datafile block at offset=%llu: %s",
+			indexfile, indexfilesize, ih->offset, (n < 0) ? strerror(errno) : "end of file");
+		syslog(LOG_ALERT, "%s", errmsg);
+		return errmsg;
+	}
+	if(n != sizeof ihbuf) {
+		snprintf(errmsg, sizeof errmsg,
+			"indexstore: short write for header to indexfile %s at offset=%llu for datafile block at offset=%llu, "
+			"dangling bytes at end of datafile %s",
+			indexfile, indexfilesize, ih->offset, datafile);
+		syslog(LOG_ALERT, "%s", errmsg);
+		return errmsg;
+	}
+	return nil;
+}
+
+
+static uvlong
+store(DHeader *dh, uchar *data)
+{
+	uchar buf[Diskdheadersize];
+	uvlong offset;
+	int n;
+	IHeader ih;
+	char *errmsg;
+
+	packdheader(buf, dh);
+	offset = datafilesize;
+
+	debug(LOG_DEBUG, "writing data, offset=%llu size=%d", offset, (int)dh->size);
+
+	n = pwrite(datafd, buf, sizeof buf, offset);
+	if(n <= 0) {
+		syslog(LOG_ALERT, "store: writing header to datafile %s, block at offset=%llu, %s: %s",
+			datafile, offset, dheaderfmt(dh), (n < 0) ? strerror(errno) : "end of file");
+		return ~0ULL;
+	}
+	if(n != sizeof buf) {
+		syslog(LOG_ALERT, "store: short write for header, %d dangling bytes at end of datafile %s, block at offset=%llu, %s",
+			n, datafile, offset, dheaderfmt(dh));
+		return ~0ULL;
+	}
+
+	n = pwrite(datafd, data, dh->size, offset+Diskdheadersize);
+	if(n <= 0) {
+		syslog(LOG_ALERT, "store: writing data to datafile %s, block at offset=%llu, %s: %s",
+			datafile, offset, dheaderfmt(dh), (n < 0) ? strerror(errno) : "end of file");
+		return ~0ULL;
+	}
+	if(n != dh->size) {
+		syslog(LOG_ALERT, "store: short write for data, %d dangling bytes at end of datafile %s, "
+			"block at offset=%llu, %s, header for partly written block remains at end of file!",
+			n+Diskdheadersize, datafile, offset, dheaderfmt(dh));
+		return ~0ULL;
+	}
+
+	datafilesize += dh->size+Diskdheadersize;
+
+	toiheader(&ih, dh, offset);
+	errmsg = indexstore(&ih);
+	if(errmsg != nil)
+		return ~0ULL;
+
+	nblocks++;
+	return offset;
+}
+
+
+static int
+safe_lookup(uchar *score, uchar type, uvlong *addr)
+{
+	int n;
+	RWLock *htl;
+
+	htl = &htlock[GET8(score)];
+	rlock(htl);
+	n = lookup(score, type, addr);
+	runlock(htl);
+	return n;
+}
+
+
+static void
+safe_sync(void)
+{
+	lock(&disklock);
+	fsync(datafd);
+	fsync(indexfd);
+	unlock(&disklock);
+}
+
+
+static void
+stateset(int s)
+{
+	lock(&statelock);
+	state = s;
+	unlock(&statelock);
+}
+
+static int
+stateget(void)
+{
+	int s;
+	lock(&statelock);
+	s = state;
+	unlock(&statelock);
+	return s;
+}
+
+
+static char *
+readiheader(uvlong offset, IHeader *ih)
+{
+	uchar ihbuf[Diskiheadersize];
+	int n;
+
+	n = preadn(indexfd, ihbuf, sizeof ihbuf, offset);
+	if(n < 0)
+		return strerror(errno);
+	if(n == 0)
+		return "end of file";
+	if(n != sizeof ihbuf)
+		return "short read";
+	unpackiheader(ihbuf, ih);
+	return nil;
+}
+
+
+static char *
+readblock(uvlong offset, DHeader *dh, uchar *data)
+{
+	int n;
+	uchar dhbuf[Diskdheadersize];
+	static char errmsg[128];
+	char *msg;
+
+	if(offset+Diskdheadersize > datafilesize)
+		return "offset+size lies outside datafile";
+
+	n = preadn(datafd, dhbuf, sizeof dhbuf, offset);
+	if(n < 0) {
+		snprintf(errmsg, sizeof errmsg, "error reading header: %s", strerror(errno));
+		return errmsg;
+	}
+	if(n == 0)
+		return "end of file while reading header";
+	if(n != sizeof dhbuf)
+		return "short read on header";
+
+	msg = unpackdheader(dhbuf, dh);
+	if(msg != nil) {
+		snprintf(errmsg, sizeof errmsg, "parsing header: %s", msg);
+		return errmsg;
+	}
+
+	n = preadn(datafd, data, dh->size, offset+Diskdheadersize);
+	if(n < 0) {
+		snprintf(errmsg, sizeof errmsg, "error reading data: %s", strerror(errno));
+		return errmsg;
+	}
+	if(n == 0)
+		return "end of file while reading data";
+	if(n != dh->size)
+		return "short read on data";
+
+	return nil;
+}
+
+
+static void
+init(void)
+{
+	uvlong end;
+	uvlong ioffset, doffset;
+	IHeader ih;
+	DHeader dh;
+	int n;
+	int i;
+	char *errmsg;
+	uchar data[Datamax];
+	uchar score[Scoresize];
+	uchar ihbuf[Diskiheadersize];
+	uvlong origiblocks;
+	uvlong len;
+	uvlong off;
+	uvlong nindexadded;
+	uvlong start, totalstart;
+	uvlong dataread;
+
+	totalstart = msec();
+
+	datafd = open(datafile, O_RDWR|O_CREAT|O_APPEND, 0600);
+	if(datafd < 0)
+		errsyslog(1, "opening datafile %s", datafile);
+	datafilesize = filesize(datafd);
+	indexfd = open(indexfile, O_RDWR|O_CREAT|O_APPEND, 0600);
+	if(indexfd < 0)
+		errsyslog(1, "opening indexfile %s", indexfile);
+	indexfilesize = filesize(indexfd);
+
+	if(indexfilesize % Diskiheadersize != 0)
+		errxsyslog(1, "indexfile size not multiple of index header size (%d)", (int)Diskiheadersize);
+
+	/* check if last index entry is valid, if any */
+	doffset = 0;
+	if(indexfilesize > 0) {
+		ioffset = indexfilesize-Diskiheadersize;
+		errmsg = readiheader(ioffset, &ih);
+		if(errmsg != nil)
+			errxsyslog(1, "reading last header from index at offset=%llu: %s", ioffset, errmsg);
+
+		if(ih.offset > datafilesize)
+			errxsyslog(1, "last header at offset=%llu in index point past end of datafile at block at offset=%llu",
+				ioffset, ih.offset);
+
+		errmsg = readblock(ih.offset, &dh, data);
+		if(errmsg != nil)
+			errxsyslog(1, "error reading disk block at offset=%llu that indexfile at offset=%llu claims is the last: %s",
+				ih.offset, ioffset, errmsg);
+
+		sha1(score, data, dh.size);
+		if(memcmp(score, dh.score, Scoresize) != 0)
+			errxsyslog(1, "invalid score for block at offset=%llu in datafile, has %s, claims %s",
+				ih.offset, scorestr(score), scorestr(dh.score));
+
+		if(memcmp(ih.indexscore, dh.score, Indexscoresize) != 0)
+			errxsyslog(1, "score in indexfile at offset=%llu does not match score in datafile at block at offset=%llu",
+				ioffset, ih.offset);
+
+		if(ih.type != dh.type)
+			errxsyslog(1, "type in indexfile at offset=%llu does not match score in datafile at block at offset=%llu",
+				ioffset, ih.offset);
+		doffset = ih.offset+Diskdheadersize+dh.size;
+	}
+
+	origiblocks = indexfilesize / Diskiheadersize;
+
+	/* read remaining datafile blocks (that are not in indexfile) and add to indexfile */
+	dataread = 0;
+	ioffset = indexfilesize;
+	nindexadded = 0;
+	start = msec();
+	while(doffset < datafilesize) {
+		errmsg = readblock(doffset, &dh, data);
+		if(errmsg != nil)
+			errxsyslog(1, "error reading block at offset=%llu (for adding to index): %s",
+				doffset, errmsg);
+		sha1(score, data, dh.size);
+		if(memcmp(score, dh.score, Scoresize) != 0)
+			errxsyslog(1, "invalid score for block at offset=%llu in datafile, has %s, claims %s (for adding to index)",
+				doffset, scorestr(score), scorestr(dh.score));
+		toiheader(&ih, &dh, doffset);
+		errmsg = indexstore(&ih);
+		if(errmsg != nil)
+			errxsyslog(1, "could not store newly read datafile block at offset=%llu to indexfile at offset=%llu, %s",
+				doffset, ioffset, dheaderfmt(&dh));
+		indexfilesize += Diskiheadersize;
+		ioffset += Diskiheadersize;
+		doffset += Diskdheadersize+dh.size;
+
+		dataread += Diskdheadersize+dh.size;
+		nindexadded++;
+	}
+	syslog(LOG_NOTICE, "added %llu entries from datafile (%llu bytes in datafile) to indexfile, in %.3fs",
+		nindexadded, dataread, (msec()-start)/1000.0);
+	nblocks = indexfilesize / Diskiheadersize;
+
+	nheads = 1<<headscorewidth;
+	len = nheads * sizeof heads[0];
+	heads = lockedmalloc(len);
+	if(heads == nil)
+		errsyslog(1, "malloc for initial heads, %llu bytes", len);
+	debug(LOG_DEBUG, "%llu bytes allocated for heads", len);
+
+	initheadlen = (nheads > 0) ? nblocks / nheads : 0;
+	len = nheads * roundup(initheadlen*mementrysize, 8)/8;
+	mem = malloc(len);
+	if(mem == nil)
+		errsyslog(1, "no memory for initial index entries");
+	memset(mem, 0xff, len);
+	memend = mem+len;
+	debug(LOG_DEBUG, "%llu bytes allocated for initial entries in index", len);
+
+	for(i = 0; i < nheads; i++) {
+		heads[i].n = 0;
+		heads[i].next = nil;
+		heads[i].data = nil;
+	}
+
+	end = indexfilesize;
+	off = 0;
+	start = msec();
+	while(off < end) {
+		n = pread(indexfd, ihbuf, sizeof ihbuf, off);
+		if(n <= 0)
+			errxsyslog(1, "error reading indexfile offset=%llu", off);
+		if(n != sizeof ihbuf)
+			errxsyslog(1, "short read for indexfile offset=%llu, have=%d want=%d", off,
+				(int)n, (int)sizeof ihbuf);
+			
+		unpackiheader(ihbuf, &ih);
+		if(!insert(ih.indexscore, ih.type, ih.offset))
+			errxsyslog(1, "error inserting in memory for indexfile offset=%llu", off);
+		off += sizeof ihbuf;
+	}
+	syslog(LOG_NOTICE, "init done, %llu bytes for heads, read %llu bytes in %.3fs from index, entire startup in %.3fs",
+		len, indexfilesize, (msec()-start)/1000.0, (msec()-totalstart)/1000.0);
+
+	for(i = 0; i < nelem(diskhisto); i++)
+		diskhisto[i] = 0;
+
+	if(!lockinit(&statelock))
+		errxsyslog(1, "init statelock");
+	if(!lockinit(&disklock))
+		errxsyslog(1, "init disklock");
+	for(i = 0; i < nelem(htlock); i++)
+		if(!rwlockinit(&htlock[i]))
+			errxsyslog(1, "init hash table lock");
+}
+
+
+static void *
+connproc(void *p)
+{
+	int fd;
+	FILE *f;
+	Vmsg in, out;
+	char buf[128];
+	char *l;
+	char handshake[] = "venti-02-simple\n";
+	DHeader dh;
+	int ok, okhdr;
+	int len;
+	int allowwrite;
+	Args *args;
+	int n;
+	uvlong addr;
+	uvlong addrs[Addressesmax];
+	char *errmsg;
+	RWLock *htl;
+	uchar *databuf;
+
+	args = (Args *)p;
+	fd = args->fd;
+	allowwrite = args->allowwrite;
+	databuf = args->buf;
+	free(p);
+
+	in.data = nil;
+
+	debug(LOG_DEBUG, "connproc: started, fd %d", fd);
+
+	f = fdopen(fd, "r");
+	if(f == nil) {
+		debug(LOG_DEBUG, "connproc: fdopen on fd %d: %s", fd, strerror(errno));
+		goto done;
+	}
+
+	if(write(fd, handshake, strlen(handshake)) != strlen(handshake)) {
+		debug(LOG_DEBUG, "error writing protocol handshake: %s", strerror(errno));
+		goto done;
+	}
+
+	l = fgets(buf, sizeof buf, f);
+	if(l == nil || strncmp(l, "venti-02", 8) != 0) {
+		debug(LOG_DEBUG, "error reading protocol handshake or wrong protocol version: %s",
+			ferror(f) ? strerror(errno) : "eof");
+		goto done;
+	}
+	if(l != nil && (len = strlen(l)) > 0 && l[len-1] == '\n')
+		l[len-1] = '\0';
+	debug(LOG_DEBUG, "connproc: have handshake version %s", l);
+
+	if(readvmsg(f, &in, databuf) == 0) {
+		debug(LOG_DEBUG, "error reading hello msg");
+		goto done;
+	}
+	if(in.op != Thello) {
+		debug(LOG_DEBUG, "first message not hello");
+		goto done;
+	}
+	debug(LOG_DEBUG, "connproc: have hello message");
+	out.op = in.op+1;
+	out.tag = in.tag;
+	if(writevmsg(fd, &out, databuf) == 0) {
+		debug(LOG_DEBUG, "error writing hello venti reponse");
+		goto done;
+	}
+	debug(LOG_DEBUG, "connproc: hello response written");
+
+	out.data = nil;
+	for(;;) {
+		free(in.data);
+		in.data = nil;
+		errmsg = nil;
+
+		if(readvmsg(f, &in, databuf) == 0)
+			goto done;
+
+		if(stateget() == Sclosing) {
+			if(in.op == Tgoodbye)
+				goto done;
+			out.op = Rerror;
+			out.msg = "venti shutting down";
+			if(writevmsg(fd, &out, databuf) == 0)
+				debug(LOG_DEBUG, "error writing venti shutdown message");
+			goto done;
+		}
+
+		out.op = in.op+1;
+		out.tag = in.tag;
+		debug(LOG_DEBUG, "connproc: read message");
+
+		switch(in.op) {
+		case Thello:
+			syslog(LOG_INFO, "read Thello after handshake");
+			goto done;
+			break;
+		case Tread:
+			debug(LOG_DEBUG, "request: op=read score=%s type=%d", scorestr(in.score), (int)in.type);
+			if(memcmp(in.score, zeroscore, Scoresize) == 0) {
+				out.data = nil;
+				out.dsize = 0;
+				break;
+			}
+
+			n = safe_lookup(in.score, in.type, addrs);
+			if(n == 0) {
+				out.op = Rerror;
+				out.msg = "no such score/type";
+				break;
+			}
+			if(n == -1) {
+				out.op = Rerror;
+				out.msg = "internal error (too many partial matches)";
+				break;
+			}
+			addr = disklookup(addrs, n, in.score, in.type, 1, databuf, &dh, &errmsg);
+			if(addr == ~0ULL) {
+				out.op = Rerror;
+				out.msg = "error retrieving data";
+				break;
+			}
+
+			if(dh.size > in.count) {
+				out.op = Rerror;
+				out.msg = "data larger than requested";
+			} else {
+				out.data = trymalloc(dh.size);
+				if(out.data == nil) {
+					out.op = Rerror;
+					out.msg = "out of memory";
+					syslog(LOG_WARNING, "connproc: out of memory for read of size %u", (uint)dh.size);
+					break;
+				}
+				memcpy(out.data, databuf, dh.size);
+				out.dsize = dh.size;
+			}
+			break;
+		case Twrite:
+			if(!allowwrite) {
+				out.op = Rerror;
+				out.msg = "no write access";
+				break;
+			}
+			if(stateget() == Sdegraded) {
+				out.op = Rerror;
+				out.msg = "cannot write";
+				break;
+			}
+
+			if(in.dsize == 0) {
+				memcpy(out.score, zeroscore, Scoresize);
+				break;
+			}
+
+			sha1(out.score, in.data, in.dsize);
+			debug(LOG_DEBUG, "request: op=write score=%s type=%d size=%d",
+				scorestr(out.score), (int)in.type, (int)in.dsize);
+
+			htl = &htlock[GET8(out.score)];
+			wlock(htl);
+			n = lookup(out.score, in.type, addrs);
+			if(n == -1) {
+				out.op = Rerror;
+				out.msg = "internal error (too many partial matches)";
+				wunlock(htl);
+				break;
+			}
+			if(n > 0) {
+				addr = disklookup(addrs, n, out.score, in.type, 0, databuf, &dh, &errmsg);
+				if(addr != ~0ULL) {
+					wunlock(htl);
+					break;
+				}
+				if(errmsg != nil) {
+					wunlock(htl);
+					out.op = Rerror;
+					out.msg = "internal error (could not confirm score presence)";
+					break;
+				}
+			}
+			okhdr = -1;
+			memcpy(dh.score, out.score, Scoresize);
+			dh.type = in.type;
+			dh.size = in.dsize;
+
+			if(datafilesize+Diskdheadersize+dh.size >= endaddr) {
+				wunlock(htl);
+				out.op = Rerror;
+				out.msg = "data file is full";
+				break;
+			}
+
+			lock(&disklock);
+			addr = store(&dh, in.data);
+			unlock(&disklock);
+
+			ok = addr != ~0ULL;
+			if(ok)
+				okhdr = insert(out.score, in.type, addr);
+			wunlock(htl);
+
+			if(!ok) {
+				stateset(Sdegraded);
+				out.op = Rerror;
+				out.msg = "error writing block";
+				syslog(LOG_WARNING, "connproc: error writing data, degraded to read-only mode");
+				break;
+			}
+			if(okhdr == 0) {
+				stateset(Sdegraded);
+				out.op = Rerror;
+				out.msg = "out of memory";
+				syslog(LOG_WARNING, "connproc: out of memory for storing index entry, "
+					"data file was written, degraded to read-only mode");
+				break;
+			}
+			break;
+		case Tsync:
+			if(allowwrite)
+				safe_sync();
+			break;
+		case Tgoodbye:
+			if(allowwrite)
+				safe_sync();
+			goto done;
+		default:
+			syslog(LOG_NOTICE, "invalid op %d", in.op);
+			goto done;
+			break;
+		}
+
+		debug(LOG_DEBUG, "connproc: have response for request");
+		if(writevmsg(fd, &out, databuf) == 0) {
+			debug(LOG_DEBUG, "error writing venti response");
+			free(out.data);
+			out.data = nil;
+			goto done;
+		}
+		free(out.data);
+		out.data = nil;
+		debug(LOG_DEBUG, "connproc: response for request written");
+	}
+
+done:
+	free(in.data);
+	free(databuf);
+	if(f != nil)
+		fclose(f);
+	close(fd);
+	debug(LOG_DEBUG, "connproc: done");
+	return nil;
+}
+
+
+static void *
+listenproc(void *p)
+{
+	struct sockaddr_storage addr;
+	socklen_t len;
+	pthread_t *thread;
+	Args *args;
+	int fd;
+	int listenfd, allowwrite;
+
+	args = (Args *)p;
+	listenfd = args->fd;
+	allowwrite = args->allowwrite;
+	free(p);
+
+	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nil);
+
+	syslog(LOG_NOTICE, "listenproc: accepting connections...");
+	for(;;) {
+		fd = accept(listenfd, (struct sockaddr *)&addr, &len);
+		if(fd < 0)
+			continue;
+
+		thread = nil;
+		args = malloc(sizeof args[0]);
+		if(args == nil)
+			goto error;
+		args->fd = fd;
+		args->allowwrite = allowwrite;
+		args->buf = malloc(Datamax+8);
+		if(args->buf == nil)
+			goto error;
+
+		thread = malloc(sizeof thread[0]);
+		if(thread == nil)
+			goto error;
+		if(pthread_create(thread, nil, connproc, args) != 0)
+			goto error;
+		free(thread);
+		continue;
+
+	error:
+		if(args != nil)
+			free(args->buf);
+		free(args);
+		free(thread);
+		syslog(LOG_WARNING, "listenproc: could not create process: %s", strerror(errno));
+	}
+	return nil;
+}
+
+
+static void *
+syncproc(void *p)
+{
+	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nil);
+	for(;;) {
+		sleep(10);
+		safe_sync();
+	}
+}
+
+
+static void *
+signalproc(void *p)
+{
+	int sig;
+	sigset_t mask;
+	int i;
+
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGINT);
+	sigaddset(&mask, SIGTERM);
+	sigaddset(&mask, SIGUSR1);
+	sigaddset(&mask, SIGUSR2);
+
+	for(;;) {
+		sig = 0;
+		if(sigwait(&mask, &sig) != 0)
+			syslog(LOG_NOTICE, "sigwait returned an error, sig=%d: %s", sig, strerror(sig));
+
+		switch(sig) {
+		case SIGUSR1:
+			headhisto();
+			break;
+		case SIGUSR2:
+			disklookuphisto();
+			break;
+		case SIGINT:
+		case SIGTERM:
+			stateset(Sclosing);
+			syslog(LOG_INFO, "closing down");
+			for(i = 0; i < nreadlistens; i++)
+				pthread_cancel(readlistenthread[i]);
+			for(i = 0; i < nwritelistens; i++)
+				pthread_cancel(writelistenthread[i]);
+
+			for(i = 0; i < nelem(htlock); i++)
+				wlock(&htlock[i]);
+			lock(&disklock);
+			pthread_cancel(syncprocthread);
+			fsync(datafd);
+			fsync(indexfd);
+			syslog(LOG_NOTICE, "data and index flushed, exiting...");
+			exit(0);
+			break;
+		default:
+			syslog(LOG_NOTICE, "unexpected signal (%d)", sig);
+		}
+	}
+}
+
+
+int
+dobind(Netaddr *netaddr)
+{
+	struct sockaddr bound;
+	socklen_t boundlen;
+	int gaierr;
+	struct addrinfo *addrs;
+        struct addrinfo localhints = { AI_PASSIVE, PF_INET, SOCK_STREAM, 0, 0, NULL, NULL, NULL };
+	int listenfd;
+
+	listenfd = socket(PF_INET, SOCK_STREAM, 0);
+	if(listenfd < 0)
+		errsyslog(1, "socket");
+
+	gaierr = getaddrinfo(netaddr->host, netaddr->port, &localhints, &addrs);
+	if(gaierr)
+		errxsyslog(1, "getaddrinfo: %s", gai_strerror(gaierr));
+
+	boundlen = addrs->ai_addrlen;
+	memcpy(&bound, addrs->ai_addr, boundlen);
+	freeaddrinfo(addrs);
+
+	if(bind(listenfd, &bound, boundlen) != 0)
+		errsyslog(1, "bind");
+
+	if(listen(listenfd, 1) != 0)
+		errsyslog(1, "listen");
+
+	return listenfd;
+}
+
+
+void
+startlisten(pthread_t *thread, int listenfd,  int allowwrite)
+{
+	Args *args;
+
+	args = emalloc(sizeof args[0]);
+	args->fd = listenfd;
+	args->allowwrite = allowwrite;
+	if(pthread_create(thread, nil, listenproc, args) != 0)
+		errsyslog(1, "error creating listenproc");
+}
+
+
+static void
+usage(void)
+{
+	fprintf(stderr, "usage: memventi [-fvD] [-r host!port] [-w host!port] [-i indexfile] [-d datafile] headscorewidth entryscorewidth addrwidth\n");
+	exit(1);
+}
+
+
+int
+main(int argc, char *argv[])
+{
+	int ch;
+	sigset_t mask;
+	Netaddr readaddrs[Listenmax];
+	Netaddr writeaddrs[Listenmax];
+	Netaddr *netaddr;
+	int readfds[Listenmax];
+	int writefds[Listenmax];
+	int i;
+
+	fflag = 0;
+	vflag = 0;
+	nreadlistens = nwritelistens = 0;
+	while((ch = getopt(argc, argv, "Dfvd:i:r:w:")) != -1) {
+		switch(ch) {
+		case 'D':
+			debugflag = 1;
+			break;
+		case 'd':
+			datafile = optarg;
+			break;
+		case 'f':
+			fflag = 1;
+			break;
+		case 'i':
+			indexfile = optarg;
+			break;
+		case 'r':
+			if(nreadlistens == nelem(readaddrs))
+				errxsyslog(1, "too many read-only hosts specified");
+			netaddr = &readaddrs[nreadlistens++];
+			netaddr->port = strrchr(optarg, '!');
+			if(netaddr->port != nil)
+				*netaddr->port++ = '\0';
+			else
+				netaddr->port = defaultport;
+			netaddr->host = optarg;
+			break;
+		case 'w':
+			if(nreadlistens == nelem(writeaddrs))
+				errxsyslog(1, "too many read/write hosts specified");
+			netaddr = &writeaddrs[nwritelistens++];
+			netaddr->port = strrchr(optarg, '!');
+			if(netaddr->port != nil)
+				*netaddr->port++ = '\0';
+			else
+				netaddr->port = defaultport;
+			netaddr->host = optarg;
+			break;
+		case 'v':
+			vflag = 1;
+			break;
+		default:
+			usage();
+		}
+	}
+	argc -= optind;
+	argv += optind;
+
+	if(argc != 3)
+		usage();
+
+	headscorewidth = atoi(argv[0]);
+	entryscorewidth = atoi(argv[1]);
+	addrwidth = atoi(argv[2]);
+	if(headscorewidth <= 0 || entryscorewidth <= 0 || addrwidth <= 0)
+		usage();
+	if(headscorewidth + entryscorewidth > Indexscoresize*8)
+		errxsyslog(1, "too many bits in head and per entry, maximum is %d", Indexscoresize*8);
+	endaddr = (1ULL<<addrwidth)-1;
+	mementrysize = 8+entryscorewidth+addrwidth;
+
+	if(nreadlistens == 0 && nwritelistens == 0) {
+		writeaddrs[0].host = "localhost";
+		writeaddrs[0].port = defaultport;
+		nwritelistens++;
+	}
+
+	openlog("memventi", LOG_CONS|(fflag ? LOG_PERROR : 0), LOG_DAEMON);
+	setlogmask(LOG_UPTO(vflag ? LOG_DEBUG : LOG_NOTICE));
+
+	for(i = 0; i < nreadlistens; i++)
+		readfds[i] = dobind(&readaddrs[i]);
+	for(i = 0; i < nwritelistens; i++)
+		writefds[i] = dobind(&writeaddrs[i]);
+
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGPIPE);
+	if(pthread_sigmask(SIG_BLOCK, &mask, NULL) != 0)
+		errsyslog(1, "pthread_sigmask");
+
+	init();
+	stateset(Srunning);
+
+	if(!fflag)
+		if(daemon(1, debugflag ? 1 : 0) != 0)
+			errsyslog(1, "could not daemonize");
+
+	for(i = 0; i < nreadlistens; i++)
+		startlisten(&readlistenthread[i], readfds[i], 0);
+	for(i = 0; i < nwritelistens; i++)
+		startlisten(&writelistenthread[i], writefds[i], 1);
+	if(pthread_create(&syncprocthread, nil, syncproc, nil) != 0)
+		errsyslog(1, "error creating syncproc");
+
+	sigaddset(&mask, SIGINT);
+	sigaddset(&mask, SIGTERM);
+	sigaddset(&mask, SIGUSR1);
+	sigaddset(&mask, SIGUSR2);
+	if(pthread_sigmask(SIG_BLOCK, &mask, NULL) != 0)
+		errsyslog(1, "pthread_sigmask");
+	signalproc(nil);
+	return 0;
+}
+#define _FILE_OFFSET_BITS 64	/* sigh, for gnu libc */
+#define _BSD_SOURCE
+#define _XOPEN_SOURCE 600
+
+#include <sys/types.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/socket.h>
+
+#include <assert.h>
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <signal.h>
+#include <syslog.h>
+#include <stdarg.h>
+#include <inttypes.h>
+#include <pthread.h>
+
+#include <openssl/sha.h>
+
+/* openbsd defines these in sys/param.h */
+#undef roundup
+#undef MAX
+#undef MIN
+
+#include "dat.h"
+#include "fns.h"
+#include "memventi.h"
+
+
+void
+unpackiheader(uchar *buf, IHeader *ih)
+{
+	memcpy(ih->indexscore, buf, Indexscoresize);
+	buf += Indexscoresize;
+	ih->type = GET8(buf);
+	buf += 1;
+	ih->offset = GET48(buf);
+	buf += 6;
+}
+
+
+void
+packiheader(uchar *buf, IHeader *ih)
+{
+	memcpy(buf, ih->indexscore, Indexscoresize);
+	buf += Indexscoresize;
+	PUT8(buf, ih->type);
+	buf += 1;
+	PUT48(buf, ih->offset);
+	buf += 6;
+}
+
+
+void
+toiheader(IHeader *ih, DHeader *dh, uvlong offset)
+{
+	memcpy(ih->indexscore, dh->score, Indexscoresize);
+	ih->type = dh->type;
+	ih->offset = offset;
+}
+
+
+char *
+unpackdheader(uchar *buf, DHeader *dh)
+{
+	uint magic;
+	static char errmsg[128];
+
+	magic = GET32(buf);
+	if(magic != Headermagic) {
+		snprintf(errmsg, sizeof errmsg, "invalid data, magic wrong (have 0x%x, want 0x%x)", magic, Headermagic);
+		return errmsg;
+	}
+	buf += 4;
+	memcpy(dh->score, buf, Scoresize);
+	buf += Scoresize;
+	dh->type = GET8(buf);
+	buf += 1;
+	dh->size = GET16(buf);
+	buf += 2;
+	if(dh->size > Datamax)
+		return "size too large";
+	return nil;
+}
+
+
+void
+packdheader(uchar *buf, DHeader *dh)
+{
+	PUT32(buf, Headermagic);
+	buf += 4;
+	memcpy(buf, dh->score, Scoresize);
+	buf += Scoresize;
+	PUT8(buf, dh->type);
+	buf += 1;
+	PUT16(buf, dh->size);
+	buf += 2;
+}
+
+
+uvlong
+getuvlong(uchar *data, uint bitoffset, uint bits)
+{
+	uvlong r;
+	int bbits;
+	int boff;
+	uchar *p;
+
+	r = 0;
+	p = &data[bitoffset / 8];
+
+	while(bits > 0) {
+		boff = bitoffset % 8;
+		bbits = MIN(bits, 8-boff);
+		r = (r<<bbits) | ((p[0]>>(8-bbits-boff)) & ((1<<bbits) - 1));
+		bitoffset += bbits;
+		bits -= bbits;
+		p++;
+	}
+	return r;
+}
+
+
+void
+putuvlong(uchar *data, uvlong addr, uint bitoffset, uint bits)
+{
+	uchar *p;
+	uchar v;
+	int boff, bbits;
+
+	p = &data[bitoffset/8];
+	while(bits > 0) {
+		boff = bitoffset % 8;
+		bbits = MIN(bits, 8-boff);
+		v = p[0];
+		p[0] = v & ~((1<<(8-boff))-1);
+		p[0] |= ((addr>>(bits-bbits)) & ~(1<<bbits)) << (8-boff-bbits);
+		p[0] |= v & ((1<<(8-boff-bbits))-1);
+		bitoffset += bbits;
+		bits -= bbits;
+		p++;
+	}
+}
+#include "memventi.h"
+
+
+static int
+readstr(uchar **bufp, uchar *end, uchar *s, int slen)
+{
+	ushort len;
+	uchar *buf;
+
+	buf = *bufp;
+
+	if(buf+2 > end)
+		return 0;
+
+	len = GET16(buf);
+	buf += 2;
+	if(s != nil) {
+		if(buf+len > end || len+1 > slen)
+			return 0;
+		memcpy(s, buf, len);
+		s[len] = '\0';
+	}
+	buf += len;
+	*bufp = buf;
+	return 1;
+}
+
+static int
+readmem(uchar **bufp, uchar *end)
+{
+	ushort len;
+	uchar *buf;
+
+	buf = *bufp;
+
+	if(buf+1 > end)
+		return 0;
+	len = GET8(buf);
+	buf += 1;
+	if(buf+len > end)
+		return 0;
+	buf += len;
+	*bufp = buf;
+	return 1;
+}
+
+
+static void
+writestr(uchar *p, char *s, int *lenp)
+{
+	int slen;
+
+	slen = strlen(s);
+	PUT16(p, slen);
+	p += 2;
+	memcpy(p, s, slen);
+	p += slen;
+	*lenp = 2+slen;
+}
+
+
+int
+readvmsg(FILE *f, Vmsg *m, uchar *buf)
+{
+	uchar *p;
+	uchar *end;
+
+	debug(LOG_DEBUG, "readvmsg: starting read");
+	if(fread(buf, 1, 2, f) != 2)
+		return 0;
+	m->msize = GET16(buf);
+	if(m->msize >= 8+Datamax)
+		return 0;
+
+	debug(LOG_DEBUG, "readvmsg: incoming message of %u bytes", (uint)m->msize);
+
+	if(fread(buf, 1, m->msize, f) != m->msize)
+		return 0;
+	if(m->msize < 2)
+		return 0;
+	end = buf + m->msize;
+	debug(LOG_DEBUG, "readvmsg: incoming message read");
+
+	p = buf;
+	m->op = GET8(p);
+	p += 1;
+	m->tag = GET8(p);
+	p += 1;
+
+	m->data = nil;
+	switch(m->op) {
+	case Thello:
+		if(readstr(&p, end, nil, 0) == 0)
+			return 0;
+		if(readstr(&p, end, nil, 0) == 0)
+			return 0;
+		if(p+1 > end)
+			return 0;
+		p += 1;
+
+		if(readmem(&p, end) == 0)	/* crypto */
+			return 0;
+		if(readmem(&p, end) == 0)	/* codec */
+			return 0;
+		break;
+	case Tread:
+		if(p+Scoresize+1+1+2 != end)
+			return 0;
+		memcpy(m->score, p, Scoresize);
+		p += Scoresize;
+		m->type = GET8(p);
+		p += 1;
+		p += 1;
+		m->count = GET16(p);
+		p += 2;
+		break;
+	case Twrite:
+		if(p+1+3 > end)
+			return 0;
+		m->type = GET8(p);
+		p += 1;
+		p += 3;
+		m->dsize = m->msize - 6;
+		m->data = trymalloc(m->dsize);
+		if(m->data == nil) {
+			debug(LOG_DEBUG, "readvmsg: out of memory for write of %u bytes", (uint)m->dsize);
+			return 0;
+		}
+		memcpy(m->data, p, m->dsize);
+		break;
+	case Tping:
+	case Tsync:
+	case Tgoodbye:
+		break;
+	default:
+		return 0;
+	}
+	return 1;
+}
+
+
+int
+writevmsg(int fd, Vmsg *m, uchar *buf)
+{
+	uchar *p;
+	int len;
+	int r, n;
+
+	p = buf+4;
+	switch(m->op) {
+	case Rhello:
+		writestr(p, "anonymous", &len);
+		p += len;
+		PUT8(p, 0);
+		p += 1;
+		PUT8(p, 0);
+		p += 1;
+
+		m->msize = 2+len+2;
+		break;
+	case Rread:
+		memcpy(p, m->data, m->dsize);
+		p += m->dsize;
+
+		m->msize = 2+m->dsize;
+		break;
+	case Rwrite:
+		memcpy(p, m->score, Scoresize);
+		p += Scoresize;
+
+		m->msize = 2+Scoresize;
+		break;
+	case Rerror:
+		writestr(p, m->msg, &len);
+		p += len;
+
+		m->msize = 2+len;
+		break;
+	case Rping:
+	case Rsync:
+		m->msize = 2;
+		break;
+	default:
+		syslog(LOG_EMERG, "writevmsg: missing case for op %d", m->op);
+		abort();
+		return 0;
+	}
+
+	p = buf;
+	PUT16(p, m->msize);
+	p += 2;
+	PUT8(p, m->op);
+	p += 1;
+	PUT8(p, m->tag);
+	p += 1;
+
+
+	n = 2+m->msize;
+	debug(LOG_DEBUG, "writevmsg: writing op %d msize %d", m->op, n);
+	r = writen(fd, (char *)buf, n);
+	if(r != n) {
+		debug(LOG_DEBUG, "writen: wrote %d instead of %d", r, n);
+		return 0;
+	}
+
+	return 1;
+}
+- read 16 last disk entries (using index offset as start) and verify
+- multiple procs for accessing datafile, reads concurrent, stores queued.
+- try to read index faster
+- speed up fixing index file, queue entries to write, queue blocks that have been written?
+- tool to find last valid lump and possibly invalid remainder (for half write during crash/power outage) 
+- look at protocol handling
+- to find duplicates: readdata | sed 's/.* score=\([^ ]*\).*/\1/' | sort | uniq -d (need to make readdata.c again)
+- compression of blocks?
+- multiple threads per connection
+- make lock for diskhisto
+- see if recovery is okay by killing a running memventi and restarting it
+- see how much overhead looking through data in a chain is.  up to how many entries in a head can we handle?
+#include "memventi.h"
+
+
+int debugflag = 0;
+
+
+void
+sha1(uchar *score, uchar *data, uint len)
+{
+	SHA1(data, len, score);
+}
+
+void *
+lockedmalloc(ulong len)
+{
+	void *p, *alignedp;
+	long pagesize;
+
+	pagesize = sysconf(_SC_PAGESIZE);
+	if(pagesize == -1)
+		errsyslog(1, "sysconf pagesize");
+	len = roundup(len, pagesize);
+	p = malloc(len+pagesize-1);
+	if(p == nil)
+		return nil;
+	alignedp = (void*)(((uintptr_t)p + pagesize - 1)&~(pagesize-1));
+	if(mlock(alignedp, len) != 0) {
+		syslog(LOG_WARNING, "mlock failed on memory of len=%lu", len);
+		free(p);
+		return nil;
+	}
+	debug(LOG_DEBUG, "lockedmalloc, %lu bytes allocated", len);
+	return alignedp;
+}
+
+void
+errsyslog(int eval, const char *fmt, ...)
+{
+	va_list ap;
+	char msg[256];
+	int len;
+	char *p;
+
+	va_start(ap, fmt);
+	vsnprintf(msg, sizeof msg, fmt, ap);
+	va_end(ap);
+
+	len = strlen(msg);
+	if(len+1 < sizeof msg) {
+		p = msg+len;
+		snprintf(p, sizeof msg-len, ": %s", strerror(errno));
+	}
+	syslog(LOG_ERR, "%s", msg);
+	exit(eval);
+}
+
+
+void
+errxsyslog(int eval, const char *fmt, ...)
+{
+	va_list ap;
+
+	va_start(ap, fmt);
+	vsyslog(LOG_ERR, fmt, ap);
+	va_end(ap);
+	exit(eval);
+}
+
+
+void
+debug(int level, char *fmt, ...)
+{
+
+	va_list ap;
+	char errmsg[256];
+
+	if(!debugflag)
+		return;
+
+	va_start(ap, fmt);
+	vsnprintf(errmsg, sizeof errmsg, fmt, ap);
+	fprintf(stderr, "debug: %s\n", errmsg);
+	va_end(ap);
+}
+
+
+uvlong
+filesize(int fd)
+{
+	struct stat sb;
+	if(fstat(fd, &sb) < 0)
+		errsyslog(1, "fstat fd=%d", fd);
+	return sb.st_size;
+}
+
+
+uvlong
+roundup(uvlong n, uint round)
+{
+	assert((round & (round-1)) == 0);
+	return (n+round-1)&~(round-1);
+}
+
+
+char *
+dheaderfmt(DHeader *dh)
+{
+	static char buf[3][128];
+	static int i = 0;
+	char *p;
+
+	p = buf[i];
+	i = (i+1) % nelem(buf);
+
+	snprintf(p, sizeof buf[0], "score=%s type=%d size=%d", scorestr(dh->score), (int)dh->type, (int)dh->size);
+
+	return p;
+}
+
+
+static char
+hex(uchar c)
+{
+	if(c <= 9)
+		return '0'+c;
+	if(c <= 0xf)
+		return 'a'+c-0xa;
+	errx(1, "wrong value (%u)", (uint)c);
+}
+
+
+char *
+scorestr(uchar *score)
+{
+	static char bufs[3][2*Scoresize+1];
+	static int index = 0;
+	int i;
+	char *s;
+
+	s = bufs[index];
+	index = (index+1) % 3;
+
+	for(i = 0; i < Scoresize; i++) {
+		s[2*i] = hex(score[i]>>4);
+		s[2*i+1] = hex(score[i]&0xf);
+	}
+	s[2*Scoresize] = 0;
+
+	return s;
+}
+
+
+void *
+emalloc(ulong len)
+{
+	void *p;
+	p = malloc(len);
+	if(p == nil)
+		errsyslog(1, "malloc");
+	return p;
+}
+
+
+void *
+trymalloc(ulong len)
+{
+	void *p;
+	int i;
+	for(i = 0; i < 10; i++) {
+		p = malloc(len);
+		if(p != nil)
+			return p;
+		sleep(1);
+	}
+	return nil;
+}
+
+
+void *
+erealloc(void *p, ulong len)
+{
+	p = realloc(p, len);
+	if(p == nil)
+		errsyslog(1, "realloc");
+	return p;
+}
+
+
+ssize_t
+preadn(int fd, void *buf, size_t count, off_t offset)
+{
+	ssize_t have, r;
+
+	have = 0;
+	while(count > have) {
+		r = pread(fd, buf, count-have, offset+have);
+		if(r < 0)
+			return r;
+		if(r == 0)
+			break;
+		have += r;
+	}
+	return have;
+}
+
+
+ssize_t
+writen(int fd, char *buf, size_t len)
+{
+	int n, r;
+
+	n = len;
+	while(n > 0) {
+		r = write(fd, buf, n);
+		if(r < 0)
+			return r;
+		n -= r;
+		buf += r;
+	}
+	return len;
+}
+
+
+uvlong
+msec(void)
+{
+	struct timeval tv;
+	gettimeofday(&tv, nil);
+	return tv.tv_sec * 1000 + tv.tv_usec/1000;
+}
+
+
+int
+lockinit(Lock *l)
+{
+	return pthread_mutex_init(&l->lock, nil) == 0;
+}
+
+void
+lock(Lock *l)
+{
+	pthread_mutex_lock(&l->lock);
+}
+
+void
+unlock(Lock *l)
+{
+	pthread_mutex_unlock(&l->lock);
+}
+
+
+int