1. tbrugz
  2. sqldump

Commits

tbrugz  committed 2093929

sqlrun/import: changes in failover strategy

  • Participants
  • Parent commits aea1d60
  • Branches default

Comments (0)

Files changed (1)

File src_run/tbrugz/sqldump/sqlrun/importers/AbstractImporter.java

View file
 import tbrugz.util.NonNullGetMap;
 
 public abstract class AbstractImporter {
+	
 	public static class IOCounter {
 		long input = 0;
 		long output = 0;
 			return "IOCounter[i="+input+";o="+output+"]";
 		}
 	}
+	
+	public enum FailoverIdSelectionStrategy {
+		CYCLE,
+		RESTART
+	}
 
 	static final Log log = LogFactory.getLog(AbstractImporter.class);
 
 	public long importData() throws SQLException, InterruptedException, IOException {
 		aggCountsByFailoverId = new NonNullGetMap<Integer, IOCounter>(new HashMap<Integer, IOCounter>(), IOCounter.class);
 		long ret = 0;
+		long filesImported = 0;
 		if(importFile!=null) {
 			log.info("importing file: "+importFile+" [maxfailoverid="+maxFailoverId+"]");
 			ret = importFile();
+			filesImported++;
 			addMapCount(aggCountsByFailoverId, countsByFailoverId);
 		}
 		else if(importFiles!=null) {
 					importFile = file;
 					log.debug("importing file: "+importFile);
 					ret += importFile();
+					filesImported++;
 					addMapCount(aggCountsByFailoverId, countsByFailoverId);
 				}
 			}
 		else {
 			log.warn("neither '"+SUFFIX_IMPORTFILE+"' nor '"+SUFFIX_IMPORTFILES+"' suffix specified...");
 		}
-		log.info("imported lines by failover id [all="+ret+"]:");//"imported lines = "+ret);
-		logCounts(aggCountsByFailoverId, true);
+		
+		if(filesImported>1) {
+			log.info("imported lines by failover id - all files [all="+ret+"]:");//"imported lines = "+ret);
+			logCounts(aggCountsByFailoverId, true);
+		}
 		
 		return ret;
 	}
 		countsByFailoverId = new NonNullGetMap<Integer, IOCounter>(new HashMap<Integer, IOCounter>(), IOCounter.class);
 		failoverId = 0;
 		IOCounter counter = countsByFailoverId.get(failoverId);
-
-		//assume all lines of same size (in number of columns?)
-		//FileReader fr = new FileReader(importFile);
-		/*int maxFailoverId = 0;
-		for(int i=1;;i++) {
-			String failoverKey = SQLRun.PREFIX_EXEC+execId+PREFIX_FAILOVER+i;
-			List<String> foids = Utils.getKeysStartingWith(prop, failoverKey);
-			if(foids==null || foids.size()==0) {
-				if(i>1) {
-					maxFailoverId = i-1;
-				}
-				break;
-			}
-			log.debug("foid: "+failoverKey);
-		}*/
+		//TODO: property for selecting failover strategy
+		FailoverIdSelectionStrategy failoverStrategy = FailoverIdSelectionStrategy.CYCLE;
 		
 		Scanner scan = createScanner();
 		
 		Pattern p = scan.delimiter();
 		log.debug("scan delimiter pattern: "+p);
 		log.info("input file: "+importFile);
-				//+(maxFailoverId>0?" [maxfailover="+maxFailoverId+"]":""));
 		
 		//PreparedStatement stmt = null;
 		//String stmtStrPrep = null;
 		//int[] filecol2tabcolMap = null;
 		do {
 			int linecounter = 0;
-			//failoverId = 0;
-			//int loopStartedWithFailoverId = failoverId;
 			while(scan.hasNext()) {
+				boolean importthisline = true;
+
+				//select strategy
+				if(failoverStrategy==FailoverIdSelectionStrategy.RESTART) {
+					if(failoverId > 0) {
+						failoverId = 0;
+						counter = countsByFailoverId.get(failoverId);
+						setImporterProperties(prop);
+						mustSetupSQLStatement = true;
+					}
+				}
 				int loopStartedWithFailoverId = failoverId;
-				boolean importthisline = true;
-				/*if(failoverId>0) {
-					failoverId = 0;
-					counter = countsByFailoverId.get(failoverId);
-					setDefaultImporterProperties(prop);
-					mustSetupSQLStatement = true;
-				}*/
+				
 				String line = scan.next();
 				linecounter++;
-				/*if(line.endsWith(recordDelimiter)) {
-					log.info("line1["+line.length()+"]: ["+line+"]");
-					line = line.substring(0,line.length()-recordDelimiter.length()-1);
-					log.info("line2["+line.length()+"]: ["+line+"]");
-				}*/
 				
 				while(importthisline) {
 					try {
 						if(failoverId > maxFailoverId) {
 							failoverId = 0;
 						}
+						counter = countsByFailoverId.get(failoverId);
 						setImporterProperties(prop);
 						mustSetupSQLStatement = true;
 						
 						//is last failover-id?
-						if(failoverId != loopStartedWithFailoverId) {
-							/*counter = countsByFailoverId.get(failoverId);
-							String failoverKey = SQLRun.PREFIX_EXEC+execId+PREFIX_FAILOVER+failoverId;
-							List<String> foids = Utils.getKeysStartingWith(prop, failoverKey);
-							if(foids!=null && foids.size()>0) {
-								//log.info("failover["+failoverId+"]: "+failoverKey);
-								setImporterProperties(prop);
-								/if(failoverId==0) {
-									setDefaultImporterProperties(prop);
-								}
-								else {
-									setImporterProperties(prop, failoverKey);
-								}* /
-								mustSetupSQLStatement = true;
-							}
-							else {
-								log.warn("should never occur! failoverkey="+failoverKey);
-							}*/
-						}
-						else {
-							//int previousFailoverId = failoverId-1;
-							//if(previousFailoverId<0) { previousFailoverId = maxFailoverId; }
-							//long linenumber = countsByFailoverId.get(previousFailoverId).input;
+						if(failoverId == loopStartedWithFailoverId) {
 							log.warn("error processing line "+linecounter+" ["+failoverId+/*"/"+previousFailoverId+*/"/"+maxFailoverId+"]: "+e.getMessage());
-							//log.debug("error processing line",e);
-							//e.printStackTrace();
 							importthisline = false;
 							//break;
 						}
 	}
 	
 	long logCounts(Map<Integer, IOCounter> ccMap, boolean alwaysShowId) {
-		long countAll = 0;
+		long countAllIn = 0, countAllOut = 0;
 		for(Integer id: ccMap.keySet()) {
 			IOCounter cc = ccMap.get(id);
 			if(cc.input>0 || cc.output>0) {
 				log.info( ((id==0&&!alwaysShowId)?"":"[failover="+id+"] ") +"processedLines: "+cc.input+" ; importedRows: "+cc.output);
-				countAll += cc.output;
+				countAllIn += cc.input;
+				countAllOut += cc.output;
 			}
 		}
-		return countAll;
+		log.info( "[failover=ALL] "+"processedLines: "+countAllIn+" ; importedRows: "+countAllOut);
+		return countAllOut;
 	}
 	
 	void procLineInternal(String line, boolean is1stloop) throws SQLException {