Commits

Mechiel Lukkien  committed ccafbaf Draft

move code for "streaming in" changes from hg/pull to library.

Revlog.xappend is changed too, to make the caller supply the new
nodeid. it shouldn't be called if the nodeid is present already.

  • Participants
  • Parent commits f64ee3a

Comments (0)

Files changed (5)

 - read up on all the formats.  dirstate, undo.dirstate undo.branch
   (for rollback), journal.dirstate, journal.branch, wlock;  lock,
   journal, fncache, etc.;  http://mercurial.selenic.com/wiki/FileFormats
-- hg/commit: split into .d files at 128k size of .i
 
 - cgi/websrv: test with various client versions
 

File appl/cmd/hg/commit.b

 		if(ok != 0)
 			error(sprint("stat %q: %r", f));
 
-		rl := repo.xopenrevlog(path);
-
 		fp1 := fp2 := hg->nullnode;
 		if((mf1 := m1.find(path)) != nil)
 			fp1 = mf1.nodeid;
 		if((mf2 := m2.find(path)) != nil)
 			fp2 = mf2.nodeid;
 
+		rl := repo.xopenrevlog(path);
+
+		nodeid := hg->xcreatenodeid(buf, fp1, fp2);
+		if(rl.xfindnodeid(nodeid, 0) != nil)
+			continue;
+
 		say(sprint("adding to revlog for file %#q, fp1 %s, fp2 %s", path, fp1, fp2));
-		ne := rl.xappend(repo, tr, fp1, fp2, link, buf);
+		ne := rl.xappend(repo, tr, nodeid, fp1, fp2, link, buf);
 		filenodeids[i] = ne.nodeid;
 		say(sprint("file now at nodeid %s", ne.nodeid));
 
 	say("adding to manifest");
 	ml := repo.xmanifestlog();
 	mbuf := m.xpack();
-	me := ml.xappend(repo, tr, m1.nodeid, m2.nodeid, link, mbuf);
+	mnodeid := hg->xcreatenodeid(mbuf, m1.nodeid, m2.nodeid);
+	if(ml.xfindnodeid(mnodeid, 0) == nil)
+		ml.xappend(repo, tr, mnodeid, m1.nodeid, m2.nodeid, link, mbuf);
 
 	say("adding to changelog");
 	cl := repo.xchangelog();
-	cmsg := sprint("%s\n%s\n%d %d\n%s\n\n%s", me.nodeid, user, now, tzoff, join(rev(modfiles), "\n"), msg);
+	cmsg := sprint("%s\n%s\n%d %d\n%s\n\n%s", mnodeid, user, now, tzoff, join(rev(modfiles), "\n"), msg);
 	say(sprint("change message:"));
 	say(cmsg);
-	ce := cl.xappend(repo, tr, ds.p1, ds.p2, link, array of byte cmsg);
+	cbuf := array of byte cmsg;
+	cnodeid := hg->xcreatenodeid(cbuf, ds.p1, ds.p2);
+	ce := cl.xappend(repo, tr, cnodeid, ds.p1, ds.p2, link, cbuf);
 
 	nds.p1 = ce.nodeid;
 	repo.xwritedirstate(nds);

File appl/cmd/hg/pull.b

 	if(b == nil)
 		error("fopen");
 
-	warn("adding changesets");
-	cl := repo.xchangelog();
-	(chtab, nchangesets) := revlogwrite(b, cl, 1, nil);
-
-	warn("adding manifests");
-	ml := repo.xmanifestlog();
-	revlogwrite(b, ml, 0, chtab);
-	
-	warn("adding file changes");
-	nfiles := 0;
-	nchanges := 0;
-	for(;;) {
-		i := bg32(b);
-		if(i == 0)
-			break;
-
-		namebuf := breadn(b, i-4);
-		name := string namebuf;
-		rl := repo.xopenrevlog(name);
-		(nil, nn) := revlogwrite(b, rl, 0, chtab);
-		nfiles++;
-		nchanges += nn;
-	}
-
-	case b.getc() {
-	Bufio->EOF =>	;
-	Bufio->ERROR =>	error(sprint("error reading end of changegroup: %r"));
-	* =>		error(sprint("data past end of changegroup..."));
-	}
-
-	warn(sprint("added %d changesets with %d changes to %d files", nchangesets, nchanges, nfiles));
+	hg->xstreamin(repo, b);
 }
 
 isknown(n: string): int
 	return s[:12];
 }
 
