1. mirror
  2. Cassandra

Commits

Vijay Parthasarathy  committed c25a6a1

Speculative execution for Reads
Patch by Vijay, reviewed by jbellis for CASSANDRA-4705

  • Participants
  • Parent commits 34f0852
  • Branches trunk

Comments (0)

Files changed (19)

File interface/cassandra.thrift

View file
     38: optional i32 memtable_flush_period_in_ms,
     39: optional i32 default_time_to_live,
     40: optional i32 index_interval,
+    41: optional string speculative_retry="NONE",
 
     /* All of the following are now ignored and unsupplied. */
 

File interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java

View file
   private static final org.apache.thrift.protocol.TField MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("memtable_flush_period_in_ms", org.apache.thrift.protocol.TType.I32, (short)38);
   private static final org.apache.thrift.protocol.TField DEFAULT_TIME_TO_LIVE_FIELD_DESC = new org.apache.thrift.protocol.TField("default_time_to_live", org.apache.thrift.protocol.TType.I32, (short)39);
   private static final org.apache.thrift.protocol.TField INDEX_INTERVAL_FIELD_DESC = new org.apache.thrift.protocol.TField("index_interval", org.apache.thrift.protocol.TType.I32, (short)40);
+  private static final org.apache.thrift.protocol.TField SPECULATIVE_RETRY_FIELD_DESC = new org.apache.thrift.protocol.TField("speculative_retry", org.apache.thrift.protocol.TType.STRING, (short)41);
   private static final org.apache.thrift.protocol.TField ROW_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)9);
   private static final org.apache.thrift.protocol.TField KEY_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("key_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)11);
   private static final org.apache.thrift.protocol.TField ROW_CACHE_SAVE_PERIOD_IN_SECONDS_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_save_period_in_seconds", org.apache.thrift.protocol.TType.I32, (short)19);
   public int memtable_flush_period_in_ms; // optional
   public int default_time_to_live; // optional
   public int index_interval; // optional
+  public String speculative_retry; // optional
   /**
    * @deprecated
    */
     MEMTABLE_FLUSH_PERIOD_IN_MS((short)38, "memtable_flush_period_in_ms"),
     DEFAULT_TIME_TO_LIVE((short)39, "default_time_to_live"),
     INDEX_INTERVAL((short)40, "index_interval"),
+    SPECULATIVE_RETRY((short)41, "speculative_retry"),
     /**
      * @deprecated
      */
           return DEFAULT_TIME_TO_LIVE;
         case 40: // INDEX_INTERVAL
           return INDEX_INTERVAL;
+        case 41: // SPECULATIVE_RETRY
+          return SPECULATIVE_RETRY;
         case 9: // ROW_CACHE_SIZE
           return ROW_CACHE_SIZE;
         case 11: // KEY_CACHE_SIZE
   private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 18;
   private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 19;
   private int __isset_bitfield = 0;
-  private _Fields optionals[] = {_Fields.COLUMN_TYPE,_Fields.COMPARATOR_TYPE,_Fields.SUBCOMPARATOR_TYPE,_Fields.COMMENT,_Fields.READ_REPAIR_CHANCE,_Fields.COLUMN_METADATA,_Fields.GC_GRACE_SECONDS,_Fields.DEFAULT_VALIDATION_CLASS,_Fields.ID,_Fields.MIN_COMPACTION_THRESHOLD,_Fields.MAX_COMPACTION_THRESHOLD,_Fields.REPLICATE_ON_WRITE,_Fields.KEY_VALIDATION_CLASS,_Fields.KEY_ALIAS,_Fields.COMPACTION_STRATEGY,_Fields.COMPACTION_STRATEGY_OPTIONS,_Fields.COMPRESSION_OPTIONS,_Fields.BLOOM_FILTER_FP_CHANCE,_Fields.CACHING,_Fields.DCLOCAL_READ_REPAIR_CHANCE,_Fields.MEMTABLE_FLUSH_PERIOD_IN_MS,_Fields.DEFAULT_TIME_TO_LIVE,_Fields.INDEX_INTERVAL,_Fields.ROW_CACHE_SIZE,_Fields.KEY_CACHE_SIZE,_Fields.ROW_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.KEY_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.MEMTABLE_FLUSH_AFTER_MINS,_Fields.MEMTABLE_THROUGHPUT_IN_MB,_Fields.MEMTABLE_OPERATIONS_IN_MILLIONS,_Fields.MERGE_SHARDS_CHANCE,_Fields.ROW_CACHE_PROVIDER,_Fields.ROW_CACHE_KEYS_TO_SAVE};
+  private _Fields optionals[] = {_Fields.COLUMN_TYPE,_Fields.COMPARATOR_TYPE,_Fields.SUBCOMPARATOR_TYPE,_Fields.COMMENT,_Fields.READ_REPAIR_CHANCE,_Fields.COLUMN_METADATA,_Fields.GC_GRACE_SECONDS,_Fields.DEFAULT_VALIDATION_CLASS,_Fields.ID,_Fields.MIN_COMPACTION_THRESHOLD,_Fields.MAX_COMPACTION_THRESHOLD,_Fields.REPLICATE_ON_WRITE,_Fields.KEY_VALIDATION_CLASS,_Fields.KEY_ALIAS,_Fields.COMPACTION_STRATEGY,_Fields.COMPACTION_STRATEGY_OPTIONS,_Fields.COMPRESSION_OPTIONS,_Fields.BLOOM_FILTER_FP_CHANCE,_Fields.CACHING,_Fields.DCLOCAL_READ_REPAIR_CHANCE,_Fields.MEMTABLE_FLUSH_PERIOD_IN_MS,_Fields.DEFAULT_TIME_TO_LIVE,_Fields.INDEX_INTERVAL,_Fields.SPECULATIVE_RETRY,_Fields.ROW_CACHE_SIZE,_Fields.KEY_CACHE_SIZE,_Fields.ROW_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.KEY_CACHE_SAVE_PERIOD_IN_SECONDS,_Fields.MEMTABLE_FLUSH_AFTER_MINS,_Fields.MEMTABLE_THROUGHPUT_IN_MB,_Fields.MEMTABLE_OPERATIONS_IN_MILLIONS,_Fields.MERGE_SHARDS_CHANCE,_Fields.ROW_CACHE_PROVIDER,_Fields.ROW_CACHE_KEYS_TO_SAVE};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.INDEX_INTERVAL, new org.apache.thrift.meta_data.FieldMetaData("index_interval", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.SPECULATIVE_RETRY, new org.apache.thrift.meta_data.FieldMetaData("speculative_retry", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.ROW_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
     tmpMap.put(_Fields.KEY_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("key_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
 
     this.dclocal_read_repair_chance = 0;
 
+    this.speculative_retry = "NONE";
+
   }
 
   public CfDef(
     this.memtable_flush_period_in_ms = other.memtable_flush_period_in_ms;
     this.default_time_to_live = other.default_time_to_live;
     this.index_interval = other.index_interval;
+    if (other.isSetSpeculative_retry()) {
+      this.speculative_retry = other.speculative_retry;
+    }
     this.row_cache_size = other.row_cache_size;
     this.key_cache_size = other.key_cache_size;
     this.row_cache_save_period_in_seconds = other.row_cache_save_period_in_seconds;
     this.default_time_to_live = 0;
     setIndex_intervalIsSet(false);
     this.index_interval = 0;
+    this.speculative_retry = "NONE";
+
     setRow_cache_sizeIsSet(false);
     this.row_cache_size = 0.0;
     setKey_cache_sizeIsSet(false);
     __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __INDEX_INTERVAL_ISSET_ID, value);
   }
 
+  /** Returns the speculative_retry setting (e.g. "NONE"); may be null when unset. */
+  public String getSpeculative_retry() {
+    return this.speculative_retry;
+  }
+
+  /** Sets speculative_retry and returns this CfDef to allow call chaining. */
+  public CfDef setSpeculative_retry(String speculative_retry) {
+    this.speculative_retry = speculative_retry;
+    return this;
+  }
+
+  /** Clears speculative_retry; afterwards isSetSpeculative_retry() returns false. */
+  public void unsetSpeculative_retry() {
+    this.speculative_retry = null;
+  }
+
+  /** Returns true if field speculative_retry is set (non-null, i.e. has been assigned a value) and false otherwise */
+  public boolean isSetSpeculative_retry() {
+    return this.speculative_retry != null;
+  }
+
+  /**
+   * Thrift-style isSet toggle: for object-typed fields "set" is tracked by nullness,
+   * so only value == false has an effect (it nulls the field); true is a no-op.
+   */
+  public void setSpeculative_retryIsSet(boolean value) {
+    if (!value) {
+      this.speculative_retry = null;
+    }
+  }
+
   /**
    * @deprecated
    */
       }
       break;
 
+    case SPECULATIVE_RETRY:
+      if (value == null) {
+        unsetSpeculative_retry();
+      } else {
+        setSpeculative_retry((String)value);
+      }
+      break;
+
     case ROW_CACHE_SIZE:
       if (value == null) {
         unsetRow_cache_size();
     case INDEX_INTERVAL:
       return Integer.valueOf(getIndex_interval());
 
+    case SPECULATIVE_RETRY:
+      return getSpeculative_retry();
+
     case ROW_CACHE_SIZE:
       return Double.valueOf(getRow_cache_size());
 
       return isSetDefault_time_to_live();
     case INDEX_INTERVAL:
       return isSetIndex_interval();
+    case SPECULATIVE_RETRY:
+      return isSetSpeculative_retry();
     case ROW_CACHE_SIZE:
       return isSetRow_cache_size();
     case KEY_CACHE_SIZE:
         return false;
     }
 
+    boolean this_present_speculative_retry = true && this.isSetSpeculative_retry();
+    boolean that_present_speculative_retry = true && that.isSetSpeculative_retry();
+    if (this_present_speculative_retry || that_present_speculative_retry) {
+      if (!(this_present_speculative_retry && that_present_speculative_retry))
+        return false;
+      if (!this.speculative_retry.equals(that.speculative_retry))
+        return false;
+    }
+
     boolean this_present_row_cache_size = true && this.isSetRow_cache_size();
     boolean that_present_row_cache_size = true && that.isSetRow_cache_size();
     if (this_present_row_cache_size || that_present_row_cache_size) {
     if (present_row_cache_keys_to_save)
       builder.append(row_cache_keys_to_save);
 
+    boolean present_speculative_retry = true && (isSetSpeculative_retry());
+    builder.append(present_speculative_retry);
+    if (present_speculative_retry)
+      builder.append(speculative_retry);
+
     return builder.toHashCode();
   }
 
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetSpeculative_retry()).compareTo(typedOther.isSetSpeculative_retry());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetSpeculative_retry()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.speculative_retry, typedOther.speculative_retry);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = Boolean.valueOf(isSetRow_cache_size()).compareTo(typedOther.isSetRow_cache_size());
     if (lastComparison != 0) {
       return lastComparison;
       sb.append(this.index_interval);
       first = false;
     }
