Commits

Andrew Dunstan committed eff770b

Latest patch applied, in part by hand. Builds and runs.

Comments (0)

Files changed (15)

src/bin/pg_dump/Makefile

 
 OBJS=	pg_backup_archiver.o pg_backup_db.o pg_backup_custom.o \
 	pg_backup_null.o pg_backup_tar.o \
-	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o $(WIN32RES)
+	pg_backup_directory.o dumpmem.o dumputils.o compress_io.o \
+	parallel.o $(WIN32RES)
 
 KEYWRDOBJS = keywords.o kwlookup.o
 

src/bin/pg_dump/compress_io.c

 #include "compress_io.h"
 #include "dumpmem.h"
 #include "dumputils.h"
+#include "parallel.h"
 
 /*----------------------
  * Compressor API
 WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs,
 				   const void *data, size_t dLen)
 {
+	/* Are we aborting? */
+	checkAborting(AH);
+
 	switch (cs->comprAlg)
 	{
 		case COMPR_ALG_LIBZ:
 	/* no minimal chunk size for zlib */
 	while ((cnt = readF(AH, &buf, &buflen)))
 	{
+		/* Are we aborting? */
+		checkAborting(AH);
+
 		zp->next_in = (void *) buf;
 		zp->avail_in = cnt;
 
 
 	while ((cnt = readF(AH, &buf, &buflen)))
 	{
+		/* Are we aborting? */
+		checkAborting(AH);
+
 		ahwrite(buf, 1, cnt, AH);
 	}
 

src/bin/pg_dump/dumputils.c

 
 #include <ctype.h>
 
+#include "dumpmem.h"
 #include "dumputils.h"
 
 #include "parser/keywords.h"
 }	on_exit_nicely_list[MAX_ON_EXIT_NICELY];
 
 static int	on_exit_nicely_index;
+void (*on_exit_msg_func)(const char *modulename, const char *fmt, va_list ap) = vwrite_msg;
 
 #define supports_grant_options(version) ((version) >= 70400)
 
 static char *copyAclUserName(PQExpBuffer output, char *input);
 static void AddAcl(PQExpBuffer aclbuf, const char *keyword,
 	   const char *subname);
+static PQExpBuffer getThreadLocalPQExpBuffer(void);
 
 #ifdef WIN32
+static void shutdown_parallel_dump_utils(int code, void* unused);
 static bool parallel_init_done = false;
 static DWORD tls_index;
 static DWORD mainThreadId;
+
+static void
+shutdown_parallel_dump_utils(int code, void* unused)
+{
+	/* Call the cleanup function only from the main thread */
+	if (mainThreadId == GetCurrentThreadId())
+		WSACleanup();
+}
 #endif
 
 void
 #ifdef WIN32
 	if (!parallel_init_done)
 	{
+		WSADATA	wsaData;
+		int		err;
+
 		tls_index = TlsAlloc();
-		parallel_init_done = true;
 		mainThreadId = GetCurrentThreadId();
+		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
+		if (err != 0)
+		{
+			fprintf(stderr, _("WSAStartup failed: %d\n"), err);
+			exit_nicely(1);
+		}
+		on_exit_nicely(shutdown_parallel_dump_utils, NULL);
+		parallel_init_done = true;
 	}
 #endif
 }
 
 /*
- *	Quotes input string if it's not a legitimate SQL identifier as-is.
- *
- *	Note that the returned string must be used before calling fmtId again,
- *	since we re-use the same return buffer each time.  Non-reentrant but
- *	reduces memory leakage. (On Windows the memory leakage will be one buffer
- *	per thread, which is at least better than one per call).
+ * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
+ * will be one buffer per thread, which is at least better than one per call).
  */
-const char *
-fmtId(const char *rawid)
+static PQExpBuffer
+getThreadLocalPQExpBuffer(void)
 {
 	/*
 	 * The Tls code goes awry if we use a static var, so we provide for both
 	static PQExpBuffer s_id_return = NULL;
 	PQExpBuffer id_return;
 
-	const char *cp;
-	bool		need_quotes = false;
-
 #ifdef WIN32
 	if (parallel_init_done)
 		id_return = (PQExpBuffer) TlsGetValue(tls_index);		/* 0 when not set */
 
 	}
 
+	return id_return;
+}
+
+/*
+ *	Quotes input string if it's not a legitimate SQL identifier as-is.
+ *
+ *	Note that the returned string must be used before calling fmtId again,
+ *	since we re-use the same return buffer each time.
+ */
+const char *
+fmtId(const char *rawid)
+{
+	PQExpBuffer id_return = getThreadLocalPQExpBuffer();
+
+	const char *cp;
+	bool		need_quotes = false;
+
 	/*
 	 * These checks need to match the identifier production in scan.l. Don't
 	 * use islower() etc.
 	return id_return->data;
 }
 
+/*
+ * fmtQualifiedId - convert a qualified name to the proper format for
+ * the source database.
+ *
+ * Like fmtId, use the result before calling again.
+ *
+ * Since we call fmtId and it also uses getThreadLocalPQExpBuffer() we cannot
+ * use it until we're finished with calling fmtId().
+ */
+const char *
+fmtQualifiedId(int remoteVersion, const char *schema, const char *id)
+{
+	PQExpBuffer id_return;
+	PQExpBuffer lcl_pqexp = createPQExpBuffer();
+
+	/* Suppress schema name if fetching from pre-7.3 DB */
+	if (remoteVersion >= 70300 && schema && *schema)
+	{
+		appendPQExpBuffer(lcl_pqexp, "%s.", fmtId(schema));
+	}
+	appendPQExpBuffer(lcl_pqexp, "%s", fmtId(id));
+
+	id_return = getThreadLocalPQExpBuffer();
+
+	appendPQExpBuffer(id_return, "%s", lcl_pqexp->data);
+	destroyPQExpBuffer(lcl_pqexp);
+
+	return id_return->data;
+}
 
 /*
  * Convert a string value to an SQL string literal and append it to
 	va_list		ap;
 
 	va_start(ap, fmt);
-	vwrite_msg(modulename, fmt, ap);
+	on_exit_msg_func(modulename, fmt, ap);
 	va_end(ap);
 
 	exit_nicely(1);

src/bin/pg_dump/dumputils.h

 
 extern void init_parallel_dump_utils(void);
 extern const char *fmtId(const char *identifier);
+extern const char *fmtQualifiedId(int remoteVersion,
+								  const char *schema, const char *id);
 extern void appendStringLiteral(PQExpBuffer buf, const char *str,
 					int encoding, bool std_strings);
 extern void appendStringLiteralConn(PQExpBuffer buf, const char *str,
 extern void
 exit_horribly(const char *modulename, const char *fmt,...)
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3), noreturn));
+extern void (*on_exit_msg_func)(const char *modulename, const char *fmt, va_list ap)
+				__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
 extern void on_exit_nicely(on_exit_nicely_callback function, void *arg);
 extern void exit_nicely(int code) __attribute__((noreturn));
 

src/bin/pg_dump/pg_backup.h

 	int			minRemoteVersion;		/* allowable range */
 	int			maxRemoteVersion;
 
+	int			numWorkers;		/* number of parallel processes */
+	char	   *sync_snapshot_id;  /* sync snapshot id for parallel operation */
+
 	/* info needed for string escaping */
 	int			encoding;		/* libpq code for client_encoding */
 	bool		std_strings;	/* standard_conforming_strings */
+	char	   *use_role;		/* Issue SET ROLE to this */
 
 	/* error handling */
 	bool		exit_on_error;	/* whether to exit on SQL errors... */
 	int			suppressDumpWarnings;	/* Suppress output of WARNING entries
 										 * to stderr */
 	bool		single_txn;
-	int			number_of_jobs;
 
 	bool	   *idWanted;		/* array showing which dump IDs to emit */
 } RestoreOptions;
 
 extern RestoreOptions *NewRestoreOptions(void);
 