-revlogwrite(b: ref Iobuf, rl: ref Revlog, ischlog: int, chtab: ref Strhash[ref Entry]): (ref Strhash[ref Entry], int)
-{
-	tab := Strhash[ref Entry].new(31, nil);
-	ents := rl.xentries();
-	for(i := 0; i < len ents; i++)
-		tab.add(ents[i].nodeid, ents[i]);
-
-	if(ischlog)
-		chtab = tab;
-
-	indexonly := rl.isindexonly();
-	ipath := rl.path+".i";
-	repo.xensuredirs(ipath);
-	ib := hg->xbopencreate(ipath, Sys->OWRITE, 8r666);
-	if(ib.seek(big 0, Bufio->SEEKEND) < big 0)
-		error(sprint("seek %q: %r", ipath));
-	db: ref Iobuf;
-	if(!indexonly) {
-		dpath := rl.path+".d";
-		db = hg->xbopencreate(dpath, Sys->OWRITE, 8r666);
-		if(ib.seek(big 0, Bufio->SEEKEND) < big 0)
-			error(sprint("seek %q: %r", dpath));
-	}
-
-	base := firstrev := nents := len ents;
-	current: array of byte;
-	offset := big 0;
-	if(len ents > 0) {
-		ee := ents[len ents-1];
-		offset = ee.offset+big ee.csize;
-	}
-	deltasizes := 0;
-
-	nchanges := 0;
-	for(;;) {
-		i = bg32(b);
-		if(i == 0)
-			break;
-
-		(rev, p1, p2, link, delta) := breadchunk(b, i);
-		if(dflag) {
-			say(sprint("\trev=%s", rev));
-			say(sprint("\tp1=%s", p1));
-			say(sprint("\tp2=%s", p2));
-			say(sprint("\tlink=%s", link));
-			say(sprint("\tlen delta=%d", len delta));
-		}
-
-		if(ischlog && rev != link)
-			error(sprint("changelog entry %s with bogus link %s", rev, link));
-		if(!ischlog && chtab.find(link) == nil)
-			error(sprint("entry %s references absent changelog link %s", rev, link));
-
-		p := Patch.xparse(delta);
-		if(dflag) {
-			say(sprint("\tpatch, sizediff %d", p.sizediff()));
-			say(sprint("\t%s", p.text()));
-		}
-		
-		p1rev := p2rev := -1;
-		if(p1 != hg->nullnode)
-			p1rev = (p1e := findentry("p1", rev, tab, p1)).rev;
-		if(p2 != hg->nullnode)
-			p2rev = findentry("p2", rev, tab, p2).rev;
-
-		selfrev := nents++;
-		linkrev := selfrev;
-		if(!ischlog)
-			linkrev = findentry("link", rev, chtab, link).rev;
-
-		if(selfrev == firstrev) {
-			if(p1rev >= 0) {
-				# first change, relative to p1
-				base = p1e.base;
-				current = rl.xget(p1rev);
-				say(sprint("first change relative to p1 %s (%d, base %d)", p1, p1rev, base));
-				say(sprint("first data is %q", string current));
-			} else {
-				if(len p.l != 1 || (c := hd p.l).start != 0 || c.end != 0)
-					error(sprint("first chunk is not full version"));
-			}
-			deltasizes = 0; # xxx fix
-		}
-
-		say(sprint("new entry: selfrev %d p1rev %d p2rev %d linkrev %d (base %d)", selfrev, p1rev, p2rev, linkrev, base));
-
-		data: array of byte;
-		if(len p.l == 1 && (c := hd p.l).start == 0 && c.end == len current) {
-			say("patch covers entire file, storing full copy");
-			base = selfrev;
-			data = current = (hd p.l).buf;
-			deltasizes = 0;
-
-			compr := compress(data);
-			if(len compr < len data*90/100) {
-				data = compr;
-			} else {
-				nd := array[1+len data] of byte;
-				nd[0] = byte 'u';
-				nd[1:] = data;
-				data = nd;
-			}
-		} else {
-			current = p.apply(current);
-			if(selfrev == firstrev && p1rev >= 0 && p1rev != selfrev-1 || deltasizes+len delta > 2*len current) {
-				say("delta against p1 which is not previous or delta's too big, storing full copy");
-				base = selfrev;
-				data = current;
-				deltasizes = 0;
-
-				compr := compress(data);
-				if(len compr < len data*90/100) {
-					data = compr;
-				} else {
-					nd := array[1+len data] of byte;
-					nd[0] = byte 'u';
-					nd[1:] = data;
-					data = nd;
-				}
-			} else {
-				say("storing delta");
-				data = delta;
-				deltasizes += len delta;
-
-				compr := compress(data);
-				if(len compr < len data*90/100)
-					data = compr;
-			}
-		}
-		nrev := hg->xcreatenodeid(current, p1, p2);
-		if(nrev != rev)
-			error(sprint("nodeid mismatch, expected %s saw %s", rev, nrev));
-
-		say(sprint("new base %d", base));
-		flags := 0;
-		e := ref Entry(selfrev, offset, big 0, flags, len data, len current, base, linkrev, p1rev, p2rev, rev);
-
-		ebuf := array[hg->Entrysize] of byte;
-		e.xpack(ebuf, indexonly);
-		if(ib.write(ebuf, len ebuf) != len ebuf)
-			error(sprint("write: %r"));
-		if(indexonly) {
-			if(ib.write(data, len data) != len data)
-				error(sprint("write: %r"));
-		} else {
-			if(db.write(data, len data) != len data)
-				error(sprint("write: %r"));
-		}
-
-		offset += big len data;
-		tab.add(e.nodeid, e);
-		nchanges++;
-	}
-
-	if(ib.flush() == Bufio->ERROR)
-		error(sprint("write: %r"));
-	if(db != nil && db.flush() == Bufio->ERROR)
-		error(sprint("write: %r"));
-	return (tab, nchanges);
-}
-
-findentry(name, rev: string, tab: ref Strhash[ref Entry], n: string): ref Entry
-{
-	e := tab.find(n);
-	if(e == nil)
-		error(sprint("missing %s %s for nodeid %s", name, n, rev));
-	return e;
-}
-
-bg32(b: ref Iobuf): int
-{
-	return g32i(breadn(b, 4), 0).t0;
-}
-
-breadchunk(b: ref Iobuf, n: int): (string, string, string, string, array of byte)
-{
-	n -= 4;
-	if(n < 4*20)
-		error("short chunk");
-	buf := breadn(b, n);
-	o := 0;
-	rev := buf[o:o+20];
-	o += 20;
-	p1 := buf[o:o+20];
-	o += 20;
-	p2 := buf[o:o+20];
-	o += 20;
-	link := buf[o:o+20];
-	o += 20;
-	delta := buf[o:];
-	return (hex(rev), hex(p1), hex(p2), hex(link), delta);
-}
-
-breadn(b: ref Iobuf, n: int): array of byte
-{
-	buf := array[n] of byte;
-	h := 0;
-	while(h < n) {
-		nn := b.read(buf[h:], n-h);
-		if(nn == Bufio->EOF)
-			error("premature eof");
-		if(nn == Bufio->ERROR)
-			error(sprint("reading: %r"));
-		h += nn;
-	}
-	return buf;
-}
-
-compress(d: array of byte): array of byte
-{
-	(nd, err) := filtertool->convert(deflate, "z", d);
-	if(err != nil)
-		error("deflate: "+err);
-	return nd;
-}
-
-tablength(t: ref Strhash[ref Entry]): int
-{
-	n := 0;
-	for(i := 0; i < len t.items; i++)
-		n += len t.items[i];
-	return n;
-}
-
 error(s: string)
 {
 	raise "hg:"+s;

File appl/lib/mercurial.b

 	Strhash: import tables;
 include "util0.m";
 	util: Util0;
-	eq, hasstr, p32, p32i, p16, stripws, prefix, suffix, rev, max, l2a, readfile, writefile: import util;
+	g32i, eq, hasstr, p32, p32i, p16, stripws, prefix, suffix, rev, max, l2a, readfile, writefile: import util;
 include "mercurial.m";
 
 
 	return d;
 }
 