+    if (isSetSpeculative_retry()) {
+      if (!first) sb.append(", ");
+      sb.append("speculative_retry:");
+      if (this.speculative_retry == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.speculative_retry);
+      }
+      first = false;
+    }
     if (isSetRow_cache_size()) {
       if (!first) sb.append(", ");
       sb.append("row_cache_size:");
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 41: // SPECULATIVE_RETRY
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.speculative_retry = iprot.readString();
+              struct.setSpeculative_retryIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           case 9: // ROW_CACHE_SIZE
             if (schemeField.type == org.apache.thrift.protocol.TType.DOUBLE) {
               struct.row_cache_size = iprot.readDouble();
         oprot.writeI32(struct.index_interval);
         oprot.writeFieldEnd();
       }
+      if (struct.speculative_retry != null) {
+        if (struct.isSetSpeculative_retry()) {
+          oprot.writeFieldBegin(SPECULATIVE_RETRY_FIELD_DESC);
+          oprot.writeString(struct.speculative_retry);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
       if (struct.isSetIndex_interval()) {
         optionals.set(22);
       }
-      if (struct.isSetRow_cache_size()) {
+      if (struct.isSetSpeculative_retry()) {
         optionals.set(23);
       }
-      if (struct.isSetKey_cache_size()) {
+      if (struct.isSetRow_cache_size()) {
         optionals.set(24);
       }
-      if (struct.isSetRow_cache_save_period_in_seconds()) {
+      if (struct.isSetKey_cache_size()) {
         optionals.set(25);
       }
-      if (struct.isSetKey_cache_save_period_in_seconds()) {
+      if (struct.isSetRow_cache_save_period_in_seconds()) {
         optionals.set(26);
       }
-      if (struct.isSetMemtable_flush_after_mins()) {
+      if (struct.isSetKey_cache_save_period_in_seconds()) {
         optionals.set(27);
       }
-      if (struct.isSetMemtable_throughput_in_mb()) {
+      if (struct.isSetMemtable_flush_after_mins()) {
         optionals.set(28);
       }
-      if (struct.isSetMemtable_operations_in_millions()) {
+      if (struct.isSetMemtable_throughput_in_mb()) {
         optionals.set(29);
       }
-      if (struct.isSetMerge_shards_chance()) {
+      if (struct.isSetMemtable_operations_in_millions()) {
         optionals.set(30);
       }
-      if (struct.isSetRow_cache_provider()) {
+      if (struct.isSetMerge_shards_chance()) {
         optionals.set(31);
       }
-      if (struct.isSetRow_cache_keys_to_save()) {
+      if (struct.isSetRow_cache_provider()) {
         optionals.set(32);
       }
-      oprot.writeBitSet(optionals, 33);
+      if (struct.isSetRow_cache_keys_to_save()) {
+        optionals.set(33);
+      }
+      oprot.writeBitSet(optionals, 34);
       if (struct.isSetColumn_type()) {
         oprot.writeString(struct.column_type);
       }
       if (struct.isSetIndex_interval()) {
         oprot.writeI32(struct.index_interval);
       }
+      if (struct.isSetSpeculative_retry()) {
+        oprot.writeString(struct.speculative_retry);
+      }
       if (struct.isSetRow_cache_size()) {
         oprot.writeDouble(struct.row_cache_size);
       }
       struct.setKeyspaceIsSet(true);
       struct.name = iprot.readString();
       struct.setNameIsSet(true);
-      BitSet incoming = iprot.readBitSet(33);
+      BitSet incoming = iprot.readBitSet(34);
       if (incoming.get(0)) {
         struct.column_type = iprot.readString();
         struct.setColumn_typeIsSet(true);
         struct.setIndex_intervalIsSet(true);
       }
       if (incoming.get(23)) {
+        struct.speculative_retry = iprot.readString();
+        struct.setSpeculative_retryIsSet(true);
+      }
+      if (incoming.get(24)) {
         struct.row_cache_size = iprot.readDouble();
         struct.setRow_cache_sizeIsSet(true);
       }
-      if (incoming.get(24)) {
+      if (incoming.get(25)) {
         struct.key_cache_size = iprot.readDouble();
         struct.setKey_cache_sizeIsSet(true);
       }
-      if (incoming.get(25)) {
+      if (incoming.get(26)) {
         struct.row_cache_save_period_in_seconds = iprot.readI32();
         struct.setRow_cache_save_period_in_secondsIsSet(true);
       }
-      if (incoming.get(26)) {
+      if (incoming.get(27)) {
         struct.key_cache_save_period_in_seconds = iprot.readI32();
         struct.setKey_cache_save_period_in_secondsIsSet(true);
       }
-      if (incoming.get(27)) {
+      if (incoming.get(28)) {
         struct.memtable_flush_after_mins = iprot.readI32();
         struct.setMemtable_flush_after_minsIsSet(true);
       }
-      if (incoming.get(28)) {
+      if (incoming.get(29)) {
         struct.memtable_throughput_in_mb = iprot.readI32();
         struct.setMemtable_throughput_in_mbIsSet(true);
       }
-      if (incoming.get(29)) {
+      if (incoming.get(30)) {
         struct.memtable_operations_in_millions = iprot.readDouble();
         struct.setMemtable_operations_in_millionsIsSet(true);
       }
-      if (incoming.get(30)) {
+      if (incoming.get(31)) {
         struct.merge_shards_chance = iprot.readDouble();
         struct.setMerge_shards_chanceIsSet(true);
       }
-      if (incoming.get(31)) {
+      if (incoming.get(32)) {
         struct.row_cache_provider = iprot.readString();
         struct.setRow_cache_providerIsSet(true);
       }
-      if (incoming.get(32)) {
+      if (incoming.get(33)) {
         struct.row_cache_keys_to_save = iprot.readI32();
         struct.setRow_cache_keys_to_saveIsSet(true);
       }

File src/java/org/apache/cassandra/cli/CliClient.java

View file
         INDEX_INTERVAL,
         MEMTABLE_FLUSH_PERIOD_IN_MS,
         CACHING,
-        DEFAULT_TIME_TO_LIVE
+        DEFAULT_TIME_TO_LIVE,
+        SPECULATIVE_RETRY
     }
 
     private static final String DEFAULT_PLACEMENT_STRATEGY = "org.apache.cassandra.locator.NetworkTopologyStrategy";
             case INDEX_INTERVAL:
                 cfDef.setIndex_interval(Integer.parseInt(mValue));
                 break;
+            case SPECULATIVE_RETRY:
+                cfDef.setSpeculative_retry(CliUtils.unescapeSQLString(mValue));
+                break;
             default:
                 //must match one of the above or we'd throw an exception at the valueOf statement above.
                 assert(false);
         writeAttr(output, false, "compaction_strategy", cfDef.compaction_strategy);
         writeAttr(output, false, "caching", cfDef.caching);
         writeAttr(output, false, "default_time_to_live", cfDef.default_time_to_live);
+        writeAttr(output, false, "speculative_retry", cfDef.speculative_retry);
 
         if (cfDef.isSetBloom_filter_fp_chance())
             writeAttr(output, false, "bloom_filter_fp_chance", cfDef.bloom_filter_fp_chance);
         sessionState.out.printf("      Default time to live: %s%n", cf_def.default_time_to_live);
         sessionState.out.printf("      Bloom Filter FP chance: %s%n", cf_def.isSetBloom_filter_fp_chance() ? cf_def.bloom_filter_fp_chance : "default");
         sessionState.out.printf("      Index interval: %s%n", cf_def.isSetIndex_interval() ? cf_def.index_interval : "default");
+        sessionState.out.printf("      Speculative Retry: %s%n", cf_def.speculative_retry);
 
         // if we have connection to the cfMBean established
         if (cfMBean != null)

File src/java/org/apache/cassandra/config/CFMetaData.java

View file
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.ArrayUtils;
     public final static ByteBuffer DEFAULT_KEY_NAME = ByteBufferUtil.bytes("KEY");
     public final static Caching DEFAULT_CACHING_STRATEGY = Caching.KEYS_ONLY;
     public final static int DEFAULT_DEFAULT_TIME_TO_LIVE = 0;
+    public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.NONE, 0);
     public final static int DEFAULT_INDEX_INTERVAL = 128;
 
     // Note that this is the default only for user created tables
                                                                        + "compaction_strategy_options text,"
                                                                        + "default_read_consistency text,"
                                                                        + "default_write_consistency text,"
+                                                                       + "speculative_retry text,"
                                                                        + "PRIMARY KEY (keyspace_name, columnfamily_name)"
                                                                        + ") WITH COMMENT='ColumnFamily definitions' AND gc_grace_seconds=8640");
     public static final CFMetaData SchemaColumnsCf = compile(10, "CREATE TABLE " + SystemTable.SCHEMA_COLUMNS_CF + "("
         }
     }
 
+    /**
+     * Parsed representation of a table's speculative_retry option. Accepted forms
+     * (case-insensitive): "NONE", "ALWAYS", "<n>PERCENTILE" (0 <= n <= 100) and
+     * "<n>MS" (custom latency threshold in milliseconds). Immutable.
+     */
+    public static class SpeculativeRetry
+    {
+        public enum RetryType
+        {
+            NONE, CUSTOM, PERCENTILE, ALWAYS;
+        }
+
+        public final RetryType type;
+        // PERCENTILE: 0-100; CUSTOM: threshold in ms; unused (always 0) for NONE/ALWAYS.
+        public final long value;
+
+        private SpeculativeRetry(RetryType type, long value)
+        {
+            this.type = type;
+            this.value = value;
+        }
+
+        /**
+         * Parses one of the forms documented on the class.
+         *
+         * @throws ConfigurationException if the string matches none of the accepted forms,
+         *         or a PERCENTILE value is outside [0, 100]
+         */
+        public static SpeculativeRetry fromString(String retry) throws ConfigurationException
+        {
+            String name = retry.toUpperCase();
+            try
+            {
+                if (name.endsWith(RetryType.PERCENTILE.toString()))
+                {
+                    // strip the "PERCENTILE" suffix (avoid the magic length 10)
+                    long value = Long.parseLong(name.substring(0, name.length() - RetryType.PERCENTILE.toString().length()));
+                    if (value > 100 || value < 0)
+                        throw new ConfigurationException("PERCENTILE should be between 0 and 100");
+                    return new SpeculativeRetry(RetryType.PERCENTILE, value);
+                }
+                else if (name.endsWith("MS"))
+                {
+                    long value = Long.parseLong(name.substring(0, name.length() - 2));
+                    return new SpeculativeRetry(RetryType.CUSTOM, value);
+                }
+                else
+                {
+                    return new SpeculativeRetry(RetryType.valueOf(name), 0);
+                }
+            }
+            catch (IllegalArgumentException e)
+            {
+                // NumberFormatException or an unknown enum name: fall through to the
+                // ConfigurationException below so the caller sees one uniform error.
+            }
+            throw new ConfigurationException("invalid speculative_retry type: " + retry);
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (! (obj instanceof SpeculativeRetry))
+                return false;
+            SpeculativeRetry rhs = (SpeculativeRetry) obj;
+            // enum identity compare and primitive compare; no boxing needed
+            return type == rhs.type && value == rhs.value;
+        }
+
+        // equals() is overridden, so hashCode() must be too: CFMetaData appends this
+        // object to its HashCodeBuilder, and equal retries must hash equally.
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(type, value);
+        }
+
+        @Override
+        public String toString()
+        {
+            switch (type)
+            {
+            case PERCENTILE:
+                return value + "PERCENTILE";
+            case CUSTOM:
+                return value + "MS";
+            default:
+                return type.toString();
+            }
+        }
+    }
+
     //REQUIRED
     public final UUID cfId;                           // internal id, never exposed to user
     public final String ksName;                       // name of keyspace
     private volatile int indexInterval = DEFAULT_INDEX_INTERVAL;
     private int memtableFlushPeriod = 0;
     private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
+    private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
 
     volatile Map<ByteBuffer, ColumnDefinition> column_metadata = new HashMap<ByteBuffer,ColumnDefinition>();
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
     public CFMetaData indexInterval(int prop) {indexInterval = prop; return this;}
     public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
     public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
+    public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
 
     public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
     {
                              .dcLocalReadRepairChance(0.0)
                              .gcGraceSeconds(0)
                              .caching(indexCaching)
+                             .speculativeRetry(parent.speculativeRetry)
                              .compactionStrategyClass(parent.compactionStrategyClass)
                              .compactionStrategyOptions(parent.compactionStrategyOptions)
                              .reloadSecondaryIndexMetadata(parent);
                       .caching(oldCFMD.caching)
                       .defaultTimeToLive(oldCFMD.defaultTimeToLive)
                       .indexInterval(oldCFMD.indexInterval)
+                      .speculativeRetry(oldCFMD.speculativeRetry)
                       .memtableFlushPeriod(oldCFMD.memtableFlushPeriod);
     }
 
         return indexInterval;
     }
 
+    public SpeculativeRetry getSpeculativeRetry()
+    {
+        return speculativeRetry;
+    }
+
     public int getMemtableFlushPeriod()
     {
         return memtableFlushPeriod;
             .append(caching, rhs.caching)
             .append(defaultTimeToLive, rhs.defaultTimeToLive)
             .append(indexInterval, rhs.indexInterval)
+            .append(speculativeRetry, rhs.speculativeRetry)
             .isEquals();
     }
 
             .append(caching)
             .append(defaultTimeToLive)
             .append(indexInterval)
+            .append(speculativeRetry)
             .toHashCode();
     }
 
                 newCFMD.dcLocalReadRepairChance(cf_def.dclocal_read_repair_chance);
             if (cf_def.isSetIndex_interval())
                 newCFMD.indexInterval(cf_def.index_interval);
+            if (cf_def.isSetSpeculative_retry())
+                newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
 
             CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
 
         memtableFlushPeriod = cfm.memtableFlushPeriod;
         caching = cfm.caching;
         defaultTimeToLive = cfm.defaultTimeToLive;
+        speculativeRetry = cfm.speculativeRetry;
 
         MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata);
         // columns that are no longer needed
         def.setMemtable_flush_period_in_ms(memtableFlushPeriod);
         def.setCaching(caching.toString());
         def.setDefault_time_to_live(defaultTimeToLive);
+        def.setSpeculative_retry(speculativeRetry.toString());
         return def;
     }
 
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "bloom_filter_fp_chance"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "caching"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "default_time_to_live"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "speculative_retry"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "compaction_strategy_class"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "compression_parameters"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "value_alias"));
         cf.addColumn(Column.create(json(aliasesAsStrings(columnAliases)), timestamp, cfName, "column_aliases"));
         cf.addColumn(Column.create(json(compactionStrategyOptions), timestamp, cfName, "compaction_strategy_options"));
         cf.addColumn(Column.create(indexInterval, timestamp, cfName, "index_interval"));
+        cf.addColumn(Column.create(speculativeRetry.toString(), timestamp, cfName, "speculative_retry"));
     }
 
     // Package protected for use by tests
             cfm.caching(Caching.valueOf(result.getString("caching")));
             if (result.has("default_time_to_live"))
                 cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
+            if (result.has("speculative_retry"))
+                cfm.speculativeRetry(SpeculativeRetry.fromString(result.getString("speculative_retry")));
             cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class")));
             cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
             cfm.columnAliases(aliasesFromStrings(fromJsonList(result.getString("column_aliases"))));
             .append("memtable_flush_period_in_ms", memtableFlushPeriod)
             .append("caching", caching)
             .append("defaultTimeToLive", defaultTimeToLive)
+            .append("speculative_retry", speculativeRetry)
             .append("indexInterval", indexInterval)
             .toString();
     }

File src/java/org/apache/cassandra/cql/AlterTableStatement.java

View file
         cfm.maxCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold()));
         cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
         cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
+        cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
         cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
         cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
 

File src/java/org/apache/cassandra/cql/CFPropDefs.java

View file
     public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
     public static final String KW_CACHING = "caching";
     public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
+    public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
     public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
     public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
 
         keywords.add(KW_COMPACTION_STRATEGY_CLASS);
         keywords.add(KW_CACHING);
         keywords.add(KW_DEFAULT_TIME_TO_LIVE);
+        keywords.add(KW_SPECULATIVE_RETRY);
         keywords.add(KW_BF_FP_CHANCE);
         keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
 

File src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java

View file
                    .compactionStrategyOptions(cfProps.compactionStrategyOptions)
                    .compressionParameters(CompressionParameters.create(cfProps.compressionParameters))
                    .caching(CFMetaData.Caching.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString())))
+                   .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
                    .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
                    .memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
                    .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE));

File src/java/org/apache/cassandra/cql3/CFPropDefs.java

View file
     public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_CACHING = "caching";
     public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
+    public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
     public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
     public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
 
         keywords.add(KW_REPLICATEONWRITE);
         keywords.add(KW_CACHING);
         keywords.add(KW_DEFAULT_TIME_TO_LIVE);
+        keywords.add(KW_SPECULATIVE_RETRY);
         keywords.add(KW_BF_FP_CHANCE);
         keywords.add(KW_COMPACTION);
         keywords.add(KW_COMPRESSION);
         cfm.maxCompactionThreshold(toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold()));
         cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString())));
         cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