+/* We have one in pg_dump.c and another one in pg_restore.c */
+extern void _SetupWorker(Archive *AHX, RestoreOptions *ropt);
+
 /* Rearrange and filter TOC entries */
 extern void SortTocFromFile(Archive *AHX, RestoreOptions *ropt);
 

src/bin/pg_dump/pg_backup_archiver.c

 #include "pg_backup_db.h"
 #include "dumpmem.h"
 #include "dumputils.h"
+#include "parallel.h"
 
 #include <ctype.h>
+#include <fcntl.h>
 #include <unistd.h>
 #include <sys/stat.h>
 #include <sys/types.h>
 
 #include "libpq/libpq-fs.h"
 
-/*
- * Special exit values from worker children.  We reserve 0 for normal
- * success; 1 and other small values should be interpreted as crashes.
- */
-#define WORKER_CREATE_DONE		10
-#define WORKER_INHIBIT_DATA		11
-#define WORKER_IGNORED_ERRORS	12
-
-/*
- * Unix uses exit to return result from worker child, so function is void.
- * Windows thread result comes via function return.
- */
-#ifndef WIN32
-#define parallel_restore_result void
-#else
-#define parallel_restore_result DWORD
-#endif
-
-/* IDs for worker children are either PIDs or thread handles */
-#ifndef WIN32
-#define thandle pid_t
-#else
-#define thandle HANDLE
-#endif
-
-typedef struct ParallelStateEntry
-{
-#ifdef WIN32
-	unsigned int threadId;
-#else
-	pid_t		pid;
-#endif
-	ArchiveHandle *AH;
-} ParallelStateEntry;
-
-typedef struct ParallelState
-{
-	int			numWorkers;
-	ParallelStateEntry *pse;
-} ParallelState;
-
-/* Arguments needed for a worker child */
-typedef struct _restore_args
-{
-	ArchiveHandle *AH;
-	TocEntry   *te;
-	ParallelStateEntry *pse;
-} RestoreArgs;
-
-/* State for each parallel activity slot */
-typedef struct _parallel_slot
-{
-	thandle		child_id;
-	RestoreArgs *args;
-} ParallelSlot;
-
-typedef struct ShutdownInformation
-{
-	ParallelState *pstate;
-	Archive    *AHX;
-} ShutdownInformation;
-
-static ShutdownInformation shutdown_info;
-
-#define NO_SLOT (-1)
-
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 static void buildTocEntryArrays(ArchiveHandle *AH);
-static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
 static void _moveBefore(ArchiveHandle *AH, TocEntry *pos, TocEntry *te);
 static int	_discoverArchiveFormat(ArchiveHandle *AH);
 
 
 static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
 				  RestoreOptions *ropt, bool is_parallel);
-static void restore_toc_entries_parallel(ArchiveHandle *AH);
-static thandle spawn_restore(RestoreArgs *args);
-static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
-static bool work_in_progress(ParallelSlot *slots, int n_slots);
-static int	get_next_slot(ParallelSlot *slots, int n_slots);
+static void restore_toc_entries_prefork(ArchiveHandle *AH);
+static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+										 TocEntry *pending_list);
+static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list);
 static void par_list_header_init(TocEntry *l);
 static void par_list_append(TocEntry *l, TocEntry *te);
 static void par_list_remove(TocEntry *te);
 static TocEntry *get_next_work_item(ArchiveHandle *AH,
 				   TocEntry *ready_list,
-				   ParallelSlot *slots, int n_slots);
-static parallel_restore_result parallel_restore(RestoreArgs *args);
+				   ParallelState *pstate);
 static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
-			   thandle worker, int status,
-			   ParallelSlot *slots, int n_slots);
+			   int worker, int status,
+			   ParallelState *pstate);
 static void fix_dependencies(ArchiveHandle *AH);
 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH);
 					TocEntry *ready_list);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
-static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
-static void DeCloneArchive(ArchiveHandle *AH);
-
-static void setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH);
-static void unsetProcessIdentifier(ParallelStateEntry *pse);
-static ParallelStateEntry *GetMyPSEntry(ParallelState *pstate);
-static void archive_close_connection(int code, void *arg);
-
 
 /*
  *	Wrapper functions.
 	/*
 	 * If we're going to do parallel restore, there are some restrictions.
 	 */
-	parallel_mode = (ropt->number_of_jobs > 1 && ropt->useDB);
+	parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
 	if (parallel_mode)
 	{
 		/* We haven't got round to making this work for all archive formats */
 	 * In parallel mode, turn control over to the parallel-restore logic.
 	 */
 	if (parallel_mode)
-		restore_toc_entries_parallel(AH);
+	{
+		ParallelState  *pstate;
+		TocEntry		pending_list;
+
+		par_list_header_init(&pending_list);
+
+		/* This runs PRE_DATA items and then disconnects from the database */
+		restore_toc_entries_prefork(AH);
+		Assert(AH->connection == NULL);
+
+		/* ParallelBackupStart() will actually fork the processes */
+		pstate = ParallelBackupStart(AH, ropt);
+		restore_toc_entries_parallel(AH, pstate, &pending_list);
+		ParallelBackupEnd(AH, pstate);
+
+		/* reconnect the master and see if we missed something */
+		restore_toc_entries_postfork(AH, &pending_list);
+		Assert(AH->connection != NULL);
+	}
 	else
 	{
 		for (te = AH->toc->next; te != AH->toc; te = te->next)
 restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
 				  RestoreOptions *ropt, bool is_parallel)
 {
-	int			retval = 0;
+	int			status = WORKER_OK;
 	teReqs		reqs;
 	bool		defnDumped;
 
 				if (ropt->noDataForFailedTables)
 				{
 					if (is_parallel)
-						retval = WORKER_INHIBIT_DATA;
+						status = WORKER_INHIBIT_DATA;
 					else
 						inhibit_data_for_failed_table(AH, te);
 				}
 				 * just set the return value.
 				 */
 				if (is_parallel)
-					retval = WORKER_CREATE_DONE;
+					status = WORKER_CREATE_DONE;
 				else
 					mark_create_done(AH, te);
 			}
 		}
 	}
 
-	return retval;
+	if (AH->public.n_errors > 0 && status == WORKER_OK)
+		status = WORKER_IGNORED_ERRORS;
+
+	return status;
 }
 
 /*
 	}
 }
 
-static TocEntry *
+TocEntry *
 getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
 {
 	/* build index arrays if we didn't already */
 	return AH;
 }
 
-
 void