-Revlog.xappend(rl: self ref Revlog, r: ref Repo, tr: ref Transact, p1, p2: string, link: int, buf: array of byte): ref Entry
+xstreamin(r: ref Repo, b: ref Iobuf)
+{
+	tr := r.xtransact();
+	{
+		xstreamin0(r, tr, b);
+		r.xcommit(tr);
+	} exception {
+	"hg:*" =>
+		r.xrollback(tr);
+		raise;
+	}
+}
+
+xstreamin0(r: ref Repo, tr: ref Transact, b: ref Iobuf)
+{
+	warn("adding changesets");
+	cl := r.xchangelog();
+	nchangesets := cl.xstream(r, tr, b, 1, cl);
+
+	warn("adding manifests");
+	ml := r.xmanifestlog();
+	ml.xstream(r, tr, b, 0, cl);
+	
+	warn("adding file changes");
+	nfiles := 0;
+	nchanges := 0;
+	for(;;) {
+		i := bg32(b);
+		if(i == 0)
+			break;
+
+		namebuf := breadn0(b, i-4);
+		name := string namebuf;
+		rl := r.xopenrevlog(name);
+		nchanges += rl.xstream(r, tr, b, 0, cl);
+		nfiles++;
+	}
+
+	case b.getc() {
+	Bufio->EOF =>	;
+	Bufio->ERROR =>	error(sprint("error reading end of changegroup: %r"));
+	* =>		error(sprint("data past end of changegroup..."));
+	}
+
+	warn(sprint("added %d changesets with %d changes to %d files", nchangesets, nchanges, nfiles));
+}
+
+Revlog.xappend(rl: self ref Revlog, r: ref Repo, tr: ref Transact, nodeid, p1, p2: string, link: int, buf: array of byte): ref Entry
 {
 	xreopen(rl);
 
 	p1rev := p2rev := -1;
-	if(p1 != nullnode) {
+	if(p1 != nullnode)
 		p1rev = rl.xfindnodeid(p1, 1).rev;
-		if(p2 != nullnode)
-			p2rev = rl.xfindnodeid(p2, 1).rev;
-	}
-	nodeid := xcreatenodeid(buf, p1, p2);
-
-	e := rl.xfindnodeid(nodeid, 0);
-	if(e != nil)
-		return e;
+	if(p2 != nullnode)
+		p2rev = rl.xfindnodeid(p2, 1).rev;
 
 	ipath := rl.path+".i";
 	dpath := rl.path+".d";
 			dsize = ee.offset+big ee.csize;
 		}
 	}