+        cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
         cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
 
         if (compactionStrategyClass != null)

File src/java/org/apache/cassandra/db/ColumnFamilyStore.java

View file
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
     public final ColumnFamilyMetrics metric;
+    public volatile long sampleLatency = Long.MAX_VALUE;
 
     public void reload()
     {
         {
             throw new RuntimeException(e);
         }
+        StorageService.optionalTasks.scheduleWithFixedDelay(new Runnable()
+        {
+            public void run()
+            {
+                SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry();
+                switch (retryPolicy.type)
+                {
+                    case PERCENTILE:
+                        double percentile = retryPolicy.value / 100d;
+                        // get the percentile and convert it to ms instead of dealing with micros
+                        sampleLatency = (long) (metric.readLatency.latency.getSnapshot().getValue(percentile) / 1000);
+                        break;
+                    case CUSTOM:
+                        sampleLatency = retryPolicy.value;
+                        break;
+                    default:
+                        sampleLatency = Long.MAX_VALUE;
+                        break;
+                }
+            }
+        }, 30, 30, TimeUnit.SECONDS);
     }
 
     /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */

File src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java

View file
 
     private final MetricNameFactory factory;
 
+    public final Counter speculativeRetry;
+
     // for backward compatibility
     @Deprecated public final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35);
     @Deprecated public final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35);
                 return total;
             }
         });
