Commits

tbrugz committed cf0964e

datadump: multiple 'partitionBy'

Comments (0)

Files changed (2)

src/tbrugz/sqldump/datadump/DataDump.java

 			//List<String> partitionByCols = getPartitionCols(partitionByPattern);
 			
 			String partitionByStrId = "";
-			String partitionByStrIdOld = "";
+			//String partitionByStrIdOld = "";
 			
 			Boolean writeBOM = Utils.getPropBoolean(prop, PROP_DATADUMP_WRITEBOM, null);
 			
 						//log.info("header:: partitionby:: "+partitionByPattern);
 						List<String> partitionByCols = getPartitionCols(partitionByPattern);
 					
-						partitionByStrIdOld = partitionByStrId; 
+						//partitionByStrIdOld = partitionByStrId; 
 						partitionByStrId = getPartitionByStr(partitionByPattern, rs, partitionByCols);
 						String finalFilename = getFinalFilenameForAbstractFilename(filename, partitionByStrId);
 						//Writer w = getWriterForFilename(finalFilename, charset, false);
-						boolean newFilename = isSetNewFilename(writersOpened, finalFilename, charset, writeBOM);
-						Writer w = writersOpened.get(finalFilename);
+						boolean newFilename = isSetNewFilename(writersOpened, finalFilename, partitionByPattern, charset, writeBOM);
+						Writer w = writersOpened.get(getWriterMapKey(finalFilename, partitionByPattern));
 						if(newFilename) {
 							//should always be true
 							log.debug("new filename="+finalFilename+" [charset="+charset+"]");
 				}
 			}
 			