+	if(link < 0)
+		link = nrev;
 
 	if(!tr.has(ipath))
 		tr.add(rl.rlpath+".i", isize);
 	(base, buf) = rl.xstorebuf(buf, nrev);
 
 	# if we grow a .i-only revlog to beyond 128k, create a .d and rewrite the .i
-	isindexonly := rl.isindexonly();
-	if(isindexonly && isize+big Entrysize+big len buf >= big (128*1024)) {
+	if(rl.isindexonly() && isize+big Entrysize+big len buf >= big (128*1024)) {
 		say(sprint("no longer indexonly, writing %q", dpath));
 
 		ifd := xopen(ipath, Sys->OREAD);
 		isize = big 0;
 		dsize = big 0;
 		for(i := 0; i < len rl.ents; i++) {
-			e = rl.ents[i];
+			e := rl.ents[i];
 
 			if(i == 0)
 				p16(ibuf, 0, 0); # clear the Indexonly bits
 		if(sys->remove(ipath) != 0 || sys->fwstat(nifd, ndir) != 0)
 			error(sprint("remove %q and rename of %q failed: %r", ipath, nipath));
 
-		isindexonly = 0;
+		rl.flags &= ~Indexonly;
+		rl.ifd = rl.dfd = nil;
+		rl.bd = nil;
 
 		tr.add(rl.rlpath+".d", dsize);
 		tr.add(rl.rlpath+".i", isize);
 	}
 
 	ioffset := big 0;
-	if(isindexonly)
+	if(rl.isindexonly())
 		ioffset = isize+big Entrysize;
 
 	flags := 0;
-	e = ref Entry (nrev, offset, ioffset, flags, len buf, uncsize, base, link, p1rev, p2rev, nodeid);
+	e := ref Entry (nrev, offset, ioffset, flags, len buf, uncsize, base, link, p1rev, p2rev, nodeid);
 say(sprint("revlog %q, will be adding %s", rl.path, e.text()));
 	ebuf := array[Entrysize] of byte;
-	e.xpack(ebuf, isindexonly);
+	e.xpack(ebuf, rl.isindexonly());
 	nents := array[len rl.ents+1] of ref Entry;
 	nents[:] = rl.ents;
 	nents[len rl.ents] = e;
 	if(sys->pwrite(ifd, ebuf, len ebuf, isize) != len ebuf)
 		error(sprint("write %q: %r", ipath));
 	isize += big Entrysize;
-	if(isindexonly) {
+	if(rl.isindexonly()) {
 		if(sys->pwrite(ifd, buf, len buf, isize) != len buf)
 			error(sprint("write %q: %r", ipath));
 	} else {
 	return e;
 }
 
+Revlog.xstream(rl: self ref Revlog, r: ref Repo, tr: ref Transact, b: ref Bufio->Iobuf, ischlog: int, cl: ref Revlog): int
+{
+	buf: array of byte;
+	nchanges := 0;
+	for(;;) {
+		i := bg32(b);
+		if(i == 0)
+			break;
+
+		(rev, p1, p2, link, delta) := breadchunk(b, i);
+		say(sprint("\trev=%s", rev));
+		say(sprint("\tp1=%s", p1));
+		say(sprint("\tp2=%s", p2));
+		say(sprint("\tlink=%s", link));
+		say(sprint("\tlen delta=%d", len delta));
+
+		if(ischlog && rev != link)
+			error(sprint("changelog entry %s with bogus link %s", rev, link));
+		if(!ischlog && cl.xfindnodeid(link, 0) == nil)
+			error(sprint("nodeid %s references unknown changelog link %s", rev, link));
+
+		p := Patch.xparse(delta);
+		say(sprint("\tpatch, sizediff %d", p.sizediff()));
+		say(sprint("\t%s", p.text()));
+		
+		linkrev := -1;
+		if(!ischlog)
+			linkrev = cl.xfindnodeid(link, 1).rev;
+
+		if(buf == nil) {
+			if(p1 != nullnode) {
+				buf = rl.xgetnodeid(p1);
+			} else {
+				if(len p.l != 1 || (c := hd p.l).start != 0 || c.end != 0)
+					error(sprint("first chunk is not full version"));
+				buf = array[0] of byte;
+			}
+		}
+
+		buf = p.apply(buf);
+
+		nodeid := xcreatenodeid(buf, p1, p2);
+		if(nodeid != rev)
+			error(sprint("nodeid mismatch, expected %s saw %s", rev, nodeid));
+
+		if(rl.xfindnodeid(nodeid, 0) != nil)
+			error(sprint("already have nodeid %s", nodeid));
+
+		rl.xappend(r, tr, nodeid, p1, p2, linkrev, buf);
+		nchanges++;
+	}
+	return nchanges;
+}
+
 xreadlines(b: ref Iobuf): list of string
 {
 	l: list of string;
 	return (0, nil);
 }
 
+
+breadn0(b: ref Iobuf, n: int): array of byte
+{
+	nn := breadn(b, d := array[n] of byte, n);
+	if(nn < 0)
+		error(sprint("reading: %r"));
+	if(nn != n)
+		error("short read");
+	return d;
+}
+
+bg32(b: ref Iobuf): int
+{
+	return g32i(breadn0(b, 4), 0).t0;
+}
+
+breadchunk(b: ref Iobuf, n: int): (string, string, string, string, array of byte)
+{
+	n -= 4;
+	if(n < 4*20)
+		error("short chunk");
+	buf := breadn0(b, n);
+	o := 0;
+	rev := buf[o:o+20];
+	o += 20;
+	p1 := buf[o:o+20];
+	o += 20;
+	p2 := buf[o:o+20];
+	o += 20;
+	link := buf[o:o+20];
+	o += 20;
+	delta := buf[o:];
+	return (hex(rev), hex(p1), hex(p2), hex(link), delta);
+}
+
 xreadfile(p: string): array of byte
 {
 	buf := readfile(p, -1);

File module/mercurial.m

 	xbopencreate:	fn(f: string, mode, perm: int): ref Bufio->Iobuf;
 	xdirstate:	fn(r: ref Repo, all: int): ref Dirstate;
 	xparsetags:	fn(r: ref Repo, s: string): list of ref Tag;
+	xstreamin:	fn(r: ref Repo, b: ref Bufio->Iobuf);
 
 	Entrysize:	con 64;
 
 
 		xentries:	fn(rl: self ref Revlog): array of ref Entry;
 		isindexonly:	fn(rl: self ref Revlog): int;
-		xappend:	fn(rl: self ref Revlog, r: ref Repo, tr: ref Transact, p1, p2: string, link: int, buf: array of byte): ref Entry;
-
-		#xstream;	fn(rl: ref Revlog, tr: ref Transact, cg: ref Bufio->Iobuf, ischlog: int, cl: ref Revlog): int;
+		xappend:	fn(rl: self ref Revlog, r: ref Repo, tr: ref Transact, nodeid, p1, p2: string, link: int, buf: array of byte): ref Entry;
+		xstream:	fn(rl: self ref Revlog, r: ref Repo, tr: ref Transact, b: ref Bufio->Iobuf, ischlog: int, cl: ref Revlog): int;
 	};
 
 	Repo: adt {