+        speculativeRetry = Metrics.newCounter(factory.createMetricName("SpeculativeRetry"));
     }
 
     public void updateSSTableIterated(int count)

File src/java/org/apache/cassandra/service/AbstractReadExecutor.java

View file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Longs;
+
+public abstract class AbstractReadExecutor
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
+    protected final ReadCallback<ReadResponse, Row> handler;
+    protected final ReadCommand command;
+    protected final RowDigestResolver resolver;
+    protected final List<InetAddress> unfiltered;
+    protected final List<InetAddress> endpoints;
+    protected final ColumnFamilyStore cfs;
+
+    AbstractReadExecutor(ColumnFamilyStore cfs,
+                         ReadCommand command,
+                         ConsistencyLevel consistency_level,
+                         List<InetAddress> allReplicas,
+                         List<InetAddress> queryTargets)
+    throws UnavailableException
+    {
+        unfiltered = allReplicas;
+        this.endpoints = queryTargets;
+        this.resolver = new RowDigestResolver(command.table, command.key);
+        this.handler = new ReadCallback<ReadResponse, Row>(resolver, consistency_level, command, this.endpoints);
+        this.command = command;
+        this.cfs = cfs;
+
+        handler.assureSufficientLiveNodes();
+        assert !handler.endpoints.isEmpty();
+    }
+
+    void executeAsync()
+    {
+        // The data-request message is sent to dataPoint, the node that will actually get the data for us
+        InetAddress dataPoint = handler.endpoints.get(0);
+        if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS)
+        {
+            logger.trace("reading data locally");
+            StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+        }
+        else
+        {
+            logger.trace("reading data from {}", dataPoint);
+            MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
+        }
+
+        if (handler.endpoints.size() == 1)
+            return;
+
+        // send the other endpoints a digest request
+        ReadCommand digestCommand = command.copy();
+        digestCommand.setDigestQuery(true);
+        MessageOut<?> message = null;
+        for (int i = 1; i < handler.endpoints.size(); i++)
+        {
+            InetAddress digestPoint = handler.endpoints.get(i);
+            if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+            {
+                logger.trace("reading digest locally");
+                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
+            }
+            else
+            {
+                logger.trace("reading digest from {}", digestPoint);
+                // (We lazy-construct the digest Message object since it may not be necessary if we
+                // are doing a local digest read, or no digest reads at all.)
+                if (message == null)
+                    message = digestCommand.createMessage();
+                MessagingService.instance().sendRR(message, digestPoint, handler);
+            }
+        }
+    }
+
+    void speculate()
+    {
+        // noop by default.
+    }
+
+    Row get() throws ReadTimeoutException, DigestMismatchException, IOException
+    {
+        return handler.get();
+    }
+
+    public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistency_level) throws UnavailableException
+    {
+        Table table = Table.open(command.table);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(command.cfName);
+        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(table, command.key);
+        List<InetAddress> queryTargets = consistency_level.filterForQuery(table, allReplicas, cfs.metadata.newReadRepairDecision());
+
+        switch (cfs.metadata.getSpeculativeRetry().type)
+        {
+            case ALWAYS:
+                return new SpeculateAlwaysExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+            case PERCENTILE:
+            case CUSTOM:
+                return queryTargets.size() < allReplicas.size()
+                       ? new SpeculativeReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets)
+                       : new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+            default:
+                return new DefaultReadExecutor(cfs, command, consistency_level, allReplicas, queryTargets);
+        }
+    }
+
+    private static class DefaultReadExecutor extends AbstractReadExecutor
+    {
+        public DefaultReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+        {
+            super(cfs, command, consistency_level, allReplicas, queryTargets);
+        }
+    }
+
+    private static class SpeculativeReadExecutor extends AbstractReadExecutor
+    {
+        public SpeculativeReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+        {
+            super(cfs, command, consistency_level, allReplicas, queryTargets);
+            assert handler.endpoints.size() < unfiltered.size();
+        }
+
+        @Override
+        void speculate()
+        {
+            // no latency information, or we're overloaded
+            if (cfs.sampleLatency > command.getTimeout())
+                return;
+
+            if (!handler.await(cfs.sampleLatency))
+            {
+                InetAddress endpoint = unfiltered.get(handler.endpoints.size());
+
+                // could be waiting on the data, or on enough digests
+                ReadCommand scommand = command;
+                if (resolver.getData() != null)
+                {
+                    scommand = command.copy();
+                    scommand.setDigestQuery(true);
+                }
+
+                logger.trace("Speculating read retry on {}", endpoint);
+                MessagingService.instance().sendRR(scommand.createMessage(), endpoint, handler);
+                cfs.metric.speculativeRetry.inc();
+            }
+        }
+    }
+
+    private static class SpeculateAlwaysExecutor extends AbstractReadExecutor
+    {
+        public SpeculateAlwaysExecutor(ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency_level, List<InetAddress> allReplicas, List<InetAddress> queryTargets) throws UnavailableException
+        {
+            super(cfs, command, consistency_level, allReplicas, queryTargets);
+        }
+
+        @Override
+        void executeAsync()
+        {
+            int limit = unfiltered.size() >= 2 ? 2 : 1;
+            for (int i = 0; i < limit; i++)
+            {
+                InetAddress endpoint = unfiltered.get(i);
+                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                {
+                    logger.trace("reading full data locally");
+                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
+                }
+                else
+                {
+                    logger.trace("reading full data from {}", endpoint);
+                    MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
+                }
+            }
+            if (handler.endpoints.size() <= limit)
+                return;
+
+            ReadCommand digestCommand = command.copy();
+            digestCommand.setDigestQuery(true);
+            MessageOut<?> message = digestCommand.createMessage();
+            for (int i = limit; i < handler.endpoints.size(); i++)
+            {
+                // Send the message
+                InetAddress endpoint = handler.endpoints.get(i);
+                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                {
+                    logger.trace("reading data locally, isDigest: {}", command.isDigestQuery());
+                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
+                }
+                else
+                {
+                    logger.trace("reading full data from {}, isDigest: {}", endpoint, command.isDigestQuery());
+                    MessagingService.instance().sendRR(message, endpoint, handler);
+                }
+            }
+        }
+    }
+}

