Commits

pcmanus  committed 360d1a2

Validate compaction strategy options

patch by dbrosius; reviewed by slebresne for CASSANDRA-4795

  • Participants
  • Parent commits ba6cd11

Comments (0)

Files changed (10)

  * Validate correctly selects on composite partition key (CASSANDRA-5122)
  * Fix exception when adding collection (CASSANDRA-5117)
  * Handle states for non-vnode clusters correctly (CASSANDRA-5127)
- * Refuse unrecognized replication strategy options (CASSANDRA-4795)
+ * Refuse unrecognized replication and compaction strategy options (CASSANDRA-4795)
  * Pick the correct value validator in sstable2json for cql3 tables (CASSANDRA-5134)
  * Validate login for describe_keyspace, describe_keyspaces and set_keyspace
    (CASSANDRA-5144)
       since 1.2.0. However, Cassandra 1.2.0 was not complaining if CQL3 was set
       through set_cql_version but the now CQL2 only methods were used. This is
       now the case.
+    - Queries that uses unrecognized or bad compaction or replication strategy
+      options are now refused (instead of simply logging a warning).
 
 
 1.2

File src/java/org/apache/cassandra/config/CFMetaData.java

 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.*;
 
             throw new ConfigurationException("subcolumncomparators do not match or are note compatible.");
     }
 
+    public static void validateCompactionOptions(Class<? extends AbstractCompactionStrategy> strategyClass, Map<String, String> options) throws ConfigurationException
+    {
+        try
+        {
+            if (options == null)
+                return;
+
+            Method validateMethod = strategyClass.getMethod("validateOptions", Map.class);
+            Map<String, String> unknownOptions = (Map<String, String>) validateMethod.invoke(null, options);
+            if (!unknownOptions.isEmpty())
+                throw new ConfigurationException(String.format("Properties specified %s are not understood by %s", unknownOptions.keySet(), strategyClass.getSimpleName()));
+        }
+        catch (NoSuchMethodException e)
+        {
+            logger.warn("Compaction Strategy {} does not have a static validateOptions method. Validation ignored", strategyClass.getName());
+        }
+        catch (InvocationTargetException e)
+        {
+            if (e.getTargetException() instanceof ConfigurationException)
+                throw (ConfigurationException) e.getTargetException();
+            throw new ConfigurationException("Failed to validate compaction options");
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Failed to validate compaction options");
+        }
+    }
+
     public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException
     {
         className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
-        return FBUtilities.classForName(className, "compaction strategy");
+        Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
+        if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
+            throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className));
+
+        return strategyClass;
     }
 
     public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)

File src/java/org/apache/cassandra/cql/CFPropDefs.java

     public final Map<String, String> compactionStrategyOptions = new HashMap<String, String>();
     public final Map<String, String> compressionParameters = new HashMap<String, String>();
 
-    public void validate() throws InvalidRequestException
+    public void validate() throws InvalidRequestException, ConfigurationException
     {
         compactionStrategyClass = CFMetaData.DEFAULT_COMPACTION_STRATEGY_CLASS;
 
                         KW_MINCOMPACTIONTHRESHOLD,
                         CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD));
         }
+
+        CFMetaData.validateCompactionOptions(compactionStrategyClass, compactionStrategyOptions);
     }
 
     /** Map a keyword to the corresponding value */

File src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java

     /** Perform validation of parsed params */
     private void validate(List<ByteBuffer> variables) throws InvalidRequestException
     {
-        cfProps.validate();
-
         // Ensure that exactly one key has been specified.
         if (keyValidator.size() < 1)
             throw new InvalidRequestException("You must specify a PRIMARY KEY");
 
         try
         {
+            cfProps.validate();
             comparator = cfProps.getComparator();
         }
         catch (ConfigurationException e)

File src/java/org/apache/cassandra/cql3/CFPropDefs.java

 
             compactionStrategyClass = CFMetaData.createCompactionStrategy(strategy);
             compactionOptions.remove(COMPACTION_STRATEGY_CLASS_KEY);
+
+            CFMetaData.validateCompactionOptions(compactionStrategyClass, compactionOptions);
         }
     }
 
