Commits

Anonymous committed edf63d8

Support multiple outputs in BOF
Patch by Michael Kjellman, reviewed by brandonwilliams for
CASSANDRA-4912

Comments (0)

Files changed (2)

src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java

 
     private void checkOutputSpecs(Configuration conf)
     {
-        if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
         {
-            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
+            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
         }
     }
 

src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java

     private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
     private SSTableSimpleUnsortedWriter writer;
     private SSTableLoader loader;
-    private final File outputdir;
+    private File outputdir;
     private Progressable progress;
     private int maxFailures;
 
         this.conf = conf;
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
         maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
-        String keyspace = ConfigHelper.getOutputKeyspace(conf);
-        outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); //dir must be named by ks/cf for the loader
-        outputdir.mkdirs();
     }
 
     private String getOutputLocation() throws IOException
        }
     }
 
-    private void prepareWriter()
+    private void prepareWriter() throws IOException
     {
+        if (outputdir == null)
+        {
+            String keyspace = ConfigHelper.getOutputKeyspace(conf);
+            //dir must be named by ks/cf for the loader
+            outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
+            outputdir.mkdirs();
+        }
+        
         if (writer == null)
         {
             AbstractType<?> subcomparator = null;