File src/java/org/apache/cassandra/service/AbstractRowResolver.java

View file
         this.table = table;
     }
 
-    public void preprocess(MessageIn<ReadResponse> message)
+    /**
+     * Records an incoming read response.
+     *
+     * A speculative retry can produce two replies from the same endpoint
+     * (e.g. a data read racing an earlier digest request). When that happens
+     * the non-digest (data) response replaces the earlier reply, and false is
+     * returned so the caller does not count the endpoint twice.
+     *
+     * @return true if the reply was newly counted, false if it only replaced
+     *         an earlier reply from the same endpoint
+     */
+    public boolean preprocess(MessageIn<ReadResponse> message)
     {
+        MessageIn<ReadResponse> toReplace = null;
+        for (MessageIn<ReadResponse> reply : replies)
+        {
+            if (reply.from.equals(message.from))
+            {
+                // only a data response supersedes an existing reply; a second
+                // digest from the same endpoint adds no information
+                if (!message.payload.isDigestQuery())
+                    toReplace = reply;
+                break;
+            }
+        }
+        // replace old message
+        if (toReplace != null)
+        {
+            // NOTE(review): remove+add is not atomic; assumes two responses from
+            // the same endpoint are not preprocessed concurrently — verify
+            replies.remove(toReplace);
+            replies.add(message);
+            return false;
+        }
         replies.add(message);
+        return true;
     }
 
     public Iterable<MessageIn<ReadResponse>> getMessages()

