Commits

Pavel Yaskevich  committed 73d828e

add ConfigHelper support for Thrift frame and max message sizes
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-5188

  • Participants
  • Parent commits 3298c2f

Comments (0)

Files changed (7)

  * fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170)
  * fix sstable maxtimestamp for row deletes and pre-1.1.1 sstables (CASSANDRA-5153)
  * fix start key/end token validation for wide row iteration (CASSANDRA-5168)
+ * add ConfigHelper support for Thrift frame and max message sizes (CASSANDRA-5188)
 
 
 1.1.9

File src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java

             throws InvalidRequestException, TException, AuthenticationException, AuthorizationException, LoginException
     {
         logger.debug("Creating authenticated client for CF output format");
-        TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket);
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
+        TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket, conf);
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf));
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
         client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
         if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)

File src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java

             // create connection using thrift
             String location = getLocation();
             socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf));
-            TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket);
-            TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
+            TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket, conf);
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf));
             client = new Cassandra.Client(binaryProtocol);
 
             // log in

File src/java/org/apache/cassandra/hadoop/ConfigHelper.java

     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
     private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class";
     private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class";
+    private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
+    private static final String THRIFT_MAX_MESSAGE_LENGTH_IN_MB = "cassandra.thrift.message.max_size_mb";
 
     private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
 
         conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length);
     }
 
+    public static void setThriftFramedTransportSizeInMb(Configuration conf, int frameSizeInMB)
+    {
+        conf.setInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, frameSizeInMB);
+    }
+
+    /**
+     * @param conf The configuration to use.
+     * @return Value (converts MBs to Bytes) set by {@link setThriftFramedTransportSizeInMb(Configuration, int)} or default of 15MB
+     */
+    public static int getThriftFramedTransportSize(Configuration conf)
+    {
+        return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra
+    }
+
+    public static void setThriftMaxMessageLengthInMb(Configuration conf, int maxMessageSizeInMB)
+    {
+        conf.setInt(THRIFT_MAX_MESSAGE_LENGTH_IN_MB, maxMessageSizeInMB);
+    }
+
+    /**
+     * @param conf The configuration to use.
+     * @return Value (converts MBs to Bytes) set by {@link setThriftMaxMessageLengthInMb(Configuration, int)} or default of 16MB
+     */
+    public static int getThriftMaxMessageLength(Configuration conf)
+    {
+        return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 16) * 1024 * 1024; // 16MB is default in Cassandra
+    }
+
     public static CompressionParameters getOutputCompressionParamaters(Configuration conf)
     {
         if (getOutputCompressionClass(conf) == null)
         try
         {
             TSocket socket = new TSocket(host, port);
-            TTransport transport = getInputTransportFactory(conf).openTransport(socket);
-            return new Cassandra.Client(new TBinaryProtocol(transport));
+            TTransport transport = getInputTransportFactory(conf).openTransport(socket, conf);
+            return new Cassandra.Client(new TBinaryProtocol(transport, getThriftMaxMessageLength(conf)));
         }
         catch (LoginException e)
         {

File src/java/org/apache/cassandra/thrift/ITransportFactory.java

  *
  */
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
 public interface ITransportFactory
 {
-    TTransport openTransport(TSocket socket) throws LoginException, TTransportException;
+    TTransport openTransport(TSocket socket, Configuration conf) throws LoginException, TTransportException;
 }

File src/java/org/apache/cassandra/thrift/TBinaryProtocol.java

         this(trans, false, true);
     }
 
+    public TBinaryProtocol(TTransport trans, int readLength)
+    {
+        this(trans);
+
+        if (readLength > 0)
+            setReadLength(readLength);
+    }
+
     public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite)
     {
         super(trans);

File src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java

  *
  */
 
+import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class TFramedTransportFactory implements ITransportFactory
 {
-    public TTransport openTransport(TSocket socket) throws TTransportException
+    public TTransport openTransport(TSocket socket, Configuration conf) throws TTransportException
     {
-        TTransport transport = new TFramedTransport(socket);
+        TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf));
         transport.open();
         return transport;
     }