-WriteDataChunks(ArchiveHandle *AH)
+WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
 {
 	TocEntry   *te;
-	StartDataPtr startPtr;
-	EndDataPtr	endPtr;
 
 	for (te = AH->toc->next; te != AH->toc; te = te->next)
 	{
-		if (te->dataDumper != NULL && (te->reqs & REQ_DATA) != 0)
-		{
-			AH->currToc = te;
-			/* printf("Writing data for %d (%x)\n", te->id, te); */
-
-			if (strcmp(te->desc, "BLOBS") == 0)
-			{
-				startPtr = AH->StartBlobsPtr;
-				endPtr = AH->EndBlobsPtr;
-			}
-			else
-			{
-				startPtr = AH->StartDataPtr;
-				endPtr = AH->EndDataPtr;
-			}
+		if (!te->dataDumper)
+			continue;
 
-			if (startPtr != NULL)
-				(*startPtr) (AH, te);
+		if ((te->reqs & REQ_DATA) == 0)
+			continue;
 
+		if (pstate && pstate->numWorkers > 1)
+		{
 			/*
-			 * printf("Dumper arg for %d is %x\n", te->id, te->dataDumperArg);
+			 * If we are in a parallel backup, then we are always the master
+			 * process.
 			 */
+			EnsureIdleWorker(AH, pstate);
+			Assert(GetIdleWorker(pstate) != NO_SLOT);
+			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
+		}
+		else
+			WriteDataChunksForTocEntry(AH, te);
+	}
+	EnsureWorkersFinished(AH, pstate);
+}
 
-			/*
-			 * The user-provided DataDumper routine needs to call
-			 * AH->WriteData
-			 */
-			(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+void
+WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
+{
+	StartDataPtr startPtr;
+	EndDataPtr	endPtr;
 
-			if (endPtr != NULL)
-				(*endPtr) (AH, te);
-			AH->currToc = NULL;
-		}
+	AH->currToc = te;
+
+	if (strcmp(te->desc, "BLOBS") == 0)
+	{
+		startPtr = AH->StartBlobsPtr;
+		endPtr = AH->EndBlobsPtr;
 	}
+	else
+	{
+		startPtr = AH->StartDataPtr;
+		endPtr = AH->EndDataPtr;
+	}
+
+	if (startPtr != NULL)
+		(*startPtr) (AH, te);
+
+	/*
+	 * The user-provided DataDumper routine needs to call
+	 * AH->WriteData
+	 */
+	(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
+
+	if (endPtr != NULL)
+		(*endPtr) (AH, te);
+
+	AH->currToc = NULL;
 }
 
 void
 		ahprintf(AH, "-- %s %s\n\n", msg, buf);
 }
 
-static void
-setProcessIdentifier(ParallelStateEntry *pse, ArchiveHandle *AH)
-{
-#ifdef WIN32
-	pse->threadId = GetCurrentThreadId();
-#else
-	pse->pid = getpid();
-#endif
-	pse->AH = AH;
-}
-
-static void
-unsetProcessIdentifier(ParallelStateEntry *pse)
-{
-#ifdef WIN32
-	pse->threadId = 0;
-#else
-	pse->pid = 0;
-#endif
-	pse->AH = NULL;
-}
-
-static ParallelStateEntry *
-GetMyPSEntry(ParallelState *pstate)
-{
-	int			i;
-
-	for (i = 0; i < pstate->numWorkers; i++)
-#ifdef WIN32
-		if (pstate->pse[i].threadId == GetCurrentThreadId())
-#else
-		if (pstate->pse[i].pid == getpid())
-#endif
-			return &(pstate->pse[i]);
-
-	return NULL;
-}
-
-static void
-archive_close_connection(int code, void *arg)
-{
-	ShutdownInformation *si = (ShutdownInformation *) arg;
-
-	if (si->pstate)
-	{
-		ParallelStateEntry *entry = GetMyPSEntry(si->pstate);
-
-		if (entry != NULL && entry->AH)
-			DisconnectDatabase(&(entry->AH->public));
-	}
-	else if (si->AHX)
-		DisconnectDatabase(si->AHX);
-}
-
-void
-on_exit_close_archive(Archive *AHX)
-{
-	shutdown_info.AHX = AHX;
-	on_exit_nicely(archive_close_connection, &shutdown_info);
-}
-
 /*
  * Main engine for parallel restore.
  *
  * RestoreArchive).
  */
 static void
-restore_toc_entries_parallel(ArchiveHandle *AH)
-{
-	RestoreOptions *ropt = AH->ropt;
-	int			n_slots = ropt->number_of_jobs;
-	ParallelSlot *slots;
-	int			work_status;
-	int			next_slot;
-	bool		skipped_some;
-	TocEntry	pending_list;
-	TocEntry	ready_list;
-	TocEntry   *next_work_item;
-	thandle		ret_child;
-	TocEntry   *te;
-	ParallelState *pstate;
-	int			i;
-
-	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
-
-	slots = (ParallelSlot *) pg_malloc0(n_slots * sizeof(ParallelSlot));
-	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
-	pstate->pse = (ParallelStateEntry *) pg_malloc0(n_slots * sizeof(ParallelStateEntry));
-	pstate->numWorkers = ropt->number_of_jobs;
-	for (i = 0; i < pstate->numWorkers; i++)
-		unsetProcessIdentifier(&(pstate->pse[i]));
+restore_toc_entries_prefork(ArchiveHandle *AH)
+  {
+    RestoreOptions *ropt = AH->ropt;
+    bool        skipped_some;
+    TocEntry   *next_work_item;
+  
+	ahlog(AH, 2, "entering restore_toc_entries_prefork\n");
 
 	/* Adjust dependency information */
 	fix_dependencies(AH);
 	 */
 	DisconnectDatabase(&AH->public);
 
-	/*
-	 * Set the pstate in the shutdown_info. The exit handler uses pstate if
-	 * set and falls back to AHX otherwise.
-	 */
-	shutdown_info.pstate = pstate;
-
 	/* blow away any transient state from the old connection */
 	if (AH->currUser)
 		free(AH->currUser);
 		free(AH->currTablespace);
 	AH->currTablespace = NULL;
 	AH->currWithOids = -1;
+}
+
+/*
+ * Main engine for parallel restore.
+ *
+ * Work is done in three phases.
+ * First we process all SECTION_PRE_DATA tocEntries, in a single connection,
+ * just as for a standard restore. This is done in restore_toc_entries_prefork().
+ * Second we process the remaining non-ACL steps in parallel worker children
+ * (threads on Windows, processes on Unix); these fork off and set up their
+ * connections before we call restore_toc_entries_parallel().
+ * Finally we process all the ACL entries in a single connection (that happens
+ * back in RestoreArchive).
+ */
+static void
+restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+							 TocEntry *pending_list)
+{
+	int			work_status;
+	bool		skipped_some;
+	TocEntry	ready_list;
+	TocEntry   *next_work_item;
+	int			ret_child;
+
+	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
 
 	/*
-	 * Initialize the lists of pending and ready items.  After this setup, the
-	 * pending list is everything that needs to be done but is blocked by one
-	 * or more dependencies, while the ready list contains items that have no
-	 * remaining dependencies.	Note: we don't yet filter out entries that
-	 * aren't going to be restored.  They might participate in dependency
-	 * chains connecting entries that should be restored, so we treat them as
-	 * live until we actually process them.
+	 * Initialize the lists of ready items, the list for pending items has
+	 * already been initialized in the caller.  After this setup, the pending
+	 * list is everything that needs to be done but is blocked by one or more
+	 * dependencies, while the ready list contains items that have no remaining
+	 * dependencies. Note: we don't yet filter out entries that aren't going
+	 * to be restored. They might participate in dependency chains connecting
+	 * entries that should be restored, so we treat them as live until we
+	 * actually process them.
 	 */
-	par_list_header_init(&pending_list);
 	par_list_header_init(&ready_list);
 	skipped_some = false;
 	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
 		}
 
 		if (next_work_item->depCount > 0)
-			par_list_append(&pending_list, next_work_item);
+			par_list_append(pending_list, next_work_item);
 		else
 			par_list_append(&ready_list, next_work_item);
 	}
 
 	ahlog(AH, 1, "entering main parallel loop\n");
 