+    public Class<? extends AbstractCompactionStrategy> getCompactionStrategy()
+    {
+        return compactionStrategyClass;
+    }
+
     public Map<String, String> getCompactionOptions() throws SyntaxException
     {
         Map<String, String> compactionOptions = getMap(KW_COMPACTION);

File src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java

 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTableReader;
 
+import com.google.common.collect.Sets;
+
 /**
  * Pluggable compaction strategy determines how SSTables get merged.
  *
     public final Map<String, String> options;
 
     protected final ColumnFamilyStore cfs;
-    protected final float tombstoneThreshold;
+    protected float tombstoneThreshold;
     protected long tombstoneCompactionInterval;
 
     protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
         this.cfs = cfs;
         this.options = options;
 
-        String optionValue = options.get(TOMBSTONE_THRESHOLD_OPTION);
-        tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue);
-        optionValue = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
-        tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
-        if (tombstoneCompactionInterval < 0)
+        /* checks must be repeated here, as user supplied strategies might not call validateOptions directly */
+
+        try
+        {
+            validateOptions(options);
+            String optionValue = options.get(TOMBSTONE_THRESHOLD_OPTION);
+            tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue);
+            optionValue = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
+            tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
+        }
+        catch (ConfigurationException e)
         {
-            logger.warn("tombstone_compaction_interval should not be negative({}). Using default value of {}.",
-                        tombstoneCompactionInterval, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL);
+            logger.warn("Error setting compaction strategy options ({}), defaults will be used", e.getMessage());
+            tombstoneThreshold = DEFAULT_TOMBSTONE_THRESHOLD;
             tombstoneCompactionInterval = DEFAULT_TOMBSTONE_COMPACTION_INTERVAL;
         }
     }
             return remainingColumnsRatio * droppableRatio > tombstoneThreshold;
         }
     }
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        String threshold = options.get(TOMBSTONE_THRESHOLD_OPTION);
+        if (threshold != null)
+        {
+            try
+            {
+                float thresholdValue = Float.parseFloat(threshold);
+                if (thresholdValue < 0)
+                {
+                    throw new ConfigurationException(String.format("%s must be greater than 0, but was %d", TOMBSTONE_THRESHOLD_OPTION, thresholdValue));
+                }
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", threshold, TOMBSTONE_THRESHOLD_OPTION), e);
+            }
+        }
+
+        String interval = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
+        if (interval != null)
+        {
+            try
+            {
+                long tombstoneCompactionInterval = Long.parseLong(interval);
+                if (tombstoneCompactionInterval < 0)
+                {
+                    throw new ConfigurationException(String.format("%s must be greater than 0, but was %d", TOMBSTONE_COMPACTION_INTERVAL_OPTION, tombstoneCompactionInterval));
+                }
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", interval, TOMBSTONE_COMPACTION_INTERVAL_OPTION), e);
+            }
+        }
+
+        Map<String, String> uncheckedOptions = new HashMap<String, String>(options);
+        uncheckedOptions.remove(TOMBSTONE_THRESHOLD_OPTION);
+        uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
+        return uncheckedOptions;
+    }
 }