File src/java/org/apache/cassandra/service/IResponseResolver.java

View file
      */
     public TResolved getData() throws IOException;
 
-    public void preprocess(MessageIn<TMessage> message);
+    public boolean preprocess(MessageIn<TMessage> message);
     public Iterable<MessageIn<TMessage>> getMessages();
 }

File src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java

View file
         return resolvedRows;
     }
 
-    public void preprocess(MessageIn message)
+    /**
+     * Records a range-slice response. Range reads never replace an earlier
+     * reply, so every message is reported as newly counted.
+     *
+     * @return always true
+     */
+    public boolean preprocess(MessageIn message)
     {
         responses.add(message);
+        return true;
     }
 
     public boolean isDataPresent()

File src/java/org/apache/cassandra/service/ReadCallback.java

View file
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.config.Schema;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Table;
 
     public final IResponseResolver<TMessage, TResolved> resolver;
     private final SimpleCondition condition = new SimpleCondition();
-    private final long startTime;
+    final long startTime;
     private final int blockfor;
     final List<InetAddress> endpoints;
     private final IReadCommand command;
         return new ReadCallback(newResolver, consistencyLevel, blockfor, command, table, endpoints);
     }
 
-    public TResolved get() throws ReadTimeoutException, DigestMismatchException, IOException
+    /**
+     * Waits for the response condition to be signalled.
+     *
+     * @param interimTimeout total time budget in milliseconds, measured from
+     *                       startTime (callback creation), NOT from this call
+     * @return true if the condition was signalled within the budget
+     */
+    public boolean await(long interimTimeout)
     {
-        long timeout = command.getTimeout() - (System.currentTimeMillis() - startTime);
-        boolean success;
+        // convert the absolute budget into the time still remaining
+        long timeout = interimTimeout - (System.currentTimeMillis() - startTime);
         try
         {
-            success = condition.await(timeout, TimeUnit.MILLISECONDS);
+            return condition.await(timeout, TimeUnit.MILLISECONDS);
         }
         catch (InterruptedException ex)
         {
             throw new AssertionError(ex);
         }
+    }
 
