Commits

pcmanus committed 74a8508

Makes batches atomic

Comments (0)

Files changed (4)

src/java/org/apache/cassandra/cql3/statements/BatchStatement.java

 package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.RequestType;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
     public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables)
     throws InvalidRequestException
     {
-        List<IMutation> batch = new LinkedList<IMutation>();
 
+        Map<Pair<String, ByteBuffer>, RowAndCounterMutation> mutations = new HashMap<Pair<String, ByteBuffer>, RowAndCounterMutation>();
         for (ModificationStatement statement : statements)
         {
             if (isSetTimestamp())
                 statement.timestamp = timestamp;
-            batch.addAll(statement.getMutations(clientState, variables));
+
+            List<IMutation> lm = statement.getMutations(clientState, variables);
+            // Group mutation together, otherwise they won't get applied atomically
+            for (IMutation m : lm)
+            {
+                Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
+                RowAndCounterMutation racm = mutations.get(key);
+                if (racm == null)
+                {
+                    racm = new RowAndCounterMutation();
+                    mutations.put(key, racm);
+                }
+
+                if (m instanceof CounterMutation)
+                {
+                    if (racm.cm == null)
+                        racm.cm = (CounterMutation)m;
+                    else
+                        racm.cm.addAll(m);
+                }
+                else
+                {
+                    assert m instanceof RowMutation;
+                    if (racm.rm == null)
+                        racm.rm = (RowMutation)m;
+                    else
+                        racm.rm.addAll(m);
+                }
+            }
         }
 
+        List<IMutation> batch = new LinkedList<IMutation>();
+        for (RowAndCounterMutation racm : mutations.values())
+        {
+            if (racm.rm != null)
+                batch.add(racm.rm);
+            if (racm.cm != null)
+                batch.add(racm.cm);
+        }
         return batch;
     }
 
     {
         return String.format("BatchStatement(statements=%s, consistency=%s)", statements, cLevel);
     }
+
+    private static class RowAndCounterMutation
+    {
+        public RowMutation rm;
+        public CounterMutation cm;
+    }
 }

src/java/org/apache/cassandra/db/CounterMutation.java

         rm.apply();
     }
 
+    public void addAll(IMutation m)
+    {
+        if (!(m instanceof CounterMutation))
+            throw new IllegalArgumentException();
+
+        CounterMutation cm = (CounterMutation)m;
+        rowMutation.addAll(cm.rowMutation);
+    }
+
     @Override
     public String toString()
     {

src/java/org/apache/cassandra/db/IMutation.java

     public ByteBuffer key();
     public void apply() throws IOException;
     public String toString(boolean shallow);
+    public void addAll(IMutation m);
 }

src/java/org/apache/cassandra/db/RowMutation.java

         }
     }
 
+    public void addAll(IMutation m)
+    {
+        if (!(m instanceof RowMutation))
+            throw new IllegalArgumentException();
+
+        RowMutation rm = (RowMutation)m;
+        if (!table_.equals(rm.table_) || !key_.equals(rm.key_))
+            throw new IllegalArgumentException();
+
+        for (Map.Entry<Integer, ColumnFamily> entry : rm.modifications_.entrySet())
+        {
+            // It's slighty faster to assume the key wasn't present and fix if
+            // not in the case where it wasn't there indeed.
+            ColumnFamily cf = modifications_.put(entry.getKey(), entry.getValue());
+            if (cf != null)
+                entry.getValue().resolve(cf);
+        }
+    }
+
     /*
      * This is equivalent to calling commit. Applies the changes to
      * to the table that is obtained by calling Table.open().