File src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java

 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
         int configuredMaxSSTableSize = 5;
         if (options != null)
         {
-            String value = options.containsKey(SSTABLE_SIZE_OPTION) ? options.get(SSTABLE_SIZE_OPTION) : null;
-            if (value != null)
-            {
-                try
-                {
-                    configuredMaxSSTableSize = Integer.parseInt(value);
-                }
-                catch (NumberFormatException ex)
-                {
-                    logger.warn(String.format("%s is not a parsable int (base10) for %s using default value",
-                                              value, SSTABLE_SIZE_OPTION));
-                }
-            }
+            String value = options.containsKey(SSTABLE_SIZE_OPTION) ? options.get(SSTABLE_SIZE_OPTION) : "5";
+            configuredMaxSSTableSize = Integer.parseInt(value);
         }
         maxSSTableSizeInMB = configuredMaxSSTableSize;
 
         }
         return null;
     }
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+
+        String size = options.containsKey(SSTABLE_SIZE_OPTION) ? options.get(SSTABLE_SIZE_OPTION) : "1";
+        try
+        {
+            int ssSize = Integer.parseInt(size);
+            if (ssSize < 1)
+            {
+                throw new ConfigurationException(String.format("%s must be larger than 0, but was %s", SSTABLE_SIZE_OPTION, ssSize));
+            }
+        }
+        catch (NumberFormatException ex)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", size, SSTABLE_SIZE_OPTION), ex);
+        }
+
+        uncheckedOptions.remove(SSTABLE_SIZE_OPTION);
+
+        return uncheckedOptions;
+    }
 }

File src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 
         bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
         optionValue = options.get(BUCKET_HIGH_KEY);
         bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
-        if (bucketHigh <= bucketLow)
-        {
-            logger.warn("Bucket low/high marks for {} incorrect, using defaults.", cfs.getColumnFamilyName());
-            bucketLow = DEFAULT_BUCKET_LOW;
-            bucketHigh = DEFAULT_BUCKET_HIGH;
-        }
+
         cfs.setCompactionThresholds(cfs.metadata.getMinCompactionThreshold(), cfs.metadata.getMaxCompactionThreshold());
     }
 
         return Long.MAX_VALUE;
     }
 
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+
+        String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
+        try
+        {
+            long minSSTableSize = optionValue == null ? DEFAULT_MIN_SSTABLE_SIZE : Long.parseLong(optionValue);
+            if (minSSTableSize < 0)
+            {
+                throw new ConfigurationException(String.format("%s must be non negative: %d", MIN_SSTABLE_SIZE_KEY, minSSTableSize));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, MIN_SSTABLE_SIZE_KEY), e);
+        }
+
+        double bucketLow, bucketHigh;
+        optionValue = options.get(BUCKET_LOW_KEY);
+        try
+        {
+            bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_LOW), e);
+        }
+
+        optionValue = options.get(BUCKET_HIGH_KEY);
+        try
+        {
+            bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_HIGH), e);
+        }
+
+        if (bucketHigh <= bucketLow)
+        {
+            throw new ConfigurationException(String.format("BucketHigh value (%s) is less than or equal BucketLow value (%s)", bucketHigh, bucketLow));
+        }
+
+        uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
+        uncheckedOptions.remove(BUCKET_LOW_KEY);
+        uncheckedOptions.remove(BUCKET_HIGH_KEY);
+        uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD);
+        uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD);
+
+        return uncheckedOptions;
+    }
+
     public String toString()
     {
         return String.format("SizeTieredCompactionStrategy[%s/%s]",

File src/java/org/apache/cassandra/thrift/CassandraServer.java

             cState.hasKeyspaceAccess(keyspace, Permission.CREATE);
             cf_def.unsetId(); // explicitly ignore any id set by client (Hector likes to set zero)
             CFMetaData cfm = CFMetaData.fromThrift(cf_def);
+            CFMetaData.validateCompactionOptions(cfm.compactionStrategyClass, cfm.compactionStrategyOptions);
+
             cfm.addDefaultIndexNames();
             MigrationManager.announceNewColumnFamily(cfm);
             return Schema.instance.getVersion().toString();
 
             CFMetaData.applyImplicitDefaults(cf_def);
             CFMetaData cfm = CFMetaData.fromThrift(cf_def);
+            CFMetaData.validateCompactionOptions(cfm.compactionStrategyClass, cfm.compactionStrategyOptions);
             cfm.addDefaultIndexNames();
             MigrationManager.announceColumnFamilyUpdate(cfm);
             return Schema.instance.getVersion().toString();