Anonymous avatar Anonymous committed 977106d

Replace the qwalk API (to retreive on disk envelopes at runtime) with
a simple QOP_WALK queue operation. Some knf and formating fixes while
there.

ok gilles@

Comments (0)

Files changed (5)

usr.sbin/smtpd/queue.c

-/*	$OpenBSD: queue.c,v 1.143 2012/11/20 09:47:45 eric Exp $	*/
+/*	$OpenBSD: queue.c,v 1.144 2012/11/23 09:25:44 eric Exp $	*/
 
 /*
  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
 static void
 queue_timeout(int fd, short event, void *p)
 {
-	static struct qwalk	*q = NULL;
-	static uint32_t		 msgid = 0;
-	static size_t		 evpcount = 0;
-	struct event		*ev = p;
-	struct envelope		 envelope;
-	struct timeval		 tv;
-	uint64_t		 evpid;
-
-	if (q == NULL) {
-		log_debug("debug: queue: loading queue into scheduler");
-		q = qwalk_new(0);
-	}
-
-	while (qwalk(q, &evpid)) {
+	static uint32_t	 msgid = 0;
+	struct envelope	 evp;
+	struct event	*ev = p;
+	struct timeval	 tv;
+	int		 r;
 
-		if (msgid && evpid_to_msgid(evpid) != msgid && evpcount) {
+	r = queue_envelope_walk(&evp);
+	if (r == -1) {
+		if (msgid)
 			imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
 			    IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid,
 			    sizeof msgid);
-			evpcount = 0;
-		}
-		msgid = evpid_to_msgid(evpid);
-
-		if (!queue_envelope_load(evpid, &envelope))
-			log_warnx("warn: Failed to load envelope %016"PRIx64,
-			    evpid);
-		else {
-			imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
-			    IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &envelope,
-			    sizeof envelope);
-			evpcount++;
-		}
-
-		tv.tv_sec = 0;
-		tv.tv_usec = 0;
-		evtimer_add(ev, &tv);	
+		log_debug("debug: queue: done loading queue into scheduler");
 		return;
 	}
 
-	if (msgid && evpcount) {
+	if (r) {
+		if (msgid && evpid_to_msgid(evp.id) != msgid)
+			imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
+			    IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid,
+			    sizeof msgid);
+		msgid = evpid_to_msgid(evp.id);
 		imsg_compose_event(env->sc_ievs[PROC_SCHEDULER],
-		    IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &msgid, sizeof msgid);
-		evpcount = 0;
+		    IMSG_QUEUE_SUBMIT_ENVELOPE, 0, 0, -1, &evp, sizeof evp);
 	}
 
-	log_debug("debug: queue: done loading queue into scheduler");
-	qwalk_close(q);
+	tv.tv_sec = 0;
+	tv.tv_usec = 10;
+	evtimer_add(ev, &tv);
 }

usr.sbin/smtpd/queue_backend.c

-/*	$OpenBSD: queue_backend.c,v 1.40 2012/11/12 14:58:53 eric Exp $	*/
+/*	$OpenBSD: queue_backend.c,v 1.41 2012/11/23 09:25:44 eric Exp $	*/
 
 /*
  * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
 static const char* envelope_validate(struct envelope *);
 
 extern struct queue_backend	queue_backend_fs;
+extern struct queue_backend	queue_backend_ram;
 
 int
 queue_message_incoming_path(uint32_t msgid, char *buf, size_t len)
 		return (0);
 
 	if (env->sc_queue_flags & QUEUE_COMPRESS) {
-		evplen = compress_buffer(evp, evplen, evpbufcom, sizeof evpbufcom);
+		evplen = compress_buffer(evp, evplen, evpbufcom,
+		    sizeof evpbufcom);
 		if (evplen == 0)
 			return (0);
 		evp = evpbufcom;
 	evplen = evpbufsize;
 
 	if (env->sc_queue_flags & QUEUE_COMPRESS) {
-		evplen = uncompress_buffer(evp, evplen, evpbufcom, sizeof evpbufcom);
+		evplen = uncompress_buffer(evp, evplen, evpbufcom,
+		    sizeof evpbufcom);
 		if (evplen == 0)
 			return (0);
 		evp = evpbufcom;
 	size_t		 evplen;
 
 	ep->id = evpid;
-	evplen = env->sc_queue->envelope(QOP_LOAD, &ep->id, evpbuf, sizeof evpbuf);
+	evplen = env->sc_queue->envelope(QOP_LOAD, &ep->id, evpbuf,
+	    sizeof evpbuf);
 	if (evplen == 0)
 		return (0);
-		
+
 	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
 		if ((e = envelope_validate(ep)) == NULL) {
 			ep->id = evpid;
 			return (1);
 		}
-		log_debug("debug: invalid envelope %016" PRIx64 ": %s", ep->id, e);
+		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
+		    ep->id, e);
 	}
 	return (0);
 }
 	return env->sc_queue->envelope(QOP_UPDATE, &ep->id, evpbuf, evplen);
 }
 
-void *
-qwalk_new(uint32_t msgid)
-{
-	return env->sc_queue->qwalk_new(msgid);
-}
-
 int
-qwalk(void *hdl, uint64_t *evpid)
+queue_envelope_walk(struct envelope *ep)
 {
-	return env->sc_queue->qwalk(hdl, evpid);
-}
+	const char	*e;
+	uint64_t	 evpid;
+	char		 evpbuf[sizeof(struct envelope)];
+	int		 r;
 
-void
-qwalk_close(void *hdl)
-{
-	return env->sc_queue->qwalk_close(hdl);
+	r = env->sc_queue->envelope(QOP_WALK, &evpid, evpbuf, sizeof evpbuf);
+	if (r == -1 || r == 0)
+		return (r);
+
+	if (queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
+		if ((e = envelope_validate(ep)) == NULL) {
+			ep->id = evpid;
+			return (1);
+		}
+		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
+		    ep->id, e);
+	}
+	return (0);
 }
 
 uint32_t
 {
 	uint32_t msgid;
 
-	while((msgid = arc4random_uniform(0xffffffff)) == 0)
+	while ((msgid = arc4random_uniform(0xffffffff)) == 0)
 		;
 
 	return msgid;
 	uint32_t rnd;
 	uint64_t evpid;
 
-	while((rnd = arc4random_uniform(0xffffffff)) == 0)
+	while ((rnd = arc4random_uniform(0xffffffff)) == 0)
 		;
 
 	evpid = msgid;
 	return evpid;
 }
 
-
-/**/
 static const char*
 envelope_validate(struct envelope *ep)
 {

usr.sbin/smtpd/queue_fsqueue.c

-/*	$OpenBSD: queue_fsqueue.c,v 1.55 2012/11/12 14:58:53 eric Exp $	*/
+/*	$OpenBSD: queue_fsqueue.c,v 1.56 2012/11/23 09:25:44 eric Exp $	*/
 
 /*
  * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
 static int	fsqueue_envelope_load(uint64_t, char *, size_t);
 static int	fsqueue_envelope_update(uint64_t, char *, size_t);
 static int	fsqueue_envelope_delete(uint64_t);
+static int	fsqueue_envelope_walk(uint64_t *, char *, size_t);
 
 static int	fsqueue_message_create(uint32_t *);
 static int	fsqueue_message_commit(uint32_t);
 static int	fsqueue_message(enum queue_op, uint32_t *);
 static int	fsqueue_envelope(enum queue_op , uint64_t *, char *, size_t);
 
-static void    *fsqueue_qwalk_new(uint32_t);
+static void    *fsqueue_qwalk_new(void);
 static int	fsqueue_qwalk(void *, uint64_t *);
 static void	fsqueue_qwalk_close(void *);
 
 #define PATH_EVPTMP		PATH_INCOMING "/envelope.tmp"
 
 struct queue_backend	queue_backend_fs = {
-	  fsqueue_init,
-	  fsqueue_message,
-	  fsqueue_envelope,
-	  fsqueue_qwalk_new,
-	  fsqueue_qwalk,
-	  fsqueue_qwalk_close
+	fsqueue_init,
+	fsqueue_message,
+	fsqueue_envelope,
 };
 
 static struct timespec	startup;
 		if (queued)
 			fsqueue_envelope_path(*evpid, path, sizeof(path));
 		else
-			queue_envelope_incoming_path(*evpid, path, sizeof(path));
+			queue_envelope_incoming_path(*evpid, path,
+			    sizeof(path));
 
 		if (stat(path, &sb) == -1 && errno == ENOENT)
 			goto found;
 }
 
 static int
+fsqueue_envelope_walk(uint64_t *evpid, char *buf, size_t len)
+{
+	static int	 done = 0;
+	static void	*hdl = NULL;
+
+	if (done)
+		return (-1);
+
+	if (hdl == NULL)
+		hdl = fsqueue_qwalk_new();
+
+	if (fsqueue_qwalk(hdl, evpid))
+		return (fsqueue_envelope_load(*evpid, buf, len));
+
+	fsqueue_qwalk_close(hdl);
+	done = 1;
+	return (-1);
+}
+
+static int
 fsqueue_message_create(uint32_t *msgid)
 {
 	char rootdir[MAXPATHLEN];
 
 again:
 	*msgid = queue_generate_msgid();
-	
+
 	/* prevent possible collision later when moving to Q_QUEUE */
 	fsqueue_message_path(*msgid, rootdir, sizeof(rootdir));
 	if (stat(rootdir, &sb) != -1 || errno != ENOENT)
 	int  retry = 0;
 
 	fsqueue_message_path(msgid, rootdir, sizeof(rootdir));
-	fsqueue_message_corrupt_path(msgid, corruptdir, sizeof(corruptdir));
+	fsqueue_message_corrupt_path(msgid, corruptdir,
+	    sizeof(corruptdir));
 
 again:
 	if (stat(corruptdir, &sb) != -1 || errno != ENOENT) {
-		fsqueue_message_corrupt_path(msgid, corruptdir, sizeof(corruptdir));
+		fsqueue_message_corrupt_path(msgid, corruptdir,
+		    sizeof(corruptdir));
 		snprintf(buf, sizeof(buf), ".%i", retry++);
 		strlcat(corruptdir, buf, sizeof(corruptdir));
 		goto again;
 static int
 fsqueue_message(enum queue_op qop, uint32_t *msgid)
 {
-        switch (qop) {
-        case QOP_CREATE:
+	switch (qop) {
+	case QOP_CREATE:
 		return fsqueue_message_create(msgid);
-
-        case QOP_DELETE:
+	case QOP_DELETE:
 		return fsqueue_message_delete(*msgid);
-
-        case QOP_COMMIT:
+	case QOP_COMMIT:
 		return fsqueue_message_commit(*msgid);
-
-        case QOP_FD_R:
-                return fsqueue_message_fd_r(*msgid);
-
+	case QOP_FD_R:
+		return fsqueue_message_fd_r(*msgid);
 	case QOP_CORRUPT:
 		return fsqueue_message_corrupt(*msgid);
-
-        default:
+	default:
 		fatalx("queue_fsqueue_message: unsupported operation.");
-        }
-
+	}
 	return 0;
 }
 
 static int
 fsqueue_envelope(enum queue_op qop, uint64_t *evpid, char *buf, size_t len)
 {
-        switch (qop) {
-        case QOP_CREATE:
+	switch (qop) {
+	case QOP_CREATE:
 		return fsqueue_envelope_create(evpid, buf, len);
-
-        case QOP_DELETE:
+	case QOP_DELETE:
 		return fsqueue_envelope_delete(*evpid);
-
-        case QOP_LOAD:
+	case QOP_LOAD:
 		return fsqueue_envelope_load(*evpid, buf, len);
-
-        case QOP_UPDATE:
+	case QOP_UPDATE:
 		return fsqueue_envelope_update(*evpid, buf, len);
-
-        default:
+	case QOP_WALK:
+		return fsqueue_envelope_walk(evpid, buf, len);
+	default:
 		fatalx("queue_fsqueue_envelope: unsupported operation.");
-        }
-
+	}
 	return 0;
 }
 
 struct qwalk {
 	FTS	*fts;
-	uint32_t msgid;
 	int	 depth;
 };
 
 static void *
-fsqueue_qwalk_new(uint32_t msgid)
+fsqueue_qwalk_new(void)
 {
 	char		 path[MAXPATHLEN];
 	char * const	 path_argv[] = { path, NULL };
 	struct qwalk	*q;
 
 	q = xcalloc(1, sizeof(*q), "fsqueue_qwalk_new");
-	q->msgid = msgid;
 	strlcpy(path, PATH_QUEUE, sizeof(path));
 	q->fts = fts_open(path_argv,
 	    FTS_PHYSICAL | FTS_NOCHDIR, NULL);
 fsqueue_qwalk(void *hdl, uint64_t *evpid)
 {
 	struct qwalk	*q = hdl;
-        FTSENT 		*e;
+	FTSENT		*e;
 	char		*tmp;
-	uint32_t	 msgid;
 
-        while ((e = fts_read(q->fts)) != NULL) {
-
-		switch(e->fts_info) {
+	while ((e = fts_read(q->fts)) != NULL) {
+		switch (e->fts_info) {
 		case FTS_D:
 			q->depth += 1;
 			if (q->depth == 2 && e->fts_namelen != 2) {
 				fts_set(q->fts, e, FTS_SKIP);
 				break;
 			}
-			if (q->msgid && (q->depth == 2 || q->depth == 3)) {
-				msgid = strtoull(e->fts_name, &tmp, 16);
-				if (msgid != (q->depth == 1) ?
-				    (q->msgid & 0xff) : q->msgid) {
-					fts_set(q->fts, e, FTS_SKIP);
-					break;
-				}
-			}
 			break;
 
 		case FTS_DP:
 		}
 	}
 
-        return (0); 
+	return (0);
 }

usr.sbin/smtpd/smtpctl.c

-/*	$OpenBSD: smtpctl.c,v 1.97 2012/11/20 09:47:46 eric Exp $	*/
+/*	$OpenBSD: smtpctl.c,v 1.98 2012/11/23 09:25:44 eric Exp $	*/
 
 /*
  * Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org>
 static void
 show_queue(int flags)
 {
-	struct qwalk	*q;
 	struct envelope	 envelope;
-	uint64_t	 evpid;
+	int		 r;
 
 	log_init(1);
 
 	if (chroot(PATH_SPOOL) == -1 || chdir(".") == -1)
 		err(1, "%s", PATH_SPOOL);
 
-	q = qwalk_new(0);
-
-	while (qwalk(q, &evpid)) {
-		if (! queue_envelope_load(evpid, &envelope))
-			continue;
-		show_queue_envelope(&envelope, flags);
-	}
-
-	qwalk_close(q);
+	while ((r = queue_envelope_walk(&envelope)) != -1)
+		if (r)
+			show_queue_envelope(&envelope, flags);
 }
 
-
 static void
 show_queue_envelope(struct envelope *e, int online)
 {

usr.sbin/smtpd/smtpd.h

-/*	$OpenBSD: smtpd.h,v 1.396 2012/11/20 09:47:46 eric Exp $	*/
+/*	$OpenBSD: smtpd.h,v 1.397 2012/11/23 09:25:44 eric Exp $	*/
 
 /*
  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
 #define MAX_NAME_SIZE		 64
 
 #define MAX_HOPS_COUNT		 100
-#define	DEFAULT_MAX_BODY_SIZE  	(35*1024*1024)
+#define	DEFAULT_MAX_BODY_SIZE	(35*1024*1024)
 
 #define MAX_TAG_SIZE		 32
 
 #define SMTPD_QUEUE_MAXINTERVAL	 (4 * 60 * 60)
 #define SMTPD_QUEUE_EXPIRY	 (4 * 24 * 60 * 60)
 #define SMTPD_USER		 "_smtpd"
-#define SMTPD_FILTER_USER      	 "_smtpmfa"
+#define SMTPD_FILTER_USER	 "_smtpmfa"
 #define SMTPD_SOCKET		 "/var/run/smtpd.sock"
 #define SMTPD_BANNER		 "220 %s ESMTP OpenSMTPD"
 #define SMTPD_SESSION_TIMEOUT	 300
 /* max len of any smtp line */
 #define	SMTP_LINE_MAX		MAX_LINE_SIZE
 
-#define F_STARTTLS		 0x01
-#define F_SMTPS			 0x02
-#define F_AUTH			 0x04
-#define F_SSL			(F_SMTPS|F_STARTTLS)
-#define	F_STARTTLS_REQUIRE     	 0x08
-#define	F_AUTH_REQUIRE		 0x10
+#define F_STARTTLS		0x01
+#define F_SMTPS			0x02
+#define F_AUTH			0x04
+#define F_SSL		       (F_SMTPS|F_STARTTLS)
+#define	F_STARTTLS_REQUIRE	0x08
+#define	F_AUTH_REQUIRE		0x10
 
 #define	F_BACKUP		0x20	/* XXX - MUST BE SYNC-ED WITH ROUTE_BACKUP */
 
 	IMSG_MDA_DONE,
 
 	IMSG_MFA_CONNECT,
- 	IMSG_MFA_HELO,
- 	IMSG_MFA_MAIL,
- 	IMSG_MFA_RCPT,
- 	IMSG_MFA_DATALINE,
+	IMSG_MFA_HELO,
+	IMSG_MFA_MAIL,
+	IMSG_MFA_RCPT,
+	IMSG_MFA_DATALINE,
 	IMSG_MFA_QUIT,
 	IMSG_MFA_CLOSE,
 	IMSG_MFA_RSET,
 };
 
 struct rule {
-	TAILQ_ENTRY(rule)		 r_entry;
-	enum decision			 r_decision;
-	char				 r_tag[MAX_TAG_SIZE];
-	int				 r_accept;
-	struct map			*r_sources;
-	struct cond			 r_condition;
-	enum action_type		 r_action;
+	TAILQ_ENTRY(rule)		r_entry;
+	enum decision			r_decision;
+	char				r_tag[MAX_TAG_SIZE];
+	int				r_accept;
+	struct map		       *r_sources;
+	struct cond			r_condition;
+	enum action_type		r_action;
 	union rule_dest {
-		char			 buffer[MAX_RULEBUFFER_LEN];
-		struct relayhost       	 relayhost;
-	}				 r_value;
+		char			buffer[MAX_RULEBUFFER_LEN];
+		struct relayhost	relayhost;
+	}				r_value;
 
-	struct mailaddr			*r_as;
-	objid_t				 r_amap;
-	time_t				 r_qexpire;
+	struct mailaddr		       *r_as;
+	objid_t				r_amap;
+	time_t				r_qexpire;
 };
 
 struct mailaddr {
 	DF_INTERNAL		= 0x8, /* internal expansion forward */
 
 	/* runstate, not saved on disk */
+
 	DF_PENDING		= 0x10,
 	DF_INFLIGHT		= 0x20,
 };
 };
 
 struct expandnode {
-	RB_ENTRY(expandnode)	 entry;
-	TAILQ_ENTRY(expandnode)	 tq_entry;
-	enum expand_type       	 type;
-	int			 sameuser;
-	int			 alias;
-	struct rule		*rule;
-	struct expandnode	*parent;
-	unsigned int		 depth;
+	RB_ENTRY(expandnode)	entry;
+	TAILQ_ENTRY(expandnode)	tq_entry;
+	enum expand_type	type;
+	int			sameuser;
+	int			alias;
+	struct rule	       *rule;
+	struct expandnode      *parent;
+	unsigned int		depth;
 	union {
 		/*
 		 * user field handles both expansion user and system user
 		 * so we MUST make it large enough to fit a mailaddr user
 		 */
-		char		 user[MAX_LOCALPART_SIZE];
-		char		 buffer[MAX_RULEBUFFER_LEN];
-		struct mailaddr	 mailaddr;
-	} 			 u;
+		char		user[MAX_LOCALPART_SIZE];
+		char		buffer[MAX_RULEBUFFER_LEN];
+		struct mailaddr	mailaddr;
+	}			u;
 };
 
 struct expand {
 
 
 struct smtpd {
-	char					 sc_conffile[MAXPATHLEN];
-	size_t					 sc_maxsize;
-
-#define SMTPD_OPT_VERBOSE			 0x00000001
-#define SMTPD_OPT_NOACTION			 0x00000002
-	uint32_t				 sc_opts;
-#define SMTPD_CONFIGURING			 0x00000001
-#define SMTPD_EXITING				 0x00000002
-#define SMTPD_MDA_PAUSED		       	 0x00000004
-#define SMTPD_MTA_PAUSED		       	 0x00000008
-#define SMTPD_SMTP_PAUSED		       	 0x00000010
-#define SMTPD_MDA_BUSY			       	 0x00000020
-#define SMTPD_MTA_BUSY			       	 0x00000040
-#define SMTPD_BOUNCE_BUSY      		       	 0x00000080
-#define SMTPD_SMTP_DISABLED			 0x00000100
-	uint32_t				 sc_flags;
-	uint32_t				 sc_queue_flags;
-#define QUEUE_COMPRESS				 0x00000001
-	char					*sc_queue_compress_algo;
-	int					 sc_qexpire;
-	struct event				 sc_ev;
-	int					 *sc_pipes[PROC_COUNT]
-							[PROC_COUNT];
-	struct imsgev				*sc_ievs[PROC_COUNT];
-	int					 sc_instances[PROC_COUNT];
-	int					 sc_instance;
-	char					*sc_title[PROC_COUNT];
-	struct passwd				*sc_pw;
-	char					 sc_hostname[MAXHOSTNAMELEN];
-	struct queue_backend			*sc_queue;
-	struct compress_backend			*sc_compress;
-	struct scheduler_backend		*sc_scheduler;
-	struct stat_backend			*sc_stat;
+	char				sc_conffile[MAXPATHLEN];
+	size_t				sc_maxsize;
+
+#define SMTPD_OPT_VERBOSE		0x00000001
+#define SMTPD_OPT_NOACTION		0x00000002
+	uint32_t			sc_opts;
+#define SMTPD_CONFIGURING		0x00000001
+#define SMTPD_EXITING			0x00000002
+#define SMTPD_MDA_PAUSED		0x00000004
+#define SMTPD_MTA_PAUSED		0x00000008
+#define SMTPD_SMTP_PAUSED		0x00000010
+#define SMTPD_MDA_BUSY			0x00000020
+#define SMTPD_MTA_BUSY			0x00000040
+#define SMTPD_BOUNCE_BUSY		0x00000080
+#define SMTPD_SMTP_DISABLED		0x00000100
+	uint32_t			sc_flags;
+	uint32_t			sc_queue_flags;
+#define QUEUE_COMPRESS			0x00000001
+	char			       *sc_queue_compress_algo;
+	int				sc_qexpire;
+	struct event			sc_ev;
+	int			       *sc_pipes[PROC_COUNT][PROC_COUNT];
+	struct imsgev		       *sc_ievs[PROC_COUNT];
+	int				sc_instances[PROC_COUNT];
+	int				sc_instance;
+	char			       *sc_title[PROC_COUNT];
+	struct passwd		       *sc_pw;
+	char				sc_hostname[MAXHOSTNAMELEN];
+	struct queue_backend	       *sc_queue;
+	struct compress_backend	       *sc_compress;
+	struct scheduler_backend       *sc_scheduler;
+	struct stat_backend	       *sc_stat;
 
 	time_t					 sc_uptime;
 
 };
 
 enum queue_op {
-	QOP_INVALID=0,
 	QOP_CREATE,
 	QOP_DELETE,
 	QOP_UPDATE,
+	QOP_WALK,
 	QOP_COMMIT,
 	QOP_LOAD,
 	QOP_FD_R,
 };
 
 struct queue_backend {
-	int (*init)(int);
-	int (*message)(enum queue_op, uint32_t *);
-	int (*envelope)(enum queue_op, uint64_t *, char *, size_t);
-
-	void *(*qwalk_new)(uint32_t);
-	int   (*qwalk)(void *, uint64_t *);
-	void  (*qwalk_close)(void *);
+	int	(*init)(int);
+	int	(*message)(enum queue_op, uint32_t *);
+	int	(*envelope)(enum queue_op, uint64_t *, char *, size_t);
 };
 
 struct compress_backend {
 };
 
 struct auth_backend {
-	int (*authenticate)(char *, char *);
+	int	(*authenticate)(char *, char *);
 };
 
 
 
 /* delivery_backend */
 struct delivery_backend {
-	int			allow_root;
-	void (*open)(struct deliver *);
+	int	allow_root;
+	void	(*open)(struct deliver *);
 };
 
 struct evpstate {
 int envelope_load_buffer(struct envelope *, char *, size_t);
 int envelope_dump_buffer(struct envelope *, char *, size_t);
 
+
 /* expand.c */
 int expand_cmp(struct expandnode *, struct expandnode *);
 void expand_insert(struct expand *, struct expandnode *);
 void expand_free(struct expand *);
 RB_PROTOTYPE(expandtree, expandnode, nodes, expand_cmp);
 
+
 /* forward.c */
 int forwards_get(int, struct expand *);
 
 /* lka.c */
 pid_t lka(void);
 
+
 /* lka_session.c */
 void lka_session(struct submit_status *);
 void lka_session_forward_reply(struct forward_req *, int);
 
+
 /* map.c */
 void *map_open(struct map *);
 void  map_update(struct map *);
 void  map_close(struct map *, void *);
-
 void *map_lookup(objid_t, const char *, enum map_kind);
 int map_compare(objid_t, const char *, enum map_kind,
     int (*)(const char *, const char *));
 void mta_route_collect(struct mta_route *);
 const char *mta_route_to_text(struct mta_route *);
 
+
 /* mta_session.c */
 void mta_session(struct mta_route *);
 void mta_session_imsg(struct imsgev *, struct imsg *);
 
+
 /* parse.y */
 int parse_config(struct smtpd *, const char *, int);
 int cmdline_symset(char *);
 
+
 /* queue.c */
 pid_t queue(void);
 
+
 /* queue_backend.c */
 uint32_t queue_generate_msgid(void);
 uint64_t queue_generate_evpid(uint32_t msgid);
 int queue_envelope_delete(struct envelope *);
 int queue_envelope_load(uint64_t, struct envelope *);
 int queue_envelope_update(struct envelope *);
-void *qwalk_new(uint32_t);
-int   qwalk(void *, uint64_t *);
-void  qwalk_close(void *);
+int queue_envelope_walk(struct envelope *);
+
 
 /* compress_backend.c */
 struct compress_backend *compress_backend_lookup(const char *);
 /* scheduler.c */
 pid_t scheduler(void);
 
+
 /* scheduler_bakend.c */
 struct scheduler_backend *scheduler_backend_lookup(const char *);
 void scheduler_info(struct scheduler_info *, struct envelope *);
 time_t scheduler_compute_schedule(struct scheduler_info *);
 
+
 /* smtp.c */
 pid_t smtp(void);
 void smtp_resume(void);
 void smtp_destroy(struct session *);
 
+
 /* smtp_session.c */
 void session_init(struct listener *, struct session *);
 int session_cmp(struct session *, struct session *);
 void session_pickup(struct session *, struct submit_status *);
 void session_destroy(struct session *, const char *);
 void session_respond(struct session *, char *, ...)
-	__attribute__ ((format (printf, 2, 3)));
-
+	__attribute__((format (printf, 2, 3)));
 SPLAY_PROTOTYPE(sessiontree, session, s_nodes, session_cmp);
 
 
 void	stat_increment(const char *, size_t);
 void	stat_decrement(const char *, size_t);
 void	stat_set(const char *, const struct stat_value *);
-
 struct stat_value *stat_counter(size_t);
 struct stat_value *stat_timestamp(time_t);
 struct stat_value *stat_timeval(struct timeval *);
 void addargs(arglist *, char *, ...)
 	__attribute__((format(printf, 2, 3)));
 int bsnprintf(char *, size_t, const char *, ...)
-	__attribute__ ((format (printf, 3, 4)));
+	__attribute__((format (printf, 3, 4)));
 int mkdirs(char *, mode_t);
 int safe_fclose(FILE *);
 int hostname_match(const char *, const char *);
 int session_socket_error(int);
 uint64_t strtoevpid(const char *);
 
+
 /* waitq.c */
 int  waitq_wait(void *, void (*)(void *, void *, void *), void *);
 void waitq_run(void *, void *);
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.