Commits

tbrugz  committed 2c6f6d1

datadump: added support for multiple 'partitionBy' patterns per query (experimental)

  • Participants
  • Parent commits 4c45dde

Comments (0)

Files changed (2)

File src/tbrugz/sqldump/datadump/DataDump.java

 			String tableOrQueryId, String tableOrQueryName, String charset, 
 			long rowlimit,
 			List<DumpSyntax> syntaxList,
-			String partitionByPattern,
+			String[] partitionByPatterns,
 			List<String> keyColumns
 			) throws Exception {
 		
 			boolean hasData = rs.next();
 			//so empty tables do not create empty dump files
 			if(!hasData) return;
+			long count = 0;
 			
 			//String defaultFilename = prop.getProperty(PROP_DATADUMP_OUTFILEPATTERN);
 
+			Map<String, Writer> writersOpened = new HashMap<String, Writer>();
+			Map<String, DumpSyntax> writersSyntaxes = new HashMap<String, DumpSyntax>();
+			
+			//String partitionBy = prop.getProperty(PROP_DATADUMP_FILEPATTERN);
+			if(partitionByPatterns==null) { partitionByPatterns = new String[]{ "" }; }
+			
 			List<String> filenameList = new ArrayList<String>();
 			List<Boolean> doSyntaxDumpList = new ArrayList<Boolean>();
-			
-			//String partitionBy = prop.getProperty(PROP_DATADUMP_FILEPATTERN);
-			if(partitionByPattern==null) { partitionByPattern = ""; }
-			List<String> partitionByCols = getPartitionCols(partitionByPattern);
+			//List<String> partitionByCols = getPartitionCols(partitionByPattern);
 			
 			String partitionByStrId = "";
 			String partitionByStrIdOld = "";
 			
-			Map<String, Writer> writersOpened = new HashMap<String, Writer>();
-			Map<String, DumpSyntax> writersSyntaxes = new HashMap<String, DumpSyntax>();
-			
 			Boolean writeBOM = Utils.getPropBoolean(prop, PROP_DATADUMP_WRITEBOM, null);
 			
 			boolean log1stRow = Utils.getPropBool(prop, PROP_DATADUMP_LOG_1ST_ROW, true);
 			//XXXdone: prop for setting 'logEachXRows'
+			boolean logNumberOfOpenedWriters = true;
 			long logEachXRows = Utils.getPropLong(prop, PROP_DATADUMP_LOG_EACH_X_ROWS, LOG_EACH_X_ROWS_DEFAULT);
 
 			//header
 					//writerList.set(i, new OutputStreamWriter(new FileOutputStream(filename, alreadyOpened), charset));
 					filenameList.set(i, filename);
 
-					partitionByStrIdOld = partitionByStrId; 
-					partitionByStrId = getPartitionByStr(partitionByPattern, rs, partitionByCols);
-					String finalFilename = getFinalFilenameForAbstractFilename(filename, partitionByStrId);
-					//Writer w = getWriterForFilename(finalFilename, charset, false);
-					boolean newFilename = isSetNewFilename(writersOpened, finalFilename, charset, writeBOM);
-					Writer w = writersOpened.get(finalFilename);
-					if(newFilename) {
-						//should always be true
-						log.debug("new filename="+finalFilename+" [charset="+charset+"]");
+					//for each partitionBy...
+					for(String partitionByPattern: partitionByPatterns) {
+						//log.info("header:: partitionby:: "+partitionByPattern);
+						List<String> partitionByCols = getPartitionCols(partitionByPattern);
+					
+						partitionByStrIdOld = partitionByStrId; 
+						partitionByStrId = getPartitionByStr(partitionByPattern, rs, partitionByCols);
+						String finalFilename = getFinalFilenameForAbstractFilename(filename, partitionByStrId);
+						//Writer w = getWriterForFilename(finalFilename, charset, false);
+						boolean newFilename = isSetNewFilename(writersOpened, finalFilename, charset, writeBOM);
+						Writer w = writersOpened.get(finalFilename);
+						if(newFilename) {
+							//should always be true
+							log.debug("new filename="+finalFilename+" [charset="+charset+"]");
+						}
+						else {
+							log.warn("filename '"+finalFilename+"' shouldn't have been already opened...");
+						}
+	
+						writersSyntaxes.put(finalFilename, ds);
+						ds.dumpHeader(w);
 					}
-					else {
-						log.warn("filename '"+finalFilename+"' shouldn't have been already opened...");
-					}
-
-					writersSyntaxes.put(finalFilename, ds);
-					ds.dumpHeader(w);
 					//ds.dumpHeader(writerList.get(i));
 				}
 			}
 			
 			//rows