-	while ((next_work_item = get_next_work_item(AH, &ready_list,
-												slots, n_slots)) != NULL ||
-		   work_in_progress(slots, n_slots))
+	while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL ||
+		   !IsEveryWorkerIdle(pstate))
 	{
 		if (next_work_item != NULL)
 		{
 				continue;
 			}
 
-			if ((next_slot = get_next_slot(slots, n_slots)) != NO_SLOT)
-			{
-				/* There is work still to do and a worker slot available */
-				thandle		child;
-				RestoreArgs *args;
+			ahlog(AH, 1, "launching item %d %s %s\n",
+				  next_work_item->dumpId,
+				  next_work_item->desc, next_work_item->tag);
 
-				ahlog(AH, 1, "launching item %d %s %s\n",
-					  next_work_item->dumpId,
-					  next_work_item->desc, next_work_item->tag);
-
-				par_list_remove(next_work_item);
+			par_list_remove(next_work_item);
 
-				/* this memory is dealloced in mark_work_done() */
-				args = pg_malloc(sizeof(RestoreArgs));
-				args->AH = CloneArchive(AH);
-				args->te = next_work_item;
-				args->pse = &pstate->pse[next_slot];
+			Assert(GetIdleWorker(pstate) != NO_SLOT);
+			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
+		}
+		else
+			/* at least one child is working and we have nothing ready. */
+			Assert(!IsEveryWorkerIdle(pstate));
 
-				/* run the step in a worker child */
-				child = spawn_restore(args);
+		for (;;)
+		{
+			int nTerm = 0;
 
-				slots[next_slot].child_id = child;
-				slots[next_slot].args = args;
+			/*
+			 * In order to reduce dependencies as soon as possible and
+			 * especially to reap the status of workers who are working on
+			 * items that pending items depend on, we do a non-blocking check
+			 * for ended workers first.
+			 *
+			 * However, if we do not have any other work items currently that
+			 * workers can work on, we do not busy-loop here but instead
+			 * really wait for at least one worker to terminate. Hence we call
+			 * ListenToWorkers(..., ..., do_wait = true) in this case.
+			 */
+			ListenToWorkers(AH, pstate, !next_work_item);
 
-				continue;
+			while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
+			{
+				nTerm++;
+				mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
 			}
-		}
 
-		/*
-		 * If we get here there must be work being done.  Either there is no
-		 * work available to schedule (and work_in_progress returned true) or
-		 * there are no slots available.  So we wait for a worker to finish,
-		 * and process the result.
-		 */
-		ret_child = reap_child(slots, n_slots, &work_status);
+			/*
+			 * We need to make sure that we have an idle worker before re-running the
+			 * loop. If nTerm > 0 we already have that (quick check).
+			 */
+			if (nTerm > 0)
+				break;
 
-		if (WIFEXITED(work_status))
-		{
-			mark_work_done(AH, &ready_list,
-						   ret_child, WEXITSTATUS(work_status),
-						   slots, n_slots);
-		}
-		else
-		{
-			exit_horribly(modulename, "worker process crashed: status %d\n",
-						  work_status);
+			/* if nobody terminated, explicitly check for an idle worker */
+			if (GetIdleWorker(pstate) != NO_SLOT)
+				break;
+
+			/*
+			 * If we have no idle worker, read the result of one or more
+			 * workers and loop the loop to call ReapWorkerStatus() on them.
+			 */
+			ListenToWorkers(AH, pstate, true);
 		}
 	}
 
 	ahlog(AH, 1, "finished main parallel loop\n");
+}
 