+			Map<String, String> lastPartitionIdByPartitionPattern = new HashMap<String, String>();
+			
 			//rows
 			long countInPartition = 0;
 			do {
-				for(String partitionByPattern: partitionByPatterns) {
+				//String lastPartitionByPattern = null;
+				for(int partIndex = 0; partIndex<partitionByPatterns.length ; partIndex++) {
+					String partitionByPattern = partitionByPatterns[partIndex];
 					//log.info("row:: partitionby:: "+partitionByPattern);
 					List<String> partitionByCols = getPartitionCols(partitionByPattern);
 					
-				partitionByStrIdOld = partitionByStrId; 
+				//partitionByStrIdOld = partitionByStrId; 
 				partitionByStrId = getPartitionByStr(partitionByPattern, rs, partitionByCols);
-				boolean partitionChanged = false;
-				if(!partitionByStrId.equals(partitionByStrIdOld)) {
+				//boolean partitionChanged = false;
+				/*if(!partitionByStrId.equals(partitionByStrIdOld)) {
 					partitionChanged = true;
 					countInPartition = 0;
 					log.debug("partitionId changed: from='"+partitionByStrIdOld+"' to='"+partitionByStrId+"'");
-				}
+				}*/
 				
 				for(int i=0;i<syntaxList.size();i++) {
 					DumpSyntax ds = syntaxList.get(i);
 						
 						String finalFilename = getFinalFilenameForAbstractFilename(filenameList.get(i), partitionByStrId);
 						//Writer w = getWriterForFilename(finalFilename, charset, true);
-						boolean newFilename = isSetNewFilename(writersOpened, finalFilename, charset, writeBOM);
-						Writer w = writersOpened.get(finalFilename);
-						if(partitionChanged) {
+						boolean newFilename = isSetNewFilename(writersOpened, finalFilename, partitionByPattern, charset, writeBOM);
+						Writer w = writersOpened.get(getWriterMapKey(finalFilename, partitionByPattern));
+						//if(partitionChanged) {
 							//for DumpSyntaxes that have buffer (like FFC)
-							String finalFilenameOld = getFinalFilenameForAbstractFilename(filenameList.get(i), partitionByStrIdOld);
-							ds.flushBuffer(writersOpened.get(finalFilenameOld));
+							//XXX String finalFilenameOld = getFinalFilenameForAbstractFilename(filenameList.get(i), partitionByStrIdOld);
+							//ds.flushBuffer(writersOpened.get(getWriterMapKey(finalFilenameOld, partitionByPattern)));
 							//XXX: write footer & close file here? (less simultaneous open-files)
+							
+							String lastPartitionId = lastPartitionIdByPartitionPattern.get(partitionByPattern);
+							if(lastPartitionId!=null && !partitionByStrId.equals(lastPartitionId)) {
+								String lastFinalFilename = getFinalFilenameForAbstractFilename(filenameList.get(i), lastPartitionId);
+								//log.info("partid>> "+lastPartitionId+" // "+partitionByStrId+" // "+lastFinalFilename);
+								closeWriter(writersOpened, writersSyntaxes, getWriterMapKey(lastFinalFilename, partitionByPattern));
+								removeWriter(writersOpened, writersSyntaxes, getWriterMapKey(lastFinalFilename, partitionByPattern));
+							}
+							
 							//closeWriter(writersOpened, writersSyntaxes, finalFilenameOld);
 							//removeWriter(writersOpened, writersSyntaxes, finalFilenameOld);
 							//w.flush();
-						}
+						//}
 						
 						if(newFilename) {
 							log.debug("new filename="+finalFilename+" [charset="+charset+"]");
 						//ds.dumpRow(rs, count, writerList.get(i));
 					}
 				}
+				//lastPartitionByPattern = partitionByPattern;
+				lastPartitionIdByPartitionPattern.put(partitionByPattern, partitionByStrId);
 				}
 				count++;
 				countInPartition++;
 			//footer
 			Set<String> filenames = writersOpened.keySet();
 			for(String filename: filenames) {
+				//for(String partitionByPattern: partitionByPatterns) {
 				closeWriter(writersOpened, writersSyntaxes, filename);
+					//removeWriter(writersOpened, writersSyntaxes, filename);
+				//}
 				/*Writer w = writersOpened.get(filename);
 				DumpSyntax ds = writersSyntaxes.get(filename);
 				try {
 			rs.close();
 	}
 	
-	static void closeWriter(Map<String, Writer> writersOpened, Map<String, DumpSyntax> writersSyntaxes, String filename) throws IOException {
-		Writer w = writersOpened.get(filename);
+	static void closeWriter(Map<String, Writer> writersOpened, Map<String, DumpSyntax> writersSyntaxes, String key) throws IOException {
+		Writer w = writersOpened.get(key);
+		String filename = key.substring(0, key.indexOf("$$"));
+		//String key = getWriterMapKey(filename, partitionByPattern);
+		
 		DumpSyntax ds = writersSyntaxes.get(filename);
 		try {
 			ds.dumpFooter(w);
+			w.close();
+			//log.info("closed stream; filename: "+filename);
 		}
 		catch(Exception e) {
-			log.warn("error closing stream: "+w+"; filename: "+filename);
+			log.warn("error closing stream: "+w+"; filename: "+filename, e);
 			log.debug("error closing stream: ", e);
 		}
-		w.close();
 	}
 	
-	static void removeWriter(Map<String, Writer> writersOpened, Map<String, DumpSyntax> writersSyntaxes, String filename) throws IOException {
-		Writer writerRemoved = writersOpened.remove(filename);
+	static void removeWriter(Map<String, Writer> writersOpened, Map<String, DumpSyntax> writersSyntaxes, String key) throws IOException {
+		String filename = key.substring(0, key.indexOf("$$"));
+		
+		Writer writerRemoved = writersOpened.remove(key);
 		if(writerRemoved==null) { log.warn("writer for file '"+filename+"' not found"); }
 		DumpSyntax syntaxRemoved = writersSyntaxes.remove(filename);
 		if(syntaxRemoved==null) { log.warn("syntax for file '"+filename+"' not found"); }
 		return w;
 	}*/
 	
-	static boolean isSetNewFilename(Map<String, Writer> writersOpened, String fname, String charset, Boolean writeBOM) throws UnsupportedEncodingException, FileNotFoundException {
-		if(! writersOpened.containsKey(fname)) {
+	static String getWriterMapKey(String fname, String partitionBy) {
+		return fname+"$$"+partitionBy;
+	}
+	
+	static boolean isSetNewFilename(Map<String, Writer> writersOpened, String fname, String partitionBy, String charset, Boolean writeBOM) throws UnsupportedEncodingException, FileNotFoundException {
+		String key = getWriterMapKey(fname, partitionBy);
+		if(! writersOpened.containsKey(key)) {
 			File f = new File(fname);
 			File parent = f.getParentFile();
 			if(!parent.isDirectory()) {
 			
 			OutputStreamWriter w = new OutputStreamWriter(new FileOutputStream(fname, false), encoder); //XXX: false: never append
 			writeBOMifNeeded(w, charset, writeBOM);
-			writersOpened.put(fname, w);
+			writersOpened.put(key, w);
 			//filesOpened.add(fname);
 			return true;
 		}

src_test/tbrugz/sqldump/datadump/DataDumpTest.java

 	public void testEncoding() throws IOException {
 		//DataDump dd = new DataDump();
 		Map<String, Writer> map = new HashMap<String, Writer>();
-		DataDump.isSetNewFilename(map, DIROUT+"t1-utf8.txt", "UTF-8", null);
-		DataDump.isSetNewFilename(map, DIROUT+"t1-iso8859.txt", "ISO-8859-1", null); //ISO8859_1
+		DataDump.isSetNewFilename(map, DIROUT+"t1-utf8.txt", "", "UTF-8", null);
+		DataDump.isSetNewFilename(map, DIROUT+"t1-iso8859.txt", "", "ISO-8859-1", null); //ISO8859_1
 		for(String s: map.keySet()) {
 			map.get(s).write("P�rto Al�gre");
 			map.get(s).close();