-        if (!success)
-            throw new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
-
+    /**
+     * Blocks until blockfor responses have arrived, then resolves the result.
+     *
+     * @throws ReadTimeoutException if too few responses arrived within the
+     *         command timeout
+     */
+    public TResolved get() throws ReadTimeoutException, DigestMismatchException, IOException
+    {
+        // await() already deducts the time elapsed since startTime, so pass the
+        // full command timeout; pre-subtracting the elapsed time here as well
+        // would charge it twice and make reads time out too early.
+        if (!await(command.getTimeout()))
+        {
+            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+            if (logger.isDebugEnabled())
+                logger.debug("Read timeout: {}", ex.toString());
+            throw ex;
+        }
         return blockfor == 1 ? resolver.getData() : resolver.resolve();
     }
 
     public void response(MessageIn<TMessage> message)
     {
-        resolver.preprocess(message);
-        int n = waitingFor(message)
+        boolean hasAdded = resolver.preprocess(message);
+        int n = (waitingFor(message) && hasAdded)
               ? received.incrementAndGet()
               : received.get();
         if (n >= blockfor && resolver.isDataPresent())

File src/java/org/apache/cassandra/service/RowDigestResolver.java

View file
     /**
      * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
      */
-    public Row getData() throws IOException
+    public Row getData()
     {
         for (MessageIn<ReadResponse> message : replies)
         {
             if (!result.isDigestQuery())
                 return result.row();
         }
-
-        throw new AssertionError("getData should not be invoked when no data is present");
+        return null;
     }
 
     /*

File src/java/org/apache/cassandra/service/StorageProxy.java

View file
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
-    private static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
+    static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
 
     public static final String UNREACHABLE = "UNREACHABLE";
 
         do
         {
             List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
-            ReadCallback<ReadResponse, Row>[] readCallbacks = new ReadCallback[commands.size()];
+            AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
 
             if (!commandsToRetry.isEmpty())
                 logger.debug("Retrying {} commands", commandsToRetry.size());
             for (int i = 0; i < commands.size(); i++)
             {
                 ReadCommand command = commands.get(i);
-                Table table = Table.open(command.getKeyspace());
                 assert !command.isDigestQuery();
                 logger.trace("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
-                List<InetAddress> endpoints = getLiveSortedEndpoints(table, command.key);
-                CFMetaData cfm = Schema.instance.getCFMetaData(command.getKeyspace(), command.getColumnFamilyName());
-                endpoints = consistency_level.filterForQuery(table, endpoints, cfm.newReadRepairDecision());
-
-                RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
-                ReadCallback<ReadResponse, Row> handler = new ReadCallback(resolver, consistency_level, command, endpoints);
-                handler.assureSufficientLiveNodes();
-                assert !endpoints.isEmpty();
-                readCallbacks[i] = handler;
-
-                // The data-request message is sent to dataPoint, the node that will actually get the data for us
-                InetAddress dataPoint = endpoints.get(0);
-                if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
-                {
-                    logger.trace("reading data locally");
-                    StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
-                }
-                else
-                {
-                    logger.trace("reading data from {}", dataPoint);
-                    MessagingService.instance().sendRR(command.createMessage(), dataPoint, handler);
-                }
-
-                if (endpoints.size() == 1)
-                    continue;
-
-                // send the other endpoints a digest request
-                ReadCommand digestCommand = command.copy();
-                digestCommand.setDigestQuery(true);
-                MessageOut message = null;
-                for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
-                {
-                    if (digestPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
-                    {
-                        logger.trace("reading digest locally");
-                        StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
-                    }
-                    else
-                    {
-                        logger.trace("reading digest from {}", digestPoint);
-                        // (We lazy-construct the digest Message object since it may not be necessary if we
-                        // are doing a local digest read, or no digest reads at all.)
-                        if (message == null)
-                            message = digestCommand.createMessage();
-                        MessagingService.instance().sendRR(message, digestPoint, handler);
-                    }
-                }
+                AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistency_level);
+                exec.executeAsync();
+                readExecutors[i] = exec;
             }
 
+            for (AbstractReadExecutor exec: readExecutors)
+                exec.speculate();
+
             // read results and make a second pass for any digest mismatches
             List<ReadCommand> repairCommands = null;
             List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
-            for (int i = 0; i < commands.size(); i++)
+            for (AbstractReadExecutor exec: readExecutors)
             {
-                ReadCallback<ReadResponse, Row> handler = readCallbacks[i];
-                ReadCommand command = commands.get(i);
                 try
                 {
-                    Row row = handler.get();
+                    Row row = exec.get();
                     if (row != null)
                     {
-                        command.maybeTrim(row);
+                        exec.command.maybeTrim(row);
                         rows.add(row);
                     }
-                }
-                catch (ReadTimeoutException ex)
-                {
                     if (logger.isDebugEnabled())
-                        logger.debug("Read timeout: {}", ex.toString());
-                    throw ex;
+                        logger.debug("Read: " + (System.currentTimeMillis() - exec.handler.startTime) + " ms.");
                 }
                 catch (DigestMismatchException ex)
                 {
-                    logger.debug("Digest mismatch: {}", ex.toString());
+                    logger.trace("Digest mismatch: {}", ex);
                     // Do a full data read to resolve the correct response (and repair node that need be)
-                    RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter());
-                    ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver);
+                    RowDataResolver resolver = new RowDataResolver(exec.command.table, exec.command.key, exec.command.filter());
+                    ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
 
                     if (repairCommands == null)
                     {
                         repairCommands = new ArrayList<ReadCommand>();
                         repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
                     }
-                    repairCommands.add(command);
+                    repairCommands.add(exec.command);
                     repairResponseHandlers.add(repairHandler);
 
-                    for (InetAddress endpoint : handler.endpoints)
-                    {
-                        MessageOut<ReadCommand> message = command.createMessage();
+                    MessageOut<ReadCommand> message = exec.command.createMessage();
+                    for (InetAddress endpoint : exec.handler.endpoints)
                         MessagingService.instance().sendRR(message, endpoint, repairHandler);
-                    }
                 }
             }
 
         }
     }
 
-    private static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
+    public static List<InetAddress> getLiveSortedEndpoints(Table table, ByteBuffer key)
     {
         return getLiveSortedEndpoints(table, StorageService.instance.getPartitioner().decorateKey(key));
     }

File src/java/org/apache/cassandra/utils/SimpleCondition.java

View file
 // _after_ signal(), it will work as desired.)
 public class SimpleCondition implements Condition
 {
-    boolean set;
+    private boolean set;
 
     public synchronized void await() throws InterruptedException
     {

File src/resources/org/apache/cassandra/cli/CliHelp.yaml

View file
             - ROWS_ONLY
             - NONE;
 
+        - speculative_retry: Speculative retry sends redundant read requests
+          to guard against slow or failed replicas.
+
+          Speculative retry will execute an additional read on a different
+          node when the read request does not complete within the configured
+          time.
+
+          Xpercentile will execute an additional read request to a different
+          replica when the read request has not completed within the Xth
+          percentile of the normal/earlier recorded latencies.
+
+          Xms will execute an additional read request to a different replica
+          when the read request has not completed within X milliseconds.
+
+          ALWAYS will execute the data read request on 2 of the replicas (if
+          available), anticipating that a node may fail the read.
+
+          Supported values are:
+            - ALWAYS
+            - Xpercentile
+            - Xms
+            - NONE;
+
         - max_compaction_threshold: The maximum number of SSTables allowed before a
         minor compaction is forced. Default is 32, setting to 0 disables minor
         compactions.