-	/*
-	 * Remove the pstate again, so the exit handler will now fall back to
-	 * closing AH->connection again.
-	 */
-	shutdown_info.pstate = NULL;
+static void
+restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
+{
+	RestoreOptions *ropt = AH->ropt;
+	TocEntry   *te;
+
+	ahlog(AH, 2, "entering restore_toc_entries_postfork\n");
 
 	/*
 	 * Now reconnect the single parent connection.
 	 * dependencies, or some other pathological condition. If so, do it in the
 	 * single parent connection.
 	 */
-	for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
+	for (te = pending_list->par_next; te != pending_list; te = te->par_next)
 	{
 		ahlog(AH, 1, "processing missed item %d %s %s\n",
 			  te->dumpId, te->desc, te->tag);
 }
 
 /*
- * create a worker child to perform a restore step in parallel
- */
-static thandle
-spawn_restore(RestoreArgs *args)
-{
-	thandle		child;
-
-	/* Ensure stdio state is quiesced before forking */
-	fflush(NULL);
-
-#ifndef WIN32
-	child = fork();
-	if (child == 0)
-	{
-		/* in child process */
-		parallel_restore(args);
-		exit_horribly(modulename,
-					  "parallel_restore should not return\n");
-	}
-	else if (child < 0)
-	{
-		/* fork failed */
-		exit_horribly(modulename,
-					  "could not create worker process: %s\n",
-					  strerror(errno));
-	}
-#else
-	child = (HANDLE) _beginthreadex(NULL, 0, (void *) parallel_restore,
-									args, 0, NULL);
-	if (child == 0)
-		exit_horribly(modulename,
-					  "could not create worker thread: %s\n",
-					  strerror(errno));
-#endif
-
-	return child;
-}
-
-/*
- *	collect status from a completed worker child
- */
-static thandle
-reap_child(ParallelSlot *slots, int n_slots, int *work_status)
-{
-#ifndef WIN32
-	/* Unix is so much easier ... */
-	return wait(work_status);
-#else
-	static HANDLE *handles = NULL;
-	int			hindex,
-				snum,
-				tnum;
-	thandle		ret_child;
-	DWORD		res;
-
-	/* first time around only, make space for handles to listen on */
-	if (handles == NULL)
-		handles = (HANDLE *) pg_malloc0(n_slots * sizeof(HANDLE));
-
-	/* set up list of handles to listen to */
-	for (snum = 0, tnum = 0; snum < n_slots; snum++)
-		if (slots[snum].child_id != 0)
-			handles[tnum++] = slots[snum].child_id;
-
-	/* wait for one to finish */
-	hindex = WaitForMultipleObjects(tnum, handles, false, INFINITE);
-
-	/* get handle of finished thread */
-	ret_child = handles[hindex - WAIT_OBJECT_0];
-
-	/* get the result */
-	GetExitCodeThread(ret_child, &res);
-	*work_status = res;
-
-	/* dispose of handle to stop leaks */
-	CloseHandle(ret_child);
-
-	return ret_child;
-#endif
-}
-
-/*
- * are we doing anything now?
- */
-static bool
-work_in_progress(ParallelSlot *slots, int n_slots)
-{
-	int			i;
-
-	for (i = 0; i < n_slots; i++)
-	{
-		if (slots[i].child_id != 0)
-			return true;
-	}
-	return false;
-}
-
-/*
- * find the first free parallel slot (if any).
- */
-static int
-get_next_slot(ParallelSlot *slots, int n_slots)
-{
-	int			i;
-
-	for (i = 0; i < n_slots; i++)
-	{
-		if (slots[i].child_id == 0)
-			return i;
-	}
-	return NO_SLOT;
-}
-
-
-/*
  * Check if te1 has an exclusive lock requirement for an item that te2 also
  * requires, whether or not te2's requirement is for an exclusive lock.
  */
  */
 static TocEntry *
 get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
-				   ParallelSlot *slots, int n_slots)
+				   ParallelState *pstate)
 {
 	bool		pref_non_data = false;	/* or get from AH->ropt */
 	TocEntry   *data_te = NULL;
 	{
 		int			count = 0;
 
-		for (k = 0; k < n_slots; k++)
-			if (slots[k].args->te != NULL &&
-				slots[k].args->te->section == SECTION_DATA)
+		for (k = 0; k < pstate->numWorkers; k++)
+			if (pstate->parallelSlot[k].args->te != NULL &&
+				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
 				count++;
-		if (n_slots == 0 || count * 4 < n_slots)
+		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
 			pref_non_data = false;
 	}
 
 		 * that a currently running item also needs lock on, or vice versa. If
 		 * so, we don't want to schedule them together.
 		 */
-		for (i = 0; i < n_slots && !conflicts; i++)
+		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
 		{
 			TocEntry   *running_te;
 
-			if (slots[i].args == NULL)
+			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
 				continue;
-			running_te = slots[i].args->te;
+			running_te = pstate->parallelSlot[i].args->te;
 
 			if (has_lock_conflicts(te, running_te) ||
 				has_lock_conflicts(running_te, te))
 /*
  * Restore a single TOC item in parallel with others
  *
- * this is the procedure run as a thread (Windows) or a
- * separate process (everything else).
+ * this is run in the worker, i.e. in a thread (Windows) or a separate process
+ * (everything else). A worker process executes several such work items during
+ * a parallel backup or restore. Once we terminate here and report back that
+ * our work is finished, the master process will assign us a new work item.
  */
-static parallel_restore_result
-parallel_restore(RestoreArgs *args)
+int
+parallel_restore(ParallelArgs *args)
 {
 	ArchiveHandle *AH = args->AH;
 	TocEntry   *te = args->te;
 	RestoreOptions *ropt = AH->ropt;
-	int			retval;
-
-	setProcessIdentifier(args->pse, AH);
-
-	/*
-	 * Close and reopen the input file so we have a private file pointer that
-	 * doesn't stomp on anyone else's file pointer, if we're actually going to
-	 * need to read from the file. Otherwise, just close it except on Windows,
-	 * where it will possibly be needed by other threads.
-	 *
-	 * Note: on Windows, since we are using threads not processes, the reopen
-	 * call *doesn't* close the original file pointer but just open a new one.
-	 */
-	if (te->section == SECTION_DATA)
-		(AH->ReopenPtr) (AH);
-#ifndef WIN32
-	else
-		(AH->ClosePtr) (AH);
-#endif
-
-	/*
-	 * We need our own database connection, too
-	 */
-	ConnectDatabase((Archive *) AH, ropt->dbname,
-					ropt->pghost, ropt->pgport, ropt->username,
-					ropt->promptPassword);
+	int			status;
 
 	_doSetFixedOutputState(AH);
 
-	/* Restore the TOC item */
-	retval = restore_toc_entry(AH, te, ropt, true);
-
-	/* And clean up */
-	DisconnectDatabase((Archive *) AH);
-	unsetProcessIdentifier(args->pse);
+	Assert(AH->connection != NULL);
 
-	/* If we reopened the file, we are done with it, so close it now */
-	if (te->section == SECTION_DATA)
-		(AH->ClosePtr) (AH);
+	AH->public.n_errors = 0;
 
-	if (retval == 0 && AH->public.n_errors)
-		retval = WORKER_IGNORED_ERRORS;
+	/* Restore the TOC item */
+	status = restore_toc_entry(AH, te, ropt, true);
 
-#ifndef WIN32
-	exit(retval);
-#else
-	return retval;
-#endif
+	return status;
 }
 
 
  */
 static void
 mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
-			   thandle worker, int status,
-			   ParallelSlot *slots, int n_slots)
+			   int worker, int status,
+			   ParallelState *pstate)
 {
 	TocEntry   *te = NULL;
-	int			i;
-
-	for (i = 0; i < n_slots; i++)
-	{
-		if (slots[i].child_id == worker)
-		{
-			slots[i].child_id = 0;
-			te = slots[i].args->te;
-			DeCloneArchive(slots[i].args->AH);
-			free(slots[i].args);
-			slots[i].args = NULL;
 
-			break;
-		}
-	}
+	te = pstate->parallelSlot[worker].args->te;
 
 	if (te == NULL)
 		exit_horribly(modulename, "could not find slot of finished worker\n");
 	}
 }
 
-
 /*
  * Clone and de-clone routines used in parallel restoration.
  *
  * Enough of the structure is cloned to ensure that there is no
  * conflict between different threads each with their own clone.
- *
- * These could be public, but no need at present.
  */
-static ArchiveHandle *
+ArchiveHandle *
 CloneArchive(ArchiveHandle *AH)
 {
 	ArchiveHandle *clone;
 	/* clone has its own error count, too */
 	clone->public.n_errors = 0;
 
+	/*
+	 * Connect our new clone object to the database:
+	 * In parallel restore the parent is already disconnected, because we can
+	 * connect the worker processes independently to the database (no snapshot
+	 * sync required).
+	 * In parallel backup we clone the parent's existing connection.
+	 */
+	if (AH->mode == archModeRead)
+	{
+		RestoreOptions *ropt = AH->ropt;
+		Assert(AH->connection == NULL);
+		/* this also sets clone->connection */
+		ConnectDatabase((Archive *) clone, ropt->dbname,
+					ropt->pghost, ropt->pgport, ropt->username,
+					ropt->promptPassword);
+	}
+	else
+	{
+		char	   *dbname;
+		char	   *pghost;
+		char	   *pgport;
+		char	   *username;
+		const char *encname;
+
+		Assert(AH->connection != NULL);
+
+		/*
+		 * Even though we are technically accessing the parent's database object
+		 * here, these functions are fine to be called like that because all just
+		 * return a pointer and do not actually send/receive any data to/from the
+		 * database.
+		 */
+		dbname = PQdb(AH->connection);
+		pghost = PQhost(AH->connection);
+		pgport = PQport(AH->connection);
+		username = PQuser(AH->connection);
+		encname = pg_encoding_to_char(AH->public.encoding);
+
+		/* this also sets clone->connection */
+		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
+
+		/*
+		 * Set the same encoding, whatever we set here is what we got from
+		 * pg_encoding_to_char(), so we really shouldn't run into an error setting that
+		 * very same value. Also see the comment in SetupConnection().
+		 */
+		PQsetClientEncoding(clone->connection, encname);
+	}
+
 	/* Let the format-specific code have a chance too */
 	(clone->ClonePtr) (clone);
 
+	Assert(clone->connection != NULL);
 	return clone;
 }
 
  *
  * Note: we assume any clone-local connection was already closed.
  */
-static void
+void
 DeCloneArchive(ArchiveHandle *AH)
 {
 	/* Clear format-specific state */

src/bin/pg_dump/pg_backup_archiver.h

 #define K_OFFSET_POS_SET 2
 #define K_OFFSET_NO_DATA 3
 
+/*
+ * Special exit values from worker children.  We reserve 0 for normal
+ * success; 1 and other small values should be interpreted as crashes.
+ */
+#define WORKER_OK                     0
+#define WORKER_CREATE_DONE            10
+#define WORKER_INHIBIT_DATA           11
+#define WORKER_IGNORED_ERRORS         12
+
 struct _archiveHandle;
 struct _tocEntry;
+struct _restoreList;
+struct ParallelArgs;
+struct ParallelState;
+enum T_Action;
 
 typedef void (*ClosePtr) (struct _archiveHandle * AH);
 typedef void (*ReopenPtr) (struct _archiveHandle * AH);
 typedef void (*ClonePtr) (struct _archiveHandle * AH);
 typedef void (*DeClonePtr) (struct _archiveHandle * AH);
 
+typedef char *(*WorkerJobRestorePtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+typedef char *(*WorkerJobDumpPtr)(struct _archiveHandle * AH, struct _tocEntry * te);
+typedef char *(*MasterStartParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+											enum T_Action act);
+typedef int (*MasterEndParallelItemPtr)(struct _archiveHandle * AH, struct _tocEntry * te,
+										const char *str, enum T_Action act);
+
 typedef size_t (*CustomOutPtr) (struct _archiveHandle * AH, const void *buf, size_t len);
 
 typedef enum
 	StartBlobPtr StartBlobPtr;
 	EndBlobPtr EndBlobPtr;
 
+	MasterStartParallelItemPtr MasterStartParallelItemPtr;
+	MasterEndParallelItemPtr MasterEndParallelItemPtr;
+
+	WorkerJobDumpPtr WorkerJobDumpPtr;
+	WorkerJobRestorePtr WorkerJobRestorePtr;
+
 	ClonePtr ClonePtr;			/* Clone format-specific fields */
 	DeClonePtr DeClonePtr;		/* Clean up cloned fields */
 
 	char	   *archdbname;		/* DB name *read* from archive */
 	enum trivalue promptPassword;
 	char	   *savedPassword;	/* password for ropt->username, if known */
+	char	   *use_role;
 	PGconn	   *connection;
 	int			connectToDB;	/* Flag to indicate if direct DB connection is
 								 * required */
 	int			nLockDeps;		/* number of such dependencies */
 } TocEntry;
 
+extern int parallel_restore(struct ParallelArgs *args);
 extern void on_exit_close_archive(Archive *AHX);
 
 extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
 extern void ReadHead(ArchiveHandle *AH);
 extern void WriteToc(ArchiveHandle *AH);
 extern void ReadToc(ArchiveHandle *AH);
-extern void WriteDataChunks(ArchiveHandle *AH);
+extern void WriteDataChunks(ArchiveHandle *AH, struct ParallelState *pstate);
+extern void WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te);
+extern ArchiveHandle *CloneArchive(ArchiveHandle *AH);
+extern void DeCloneArchive(ArchiveHandle *AH);
 
 extern teReqs TocIDRequired(ArchiveHandle *AH, DumpId id);
+TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
 extern bool checkSeek(FILE *fp);
 
 #define appendStringLiteralAHX(buf,str,AH) \
 
 void		ahlog(ArchiveHandle *AH, int level, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Minimal Assert() for frontend code.  Wrapped in do { } while (0) so it
+ * acts as a single statement and is safe in unbraced if/else bodies; the
+ * disabled variant expands to a no-op expression for the same reason.
+ */
+#define Assert(condition) \
+	do { \
+		if (!(condition)) \
+		{ \
+			write_msg(NULL, "Failed assertion in %s, line %d\n", \
+					  __FILE__, __LINE__); \
+			abort(); \
+		} \
+	} while (0)
+#else
+#define Assert(condition) ((void) 0)
+#endif
+
 #endif

src/bin/pg_dump/pg_backup_custom.c

 #include "compress_io.h"
 #include "dumputils.h"
 #include "dumpmem.h"
+#include "parallel.h"
 
 /*--------
  * Routines in the format interface
 static void _Clone(ArchiveHandle *AH);
 static void _DeClone(ArchiveHandle *AH);
 
+static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
+static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
+char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
+
 typedef struct
 {
 	CompressorState *cs;
 	AH->ClonePtr = _Clone;
 	AH->DeClonePtr = _DeClone;
 
+	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
+	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
+
+	/* no parallel dump in the custom archive, only parallel restore */
+	AH->WorkerJobDumpPtr = NULL;
+	AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
+
 	/* Set up a private area. */
 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
 	AH->formatData = (void *) ctx;
 		tpos = ftello(AH->FH);
 		WriteToc(AH);
 		ctx->dataStart = _getFilePos(AH, ctx);
-		WriteDataChunks(AH);
+		WriteDataChunks(AH, NULL);
 
 		/*
 		 * If possible, re-write the TOC in order to update the data offset
 	free(ctx);
 }
 
+/*
+ * This function is executed in the child of a parallel restore from a
+ * custom format archive and restores the actual data for one TOC entry.
+ */
+char *
+_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
+{
+	/*
+	 * Short fixed-size string + some IDs; this needs to be malloc'ed instead
+	 * of static because we work with threads on Windows.
+	 */
+	const int	buflen = 64;
+	char	   *buf = (char *) pg_malloc(buflen);
+	ParallelArgs pargs;
+	int			status;
+
+	pargs.AH = AH;
+	pargs.te = te;
+
+	status = parallel_restore(&pargs);
+
+	/* report the per-entry error count only for ignored-error status */
+	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+
+	return buf;
+}
+
+/*
+ * This function is executed in the parent process. Depending on the desired
+ * action (dump or restore) it creates a string that is understood by the
+ * _WorkerJobRestoreCustom function of the custom dump format.
+ */
+static char *
+_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+{
+	/*
+	 * A static char is okay here, even on Windows because we call this
+	 * function only from one process (the master).
+	 */
+	static char			buf[64]; /* short fixed-size string + number */
+
+	/* no parallel dump in the custom archive format */
+	Assert(act == ACT_RESTORE);
+
+	snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+
+	return buf;
+}
+
+/*
+ * This function is executed in the parent process. It analyzes the response
+ * of the _WorkerJobRestoreCustom function of the custom dump format.
+ */
+static int
+_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+{
+	DumpId		dumpId;
+	int			nBytes, status, n_errors;
+
+	/* no parallel dump in the custom archive */
+	Assert(act == ACT_RESTORE);
+
+	/* DumpId and the counters are signed ints, so use %d rather than %u */
+	sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
+
+	Assert(nBytes == strlen(str));
+	Assert(dumpId == te->dumpId);
+
+	AH->public.n_errors += n_errors;
+
+	return status;
+}
+
 /*--------------------------------------------------
  * END OF FORMAT CALLBACKS
  *--------------------------------------------------

src/bin/pg_dump/pg_backup_db.c

 	PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
 }
 
+/*
+ * Close the connection to the database and also cancel off the query if we
+ * have one running.
+ */
 void
 DisconnectDatabase(Archive *AHX)
 {
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+	PGcancel   *cancel;
+	char		errbuf[1];
+
+	if (!AH->connection)
+		return;
 
-	PQfinish(AH->connection);	/* noop if AH->connection is NULL */
+	if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
+	{
+		if ((cancel = PQgetCancel(AH->connection)))
+		{
+			PQcancel(cancel, errbuf, sizeof(errbuf));
+			PQfreeCancel(cancel);
+		}
+	}
+
+	PQfinish(AH->connection);
 	AH->connection = NULL;
 }
 

src/bin/pg_dump/pg_backup_directory.c

 #include "compress_io.h"
 #include "dumpmem.h"
 #include "dumputils.h"
+#include "parallel.h"
 
 #include <dirent.h>
 #include <sys/stat.h>
 	cfp		   *dataFH;			/* currently open data file */
 
 	cfp		   *blobsTocFH;		/* file handle for blobs.toc */
+	ParallelState *pstate;		/* for parallel backup / restore */
 } lclContext;
 
 typedef struct
 static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
 static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
 static void _CloseArchive(ArchiveHandle *AH);
+static void _ReopenArchive(ArchiveHandle *AH);
 static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 
 static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
 static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
 
-static char *prependDirectory(ArchiveHandle *AH, const char *relativeFilename);
+static void _Clone(ArchiveHandle *AH);
+static void _DeClone(ArchiveHandle *AH);
 
+static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
+static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te,
+								  const char *str, T_Action act);
+static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te);
+static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te);
+
+static char *prependDirectory(ArchiveHandle *AH, char *buf,
+							  const char *relativeFilename);
 
 /*
  *	Init routine required by ALL formats. This is a global routine
 	AH->WriteBufPtr = _WriteBuf;
 	AH->ReadBufPtr = _ReadBuf;
 	AH->ClosePtr = _CloseArchive;
-	AH->ReopenPtr = NULL;
+	AH->ReopenPtr = _ReopenArchive;
 	AH->PrintTocDataPtr = _PrintTocData;
 	AH->ReadExtraTocPtr = _ReadExtraToc;
 	AH->WriteExtraTocPtr = _WriteExtraToc;
 	AH->EndBlobPtr = _EndBlob;
 	AH->EndBlobsPtr = _EndBlobs;
 
-	AH->ClonePtr = NULL;
-	AH->DeClonePtr = NULL;
+	AH->ClonePtr = _Clone;
+	AH->DeClonePtr = _DeClone;
+
+	AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory;
+	AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory;
+
+	AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
+	AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
 
 	/* Set up our private context */
 	ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
 
 	if (AH->mode == archModeWrite)
 	{
-		if (mkdir(ctx->directory, 0700) < 0)
+		struct stat st;
+		bool is_empty = false;
+
+		/* we accept an empty existing directory */
+		if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
+		{
+			DIR* dir = opendir(ctx->directory);
+			if (dir) {
+				struct dirent *d;
+				is_empty = true;
+				while ((d = readdir(dir))) {
+					if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
+					{
+						is_empty = false;
+						break;
+					}
+				}
+				closedir(dir);
+			}
+		}
+
+		if (!is_empty && mkdir(ctx->directory, 0700) < 0)
 			exit_horribly(modulename, "could not create directory \"%s\": %s\n",
 						  ctx->directory, strerror(errno));
 	}
 	else
 	{							/* Read Mode */
-		char	   *fname;
+		char	   fname[MAXPGPATH];
 		cfp		   *tocFH;
 
-		fname = prependDirectory(AH, "toc.dat");
+		prependDirectory(AH, fname, "toc.dat");
 
 		tocFH = cfopen_read(fname, PG_BINARY_R);
 		if (tocFH == NULL)
 {
 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 	lclContext *ctx = (lclContext *) AH->formatData;
-	char	   *fname;
+	char		fname[MAXPGPATH];
 
-	fname = prependDirectory(AH, tctx->filename);
+	prependDirectory(AH, fname, tctx->filename);
 
 	ctx->dataFH = cfopen_write(fname, PG_BINARY_W, AH->compression);
 	if (ctx->dataFH == NULL)
 	if (dLen == 0)
 		return 0;
 
+	/* Are we aborting? */
+	checkAborting(AH);
+
 	return cfwrite(data, dLen, ctx->dataFH);
 }
 
 		_LoadBlobs(AH, ropt);
 	else
 	{
-		char	   *fname = prependDirectory(AH, tctx->filename);
+		char		fname[MAXPGPATH];
 
+		prependDirectory(AH, fname, tctx->filename);
 		_PrintFileData(AH, fname, ropt);
 	}
 }
 {
 	Oid			oid;
 	lclContext *ctx = (lclContext *) AH->formatData;
-	char	   *fname;
+	char		fname[MAXPGPATH];
 	char		line[MAXPGPATH];
 
 	StartRestoreBlobs(AH);
 
-	fname = prependDirectory(AH, "blobs.toc");
+	prependDirectory(AH, fname, "blobs.toc");
 
 	ctx->blobsTocFH = cfopen_read(fname, PG_BINARY_R);
 
 	lclContext *ctx = (lclContext *) AH->formatData;
 	size_t		res;
 
+	/* Are we aborting? */
+	checkAborting(AH);
+
 	res = cfwrite(buf, len, ctx->dataFH);
 	if (res != len)
 		exit_horribly(modulename, "could not write to output file: %s\n",
 	if (AH->mode == archModeWrite)
 	{
 		cfp		   *tocFH;
-		char	   *fname = prependDirectory(AH, "toc.dat");
+		char		fname[MAXPGPATH];
+
+		prependDirectory(AH, fname, "toc.dat");
+
+		/* this will actually fork the processes for a parallel backup */
+		ctx->pstate = ParallelBackupStart(AH, NULL);
 
 		/* The TOC is always created uncompressed */
 		tocFH = cfopen_write(fname, PG_BINARY_W, 0);
 		if (cfclose(tocFH) != 0)
 			exit_horribly(modulename, "could not close TOC file: %s\n",
 						  strerror(errno));
-		WriteDataChunks(AH);
+		WriteDataChunks(AH, ctx->pstate);
+
+		ParallelBackupEnd(AH, ctx->pstate);
 	}
 	AH->FH = NULL;
 }
 
+/*
+ * Reopen the archive's file handle.
+ */
+static void
+_ReopenArchive(ArchiveHandle *AH)
+{
+	/*
+	 * Our TOC is in memory, our data files are opened by each child anyway as
+	 * they are separate. We support reopening the archive by just doing nothing.
+	 */
+}
 
 /*
  * BLOB support
 _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
-	char	   *fname;
+	char		fname[MAXPGPATH];
 
-	fname = prependDirectory(AH, "blobs.toc");
+	prependDirectory(AH, fname, "blobs.toc");
 
 	/* The blob TOC file is never compressed */
 	ctx->blobsTocFH = cfopen_write(fname, "ab", 0);
 	ctx->blobsTocFH = NULL;
 }
 
-
+/*
+ * Gets a relative file name and prepends the output directory, writing the
+ * result to buf. The caller needs to make sure that buf is MAXPGPATH bytes
+ * big. Can't use a static char[MAXPGPATH] inside the function because we run
+ * multithreaded on Windows.
+ */
 static char *
-prependDirectory(ArchiveHandle *AH, const char *relativeFilename)
+prependDirectory(ArchiveHandle *AH, char *buf, const char *relativeFilename)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
-	static char buf[MAXPGPATH];
 	char	   *dname;
 
 	dname = ctx->directory;
 
 	return buf;
 }