-			long count = 0;
 			long countInPartition = 0;
 			do {
+				for(String partitionByPattern: partitionByPatterns) {
+					//log.info("row:: partitionby:: "+partitionByPattern);
+					List<String> partitionByCols = getPartitionCols(partitionByPattern);
+					
 				partitionByStrIdOld = partitionByStrId; 
 				partitionByStrId = getPartitionByStr(partitionByPattern, rs, partitionByCols);
 				boolean partitionChanged = false;
 							String finalFilenameOld = getFinalFilenameForAbstractFilename(filenameList.get(i), partitionByStrIdOld);
 							ds.flushBuffer(writersOpened.get(finalFilenameOld));
 							//XXX: write footer & close file here? (less simultaneous open-files)
-							closeWriter(writersOpened, writersSyntaxes, finalFilenameOld);
-							removeWriter(writersOpened, writersSyntaxes, finalFilenameOld);
+							//closeWriter(writersOpened, writersSyntaxes, finalFilenameOld);
+							//removeWriter(writersOpened, writersSyntaxes, finalFilenameOld);
 							//w.flush();
 						}
 						
 						//ds.dumpRow(rs, count, writerList.get(i));
 					}
 				}
+				}
 				count++;
 				countInPartition++;
 				
+				//XXX: too many opened writers? maybe they should be divided by 'partitionBy' and cleaned each time
+				
 				if(log1stRow && count==1) {
 					logRow.info("[qid="+tableOrQueryId+"] 1st row dumped" + 
 						" ["+(System.currentTimeMillis()-initTime)+"ms elapsed]");
 				}
 				if( (logEachXRows>0) && (count%logEachXRows==0) ) { 
-					logRow.info("[qid="+tableOrQueryId+"] "+count+" rows dumped");
+					logRow.info("[qid="+tableOrQueryId+"] "+count+" rows dumped"
+							+(logNumberOfOpenedWriters?" ["+writersOpened.size()+" opened writers]":"") );
 				}
 				if(rowlimit<=count) { break; }
 			}
 			while(rs.next());
+			
+			//} //end for partitionby
+			
 			log.info("dumped "+count+" rows from table/query: "+tableOrQueryName + 
 				(rs.next()?" (more rows exists)":"") + 
 				" ["+(System.currentTimeMillis()-initTime)+"ms elapsed]");

File src/tbrugz/sqldump/datadump/SQLQueries.java

 			Long tablerowlimit = Utils.getPropLong(prop, "sqldump.query."+qid+".rowlimit");
 			long rowlimit = tablerowlimit!=null?tablerowlimit:globalRowLimit!=null?globalRowLimit:Long.MAX_VALUE;
 			
-			String partitionBy = prop.getProperty("sqldump.query."+qid+".partitionby");
+			//String partitionBy = prop.getProperty("sqldump.query."+qid+".partitionby");
+			List<String> partitionsBy = Utils.getStringListFromProp(prop, "sqldump.query."+qid+".partitionby", "\\|");
+			//if(partitionsBy==null) { partitionsBy = new ArrayList<String>(); }
+			log.info("partitions: "+partitionsBy);
 
 			List<String> keyCols = Utils.getStringListFromProp(prop, "sqldump.query."+qid+".keycols", ",");
 
 			if(runQueries) {
 				try {
 					log.debug("running query [id="+qid+"; name="+queryName+"]: "+sql);
-					dd.runQuery(conn, sql, params, prop, qid, queryName, charset, rowlimit, syntaxList, partitionBy, keyCols);
+					dd.runQuery(conn, sql, params, prop, qid, queryName, charset, rowlimit, syntaxList, 
+							partitionsBy!=null ? partitionsBy.toArray(new String[]{}) : null, 
+							keyCols);
 				} catch (Exception e) {
 					log.warn("error on query '"+qid+"'\n... sql: "+sql+"\n... exception: "+String.valueOf(e).trim());
 					log.info("error on query "+qid+": "+e.getMessage(), e);