Commits

Anonymous committed 791affd Merge

Merge branch 'cassandra-1.2.0' into cassandra-1.2

Comments (0)

Files changed (4)

  * Skip repair on system_trace and keyspaces with RF=1 (CASSANDRA-4956)
  * Remove select arbitrary limit (CASSANDRA-4918)
 Merged from 1.1:
+ * add basic authentication support for Pig CassandraStorage (CASSANDRA-3042)
  * fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965)
  * reset getRangeSlice filter after finishing a row for get_paged_slice
    (CASSANDRA-4919)

doc/native_protocol.spec

     - <id> is [short bytes] representing the prepared query ID.
     - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2).
 
+  Note that prepared query ID return is global to the node on which the query
+  has been prepared. It can be used on any connection to that node and this
+  until the node is restarted (after which the query must be reprepared).
+
 4.2.5.5. Schema_change
 
   The result to a schema altering query (creation/update/drop of a

src/java/org/apache/cassandra/hadoop/ConfigHelper.java

         return conf.get(OUTPUT_KEYSPACE_CONFIG);
     }
 
+    public static void setInputKeyspaceUserNameAndPassword(Configuration conf, String username, String password)
+    {
+        setInputKeyspaceUserName(conf, username);
+        setInputKeyspacePassword(conf, password);
+    }
+
+    public static void setInputKeyspaceUserName(Configuration conf, String username)
+    {
+        conf.set(INPUT_KEYSPACE_USERNAME_CONFIG, username);
+    }
+
     public static String getInputKeyspaceUserName(Configuration conf)
     {
     	return conf.get(INPUT_KEYSPACE_USERNAME_CONFIG);
     }
 
+    public static void setInputKeyspacePassword(Configuration conf, String password)
+    {
+        conf.set(INPUT_KEYSPACE_PASSWD_CONFIG, password);
+    }
+
     public static String getInputKeyspacePassword(Configuration conf)
     {
     	return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG);
     }
 
+    public static void setOutputKeyspaceUserNameAndPassword(Configuration conf, String username, String password)
+    {
+        setOutputKeyspaceUserName(conf, username);
+        setOutputKeyspacePassword(conf, password);
+    }
+
     public static void setOutputKeyspaceUserName(Configuration conf, String username)
     {
         conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username);

src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
 import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.thrift.Deletion;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
-
 import org.apache.pig.*;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
     private ByteBuffer slice_end = BOUND;
     private boolean slice_reverse = false;
     private boolean allow_deletes = false;
+    private String username;
+    private String password;
     private String keyspace;
     private String column_family;
     private String loadSignature;
 
     private void setLocationFromUri(String location) throws IOException
     {
-        // parse uri into keyspace and columnfamily
         try
         {
             if (!location.startsWith("cassandra://"))
                     usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
             }
             String[] parts = urlParts[0].split("/+");
-            keyspace = parts[1];
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                String[] credentials = credentialsAndKeyspace[0].split(":");
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
             column_family = parts[2];
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage());
         }
     }
 
     {
         conf = job.getConfiguration();
         setLocationFromUri(location);
+
         if (ConfigHelper.getInputSlicePredicate(conf) == null)
         {
             SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
         if (usePartitionFilter && getIndexExpressions() != null)
             ConfigHelper.setInputRange(conf, getIndexExpressions());
 
+        if (username != null && password != null)
+            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
 
     {
         conf = job.getConfiguration();
         setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
 
 
     private void initSchema(String signature)
     {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
 
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(signature))
+        if (!properties.containsKey(signature))
         {
-            Cassandra.Client client = null;
             try
             {
-                client = ConfigHelper.getClientFromInputAddressList(conf);
-                CfDef cfDef = null;
+                Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
                 client.set_keyspace(keyspace);
+
+                if (username != null && password != null)
+                {
+                    Map<String, String> credentials = new HashMap<String, String>(2);
+                    credentials.put(IAuthenticator.USERNAME_KEY, username);
+                    credentials.put(IAuthenticator.PASSWORD_KEY, password);
+
+                    try
+                    {
+                        client.login(new AuthenticationRequest(credentials));
+                    }
+                    catch (AuthenticationException e)
+                    {
+                        logger.error("Authentication exception: invalid username and/or password");
+                        throw new RuntimeException(e);
+                    }
+                    catch (AuthorizationException e)
+                    {
+                        throw new AssertionError(e); // never actually throws AuthorizationException.
+                    }
+                }
+
+                CfDef cfDef = null;
                 KsDef ksDef = client.describe_keyspace(keyspace);
                 List<CfDef> defs = ksDef.getCf_defs();
                 for (CfDef def : defs)
                     }
                 }
                 if (cfDef != null)
-                    property.setProperty(signature, cfdefToString(cfDef));
+                    properties.setProperty(signature, cfdefToString(cfDef));
                 else
-                    throw new RuntimeException("Column family '" + column_family + "' not found in keyspace '" + keyspace + "'");
+                    throw new RuntimeException(String.format("Column family '%s' not found in keyspace '%s'",
+                                                             column_family,
+                                                             keyspace));
             }
             catch (TException e)
             {