+
+/*
+ * Clone format-specific fields during parallel restoration.
+ */
+static void
+_Clone(ArchiveHandle *AH)
+{
+	lclContext *ctx = (lclContext *) AH->formatData;
+
+	AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
+	memcpy(AH->formatData, ctx, sizeof(lclContext));
+	ctx = (lclContext *) AH->formatData;
+
+	/*
+	 * Note: we do not make a local lo_buf because we expect at most one BLOBS
+	 * entry per archive, so no parallelism is possible.  Likewise,
+	 * TOC-entry-local state isn't an issue because any one TOC entry is
+	 * touched by just one worker child.
+	 */
+
+	/*
+	 * We also don't copy the ParallelState pointer (pstate), only the master
+	 * process ever writes to it.
+	 */
+}
+
+static void
+_DeClone(ArchiveHandle *AH)
+{
+	lclContext *ctx = (lclContext *) AH->formatData;
+	free(ctx);
+}
+
+/*
+ * This function is executed in the parent process. Depending on the desired
+ * action (dump or restore) it creates a string that is understood by the
+ * _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+ * respective dump format.
+ */
+static char *
+_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+{
+	/*
+	 * A static char is okay here, even on Windows because we call this
+	 * function only from one process (the master).
+	 */
+	static char	buf[64];
+
+	if (act == ACT_DUMP)
+		snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId);
+	else if (act == ACT_RESTORE)
+		snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId);
+
+	return buf;
+}
+
+/*
+ * This function is executed in the child of a parallel backup for the
+ * directory archive and dumps the actual data.
+ *
+ * We are currently returning only the DumpId so theoretically we could
+ * make this function return an int (or a DumpId). However, to
+ * facilitate further enhancements and because sooner or later we need to
+ * convert this to a string and send it via a message anyway, we stick with
+ * char *. It is parsed on the other side by the _MasterEndParallelItem()
+ * function of the respective dump format.
+ */
+static char *
+_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te)
+{
+	/* short fixed-size string + some ID so far, this needs to be malloc'ed
+	 * instead of static because we work with threads on windows */
+	const int	buflen = 64;
+	char	   *buf = (char*) pg_malloc(buflen);
+	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+
+	/* This should never happen */
+	if (!tctx)
+		exit_horribly(modulename, "Error during backup\n");
+
+	/*
+	 * This function returns void. We either fail and die horribly or succeed...
+	 * A failure will be detected by the parent when the child dies unexpectedly.
+	 */
+	WriteDataChunksForTocEntry(AH, te);
+
+	snprintf(buf, buflen, "OK DUMP %d", te->dumpId);
+
+	return buf;
+}
+
+/*
+ * This function is executed in the child of a parallel restore from a
+ * directory archive and restores the actual data for one TOC entry.
+ */
+static char *
+_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te)
+{
+	/*
+	 * Short fixed-size string + some IDs; this needs to be malloc'ed instead
+	 * of static because we work with threads on Windows.
+	 */
+	const int	buflen = 64;
+	char	   *buf = (char *) pg_malloc(buflen);
+	ParallelArgs pargs;
+	int			status;
+
+	pargs.AH = AH;
+	pargs.te = te;
+
+	status = parallel_restore(&pargs);
+
+	/* report the per-entry error count only for ignored-error status */
+	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
+			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
+
+	return buf;
+}
+
+/*
+ * This function is executed in the parent process. It analyzes the response of
+ * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the
+ * respective dump format.
+ */
+static int
+_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+{
+	DumpId		dumpId;
+	int			nBytes, n_errors;
+	int			status = 0;
+
+	if (act == ACT_DUMP)
+	{
+		/* DumpId is a signed int, so use %d rather than %u */
+		sscanf(str, "%d%n", &dumpId, &nBytes);
+
+		Assert(dumpId == te->dumpId);
+		Assert(nBytes == strlen(str));
+	}
+	else if (act == ACT_RESTORE)
+	{
+		sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
+
+		Assert(dumpId == te->dumpId);
+		Assert(nBytes == strlen(str));
+
+		AH->public.n_errors += n_errors;
+	}
+
+	return status;
+}

src/bin/pg_dump/pg_backup_tar.c

 	AH->ClonePtr = NULL;
 	AH->DeClonePtr = NULL;
 
+	AH->MasterStartParallelItemPtr = NULL;
+	AH->MasterEndParallelItemPtr = NULL;
+
+	AH->WorkerJobDumpPtr = NULL;
+	AH->WorkerJobRestorePtr = NULL;
+
 	/*
 	 * Set up some special context used in compressing data.
 	 */
 		/*
 		 * Now send the data (tables & blobs)
 		 */
-		WriteDataChunks(AH);
+		WriteDataChunks(AH, NULL);
 
 		/*
 		 * Now this format wants to append a script which does a full restore

src/bin/pg_dump/pg_dump.c

 static int	dump_inserts = 0;
 static int	column_inserts = 0;
 static int	no_security_labels = 0;
+static int  no_synchronized_snapshots = 0;
 static int	no_unlogged_table_data = 0;
 static int	serializable_deferrable = 0;
 
 static void selectSourceSchema(Archive *fout, const char *schemaName);
 static char *getFormattedTypeName(Archive *fout, Oid oid, OidOptions opts);
 static char *myFormatType(const char *typname, int32 typmod);
-static const char *fmtQualifiedId(Archive *fout,
-			   const char *schema, const char *id);
 static void getBlobs(Archive *fout);
 static void dumpBlob(Archive *fout, BlobInfo *binfo);
 static int	dumpBlobs(Archive *fout, void *arg);
 								DumpableObject *dobj,
 								const char *objlabel);
 static const char *getAttrName(int attrnum, TableInfo *tblInfo);
-static const char *fmtCopyColumnList(const TableInfo *ti);
+static const char *fmtCopyColumnList(const TableInfo *ti, PQExpBuffer buffer);
+static char *get_synchronized_snapshot(Archive *fout);
 static PGresult *ExecuteSqlQueryForSingleRow(Archive *fout, char *query);
 
 
 	int			numObjs;
 	DumpableObject *boundaryObjs;
 	int			i;
+	int			numWorkers = 1;
 	enum trivalue prompt_password = TRI_DEFAULT;
 	int			compressLevel = -1;
 	int			plainText = 0;
 		{"format", required_argument, NULL, 'F'},
 		{"host", required_argument, NULL, 'h'},
 		{"ignore-version", no_argument, NULL, 'i'},
+		{"jobs", 1, NULL, 'j'},
 		{"no-reconnect", no_argument, NULL, 'R'},
 		{"oids", no_argument, NULL, 'o'},
 		{"no-owner", no_argument, NULL, 'O'},
 		{"serializable-deferrable", no_argument, &serializable_deferrable, 1},
 		{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
 		{"no-security-labels", no_argument, &no_security_labels, 1},
+		{"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1},
 		{"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1},
 
 		{NULL, 0, NULL, 0}
 
 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_dump"));
 
+	/*
+	 * Initialize what we need for parallel execution, especially for thread
+	 * support on Windows.
+	 */
+	init_parallel_dump_utils();
+
 	g_verbose = false;
 
 	strcpy(g_comment_start, "-- ");
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:in:N:oOp:RsS:t:T:U:vwWxZ:",
+	while ((c = getopt_long(argc, argv, "abcCE:f:F:h:ij:n:N:oOp:RsS:t:T:U:vwWxZ:",
 							long_options, &optindex)) != -1)
 	{
 		switch (c)
 				/* ignored, deprecated option */
 				break;
 
+			case 'j':			/* number of dump jobs */
+				numWorkers = atoi(optarg);
+				break;
+
 			case 'n':			/* include schema(s) */
 				simple_string_list_append(&schema_include_patterns, optarg);
 				include_everything = false;
 			compressLevel = 0;
 	}
 
+	/*
+	 * On Windows we can only have at most MAXIMUM_WAIT_OBJECTS (= 64 usually)
+	 * parallel jobs because that's the maximum limit for the
+	 * WaitForMultipleObjects() call.
+	 */
+	if (numWorkers <= 0
+#ifdef WIN32
+			|| numWorkers > MAXIMUM_WAIT_OBJECTS
+#endif
+		)
+		exit_horribly(NULL, "%s: invalid number of parallel jobs\n", progname);
+
+	/* Parallel backup only in the directory archive format so far */
+	if (archiveFormat != archDirectory && numWorkers > 1)
+		exit_horribly(NULL, "parallel backup only supported by the directory format\n");
+
 	/* Open the output file */
 	fout = CreateArchive(filename, archiveFormat, compressLevel, archiveMode);
 
 	fout->minRemoteVersion = 70000;
 	fout->maxRemoteVersion = (my_version / 100) * 100 + 99;