accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1502726 [4/4] - in /accumulo/branches/ACCUMULO-1000: core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/o...
Date Fri, 12 Jul 2013 23:29:58 GMT
Modified: accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java (original)
+++ accumulo/branches/ACCUMULO-1000/core/src/main/java/org/apache/accumulo/core/util/ByteBufferUtil.java Fri Jul 12 23:29:57 2013
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.hadoop.io.Text;
 
 public class ByteBufferUtil {
@@ -76,4 +77,16 @@ public class ByteBufferUtil {
   public static String toString(ByteBuffer bytes) {
     return new String(bytes.array(), bytes.position(), bytes.remaining());
   }
+  
+  public static ByteBuffer toByteBuffers(ByteSequence bs) {
+    if (bs == null) {
+      return null; // a null sequence is passed through as a null buffer
+    }
+    if (!bs.isBackedByArray()) {
+      // TODO create more efficient impl
+      return ByteBuffer.wrap(bs.toArray());
+    }
+    // wrap the backing array, limited to the sequence's own offset and length
+    return ByteBuffer.wrap(bs.getBackingArray(), bs.offset(), bs.length());
+  }
 }

Modified: accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift (original)
+++ accumulo/branches/ACCUMULO-1000/core/src/main/thrift/data.thrift Fri Jul 12 23:29:57 2013
@@ -110,10 +110,41 @@ struct UpdateErrors {
 	3:map<TKeyExtent, client.SecurityErrorCode> authorizationFailures
 }
 
+enum TCMStatus {  // outcome reported for one conditional mutation
+	ACCEPTED,  // mutation was prepared for commit without being flagged as a constraint violator
+	REJECTED,  // one of its conditions did not match the value read from the tablet
+	VIOLATED,  // mutation was reported as a constraint violator
+	IGNORED  // not processed, e.g. tablet not online or closed, or the condition scan was cut short
+}
+
+struct TCMResult {  // result for a single conditional mutation
+	1:i64 cmid,  // id of the conditional mutation this status belongs to (TConditionalMutation.id)
+	2:TCMStatus status
+}
+
 struct MapFileInfo {
 	1:i64 estimatedSize
 }
 
+struct TCondition {  // one check that must hold before the enclosing mutation is written
+	1:binary cf;  // cf, cq and cv, together with the mutation's row, pick the entry to check
+	2:binary cq;  // (presumably column family, qualifier and visibility -- confirm against Range.exact)
+	3:binary cv;
+	4:i64 ts;  // only read by the tablet server when hasTimestamp is true
+	5:bool hasTimestamp;
+	6:binary val;  // expected value; when unset the check only passes if no entry is found
+	7:list<IterInfo> ssiList  // ssiList and ssio are handed to the tablet scanner that performs the check
+	8:map<string, map<string, string>> ssio   // (presumably scan iterators and their options)
+}
+
+struct TConditionalMutation {  // a mutation plus the conditions guarding it
+	1:list<TCondition> conditions;  // checked in order; the first failing one gets the mutation REJECTED
+	2:TMutation mutation;  // the mutation the tablet server builds its server side mutation from
+	3:i64 id;  // echoed back in TCMResult.cmid
+}
+
+typedef map<TKeyExtent,list<TConditionalMutation>> CMBatch
+
 typedef map<TKeyExtent,list<TMutation>> UpdateBatch
 
 typedef map<TKeyExtent, map<string, MapFileInfo>> TabletFiles

Modified: accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/branches/ACCUMULO-1000/core/src/main/thrift/tabletserver.thrift Fri Jul 12 23:29:57 2013
@@ -160,13 +160,15 @@ service TabletClientService extends clie
   data.UpdateID startUpdate(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec),
   oneway void applyUpdates(1:trace.TInfo tinfo, 2:data.UpdateID updateID, 3:data.TKeyExtent keyExtent, 4:list<data.TMutation> mutations),
   data.UpdateErrors closeUpdate(2:trace.TInfo tinfo, 1:data.UpdateID updateID) throws (1:NoSuchScanIDException nssi),
-  
+
   //the following call supports making a single update to a tablet
   void update(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:data.TKeyExtent keyExtent, 3:data.TMutation mutation)
     throws (1:client.ThriftSecurityException sec, 
             2:NotServingTabletException nste, 
             3:ConstraintViolationException cve),
   
+  list<data.TCMResult> conditionalUpdate(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<binary> authorizations, 4:data.CMBatch mutations);
+
   // on success, returns an empty list
   list<data.TKeyExtent> bulkImport(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 4:i64 tid, 2:data.TabletFiles files, 5:bool setTime) throws (1:client.ThriftSecurityException sec),
 

Added: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java (added)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/data/ServerConditionalMutation.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.impl.Translator;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+
+/**
+ * Server side form of a thrift conditional mutation: the mutation plus the id and conditions that were sent with it.
+ */
+public class ServerConditionalMutation extends ServerMutation {
+  /** Translates a thrift {@link TConditionalMutation} into a {@link ServerConditionalMutation}. */
+  public static class TCMTranslator extends Translator<TConditionalMutation,ServerConditionalMutation> {
+    @Override
+    public ServerConditionalMutation translate(TConditionalMutation input) {
+      return new ServerConditionalMutation(input);
+    }
+  }
+  
+  public static final TCMTranslator TCMT = new TCMTranslator();
+
+  private final long cmid;
+  private final List<TCondition> conditions;
+  
+  /** Passes {@code input.mutation} to the superclass and keeps {@code input.id} and {@code input.conditions} (the list is not copied). */
+  public ServerConditionalMutation(TConditionalMutation input) {
+    super(input.mutation);
+    this.cmid = input.id;
+    this.conditions = input.conditions;
+  }
+
+  /** @return the id taken from the thrift conditional mutation */
+  public long getID() {
+    return cmid;
+  }
+  
+  /** @return the conditions taken from the thrift conditional mutation */
+  public List<TCondition> getConditions() {
+    return conditions;
+  }
+}

Added: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java (added)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/ConditionalMutationSet.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Helpers that sort per-tablet lists of conditional mutations and move some of them into a map of deferred mutations.
+ */
+public class ConditionalMutationSet {
+  /** Splits {@code scml}: each mutation is added to either {@code okMutations} or {@code deferred}. */
+  static interface DeferFilter {
+    void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred);
+  }
+  
+  /** Keeps the first mutation of each run of adjacent equal rows and defers the rest; an empty list adds nothing to either output. */
+  static class DuplicateFitler implements DeferFilter {
+    public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+      for (int i = 0; i < scml.size(); i++) {
+        if (i > 0 && Arrays.equals(scml.get(i - 1).getRow(), scml.get(i).getRow())) {
+          deferred.add(scml.get(i));
+        } else {
+          okMutations.add(scml.get(i));
+        }
+      }
+    }
+  }
+  
+  /** Applies {@code filter} to every list in {@code updates}; a list is rewritten in place only when the filter deferred something from it. */
+  static void defer(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferredMutations, DeferFilter filter) {
+    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+      List<ServerConditionalMutation> scml = entry.getValue();
+      List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(scml.size());
+      List<ServerConditionalMutation> deferred = new ArrayList<ServerConditionalMutation>();
+      filter.defer(scml, okMutations, deferred);
+      if (deferred.size() > 0) {
+        scml.clear();
+        scml.addAll(okMutations);
+        List<ServerConditionalMutation> l = deferredMutations.get(entry.getKey());
+        if (l == null) {
+          deferredMutations.put(entry.getKey(), deferred);
+        } else {
+          l.addAll(deferred);
+        }
+      }
+    }
+  }
+  
+  /** Defers all but the first mutation of each row; only adjacent mutations are compared, so the lists must already be sorted by row. */
+  static void deferDuplicatesRows(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+    defer(updates, deferred, new DuplicateFitler());
+  }
+
+  /** Sorts every list in {@code updates} in place by comparing the bytes of the mutation rows. */
+  static void sortConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates) {
+    for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : updates.entrySet()) {
+      // TODO check if its already in sorted order?
+      // TODO maybe the potential benefit of sorting is not worth the cost
+      Collections.sort(entry.getValue(), new Comparator<ServerConditionalMutation>() {
+        @Override
+        public int compare(ServerConditionalMutation o1, ServerConditionalMutation o2) {
+          return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
+        }
+      });
+    }
+  }
+}

Added: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java (added)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/RowLocks.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
+import org.apache.accumulo.server.tabletserver.ConditionalMutationSet.DeferFilter;
+
+/**
+ * Hands out per-row locks for conditional mutations. Every row in use has one reference counted {@link RowLock} in a shared map.
+ */
+class RowLocks {
+  // only touched inside synchronized (rowLocks); a row's entry is removed once its count drops back to zero
+  private Map<ByteSequence,RowLock> rowLocks = new HashMap<ByteSequence,RowLock>();
+  
+  static class RowLock {
+    ReentrantLock rlock;
+    int count; // references taken through getRowLock and not yet handed back through returnRowLock
+    ByteSequence rowSeq; // key of this lock in the rowLocks map
+    
+    RowLock(ReentrantLock rlock, ByteSequence rowSeq) {
+      this.rlock = rlock;
+      this.count = 0;
+      this.rowSeq = rowSeq;
+    }
+    
+    public boolean tryLock() {
+      return rlock.tryLock();
+    }
+    
+    public void lock() {
+      rlock.lock();
+    }
+    
+    public void unlock() {
+      rlock.unlock();
+    }
+  }
+  
+  // looks up (or creates) the lock for a row and counts one more reference; callers synchronize on rowLocks
+  private RowLock getRowLock(ArrayByteSequence rowSeq) {
+    RowLock lock = rowLocks.get(rowSeq);
+    if (lock == null) {
+      lock = new RowLock(new ReentrantLock(), rowSeq);
+      rowLocks.put(rowSeq, lock);
+    }
+    lock.count++;
+    return lock;
+  }
+  
+  // hands back one reference and forgets the row when none are left; callers synchronize on rowLocks
+  private void returnRowLock(RowLock lock) {
+    if (lock.count == 0)
+      throw new IllegalStateException();
+    lock.count--;
+    if (lock.count == 0) {
+      rowLocks.remove(lock.rowSeq);
+    }
+  }
+  
+  /** Locks the rows of all mutations in {@code updates}; with several rows, those that can not be locked at once move to {@code deferred}. */
+  List<RowLock> acquireRowlocks(Map<KeyExtent,List<ServerConditionalMutation>> updates, Map<KeyExtent,List<ServerConditionalMutation>> deferred) {
+    ArrayList<RowLock> locks = new ArrayList<RowLock>();
+    // assume that mutations are in sorted order to avoid deadlock
+    synchronized (rowLocks) {
+      for (List<ServerConditionalMutation> scml : updates.values()) {
+        for (ServerConditionalMutation scm : scml) {
+          locks.add(getRowLock(new ArrayByteSequence(scm.getRow())));
+        }
+      }
+    }
+    
+    HashSet<ByteSequence> rowsNotLocked = null;
+
+    // acquire as many locks as possible, not blocking on rows that are already locked
+    if (locks.size() > 1) {
+      for (RowLock rowLock : locks) {
+        if (!rowLock.tryLock()) {
+          if (rowsNotLocked == null)
+            rowsNotLocked = new HashSet<ByteSequence>();
+          rowsNotLocked.add(rowLock.rowSeq);
+        }
+      }
+    } else if (locks.size() == 1) {
+      // if there is only one lock, then wait for it (with no mutations at all there is nothing to lock)
+      locks.get(0).lock();
+    }
+    
+    if (rowsNotLocked != null) {
+      
+      final HashSet<ByteSequence> rnlf = rowsNotLocked;
+      // assume will get locks needed, do something expensive otherwise
+      ConditionalMutationSet.defer(updates, deferred, new DeferFilter() {
+        @Override
+        public void defer(List<ServerConditionalMutation> scml, List<ServerConditionalMutation> okMutations, List<ServerConditionalMutation> deferred) {
+          for (ServerConditionalMutation scm : scml) {
+            if (rnlf.contains(new ArrayByteSequence(scm.getRow())))
+              deferred.add(scm);
+            else
+              okMutations.add(scm);
+            
+          }
+        }
+      });
+      
+      // split into locks that are held (returned to the caller) and references that must be handed back now
+      ArrayList<RowLock> filteredLocks = new ArrayList<RowLock>();
+      ArrayList<RowLock> locksToReturn = new ArrayList<RowLock>();
+      for (RowLock rowLock : locks) {
+        if (rowsNotLocked.contains(rowLock.rowSeq)) {
+          locksToReturn.add(rowLock);
+        } else {
+          filteredLocks.add(rowLock);
+        }
+      }
+      
+      synchronized (rowLocks) {
+        for (RowLock rowLock : locksToReturn) {
+          returnRowLock(rowLock);
+        }
+      }
+      locks = filteredLocks;
+    }
+    return locks;
+  }
+  
+  /** Unlocks every lock in {@code locks}, then hands back the reference each of them holds in the shared map. */
+  void releaseRowLocks(List<RowLock> locks) {
+    for (RowLock rowLock : locks) {
+      rowLock.unlock();
+    }
+    synchronized (rowLocks) {
+      for (RowLock rowLock : locks) {
+        returnRowLock(rowLock);
+      }
+    }
+  }
+
+}

Modified: accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-1000/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri Jul 12 23:29:57 2013
@@ -88,7 +88,11 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.MapFileInfo;
 import org.apache.accumulo.core.data.thrift.MultiScanResult;
 import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
 import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
 import org.apache.accumulo.core.data.thrift.TKey;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TKeyValue;
@@ -141,6 +145,7 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.data.ServerConditionalMutation;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -159,6 +164,7 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
+import org.apache.accumulo.server.tabletserver.RowLocks.RowLock;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
 import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
@@ -1690,6 +1696,202 @@ public class TabletServer extends Abstra
       }
     }
     
+    private RowLocks rowLocks = new RowLocks();
+
+    /**
+     * Checks the conditions of each mutation in {@code updates} by scanning its tablet. Mutations whose outcome is decided here
+     * (REJECTED or IGNORED) get a result in {@code results} and are taken out of {@code updates}; the rest stay for writing.
+     */
+    private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, Authorizations authorizations) {
+      Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator();
+      // TODO use constant
+      HashSet<Column> columns = new HashSet<Column>();
+
+      while (iter.hasNext()) {
+        Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
+        Tablet tablet = onlineTablets.get(entry.getKey());
+        if (tablet == null || tablet.isClosed()) {
+          for (ServerConditionalMutation scm : entry.getValue())
+            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+          iter.remove();
+        } else {
+          List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size());
+          
+          // TODO extract to method
+          for (ServerConditionalMutation scm : entry.getValue()) {
+            boolean add = true;
+            for (TCondition tc : scm.getConditions()) {
+              Range range;
+              if (tc.hasTimestamp)
+                range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+              else
+                range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()));
+              
+              AtomicBoolean interruptFlag = new AtomicBoolean();
+              //TODO use one iterator per tablet, push checks into tablet?
+              Scanner scanner = tablet.createScanner(range, 1, columns, authorizations, tc.ssiList, tc.ssio, false, interruptFlag);
+              try {
+                ScanBatch batch = scanner.read();
+                // only the first entry returned for the range is compared
+                Value val = null;
+                for (KVEntry entry2 : batch.results) {
+                  val = entry2.getValue();
+                  break;
+                }
+                
+                // reject when exactly one of the two values is absent, or when both are present but differ
+                if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) {
+                  results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+                  add = false;
+                  break;
+                }
+              } catch (TabletClosedException e) {
+                // TODO ignore rest of tablets mutations
+                results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+                add = false;
+                break;
+              } catch (IterationInterruptedException iie) {
+                // TODO determine why this happened, ignore rest of tablets mutations?
+                results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+                add = false;
+                break;
+              } catch (TooManyFilesException tmfe) {
+                // TODO handle differently?
+                results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+                add = false;
+                break;
+              } catch (IOException e) {
+                // the condition could not be verified, so the mutation must not be written as if the check had passed
+                log.warn("Failed to check condition of conditional mutation " + scm.getID(), e);
+                results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+                add = false;
+                break;
+              } finally {
+                scanner.close();
+              }
+            }
+            
+            if (add)
+              okMutations.add(scm);
+          }
+          
+          // TODO just rebuild map
+          entry.getValue().clear();
+          entry.getValue().addAll(okMutations);
+        }
+      }
+    }
+
+    private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, TCredentials credentials) {
+      Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
+      // Overall flow: prepare each tablet's mutations, pass the prepared ones to the logger, then commit them.
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+
+      // TODO stats
+      // step 1: prepare per tablet; results are added here, sendables collects what will be logged and committed
+      for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
+        Tablet tablet = onlineTablets.get(entry.getKey());
+        if (tablet == null || tablet.isClosed()) {
+          for (ServerConditionalMutation scm : entry.getValue())
+            results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+        } else {
+          // TODO write tracker
+          
+          try {
+            // same list object viewed as List<Mutation>, which is the type the tablet call and sendables use
+            List<Mutation> mutations = (List<Mutation>) (List<? extends Mutation>) entry.getValue();
+            if (mutations.size() > 0) {
+
+              CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations);
+              // without a commit session nothing is written and the whole list is reported IGNORED
+              if (cs == null) {
+                for (ServerConditionalMutation scm : entry.getValue())
+                  results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+              } else {
+                for (ServerConditionalMutation scm : entry.getValue())
+                  results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
+                sendables.put(cs, mutations);
+              }
+            }
+          } catch (TConstraintViolationException e) {
+            if (e.getNonViolators().size() > 0) {
+              sendables.put(e.getCommitSession(), e.getNonViolators());
+              for (Mutation m : e.getNonViolators())
+                results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
+            }
+            // violators are reported but never added to sendables
+            for (Mutation m : e.getViolators())
+              results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
+          }
+        }
+      }
+      // step 2: hand sendables to the logger; IOException and FSError are retried, anything else is rethrown
+      while (true && sendables.size() > 0) {
+        try {
+          logger.logManyTablets(sendables);
+          break;
+        } catch (IOException ex) {
+          log.warn("logging mutations failed, retrying");
+        } catch (FSError ex) { // happens when DFS is localFS
+          log.warn("logging mutations failed, retrying");
+        } catch (Throwable t) {
+          log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t);
+          throw new RuntimeException(t);
+        }
+      }
+      // step 3: commit each list through the session it was prepared with
+      for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+        CommitSession commitSession = entry.getKey();
+        List<Mutation> mutations = entry.getValue();
+        
+        commitSession.commit(mutations);
+      }
+
+    }
+
+    private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(TCredentials credentials, List<ByteBuffer> authorizations,
+        Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results) {
+      // runs one pass over updates and returns the mutations that were put off for a later pass
+      Map<KeyExtent,List<ServerConditionalMutation>> postponed = new HashMap<KeyExtent,List<ServerConditionalMutation>>();
+
+      // sorting by row avoids deadlock, makes the seeks happen in order and puts duplicate rows next to each other
+      ConditionalMutationSet.sortConditionalMutations(updates);
+      // two mutations for the same row can not share a pass, because one will not see what the other writes
+      ConditionalMutationSet.deferDuplicatesRows(updates, postponed);
+      // take the row locks; rows that could not be locked are put off as well
+      List<RowLock> heldLocks = rowLocks.acquireRowlocks(updates, postponed);
+      try {
+        Authorizations auths = new Authorizations(authorizations);
+        checkConditions(updates, results, auths);
+        writeConditionalMutations(updates, results, credentials);
+      } finally {
+        rowLocks.releaseRowLocks(heldLocks);
+      }
+      return postponed;
+    }
+    
+    @Override
+    public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
+        Map<TKeyExtent,List<TConditionalMutation>> mutations) throws TException {
+      // TODO check credentials, permissions, and authorizations
+      // TODO sessions, should show up in list scans
+      // TODO timeout like scans do
+
+      Map<KeyExtent,List<ServerConditionalMutation>> pending = Translator.translate(mutations, Translator.TKET,
+          new Translator.ListTranslator<TConditionalMutation,ServerConditionalMutation>(ServerConditionalMutation.TCMT));
+
+      ArrayList<TCMResult> results = new ArrayList<TCMResult>();
+
+      // every pass hands back the mutations it deferred;
+      // keep passing them in again until a pass defers nothing
+      do {
+        pending = conditionalUpdate(credentials, authorizations, pending, results);
+      } while (pending.size() > 0);
+
+      return results;
+    }
+
+
     @Override
     public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, ByteBuffer splitPoint) throws NotServingTabletException,
         ThriftSecurityException {

Added: accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java (added)
+++ accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+
+/**
+ * Wraps a {@link ConditionalWriter} and randomly reports UNKNOWN for some mutations; such a mutation may or may not have been written.
+ */
+public class FaultyConditionalWriter implements ConditionalWriter {
+  private ConditionalWriter cw;
+  private Random rand;
+  private double up;
+  private double wp;
+  
+  public FaultyConditionalWriter(ConditionalWriter cw, double unknownProbability, double writeProbability) {
+    this.rand = new Random();
+    this.cw = cw;
+    this.up = unknownProbability;
+    this.wp = writeProbability;
+  }
+
+  /** Writes through the wrapped writer, replacing some results with UNKNOWN either before or after the real write. */
+  public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+    ArrayList<Result> outcomes = new ArrayList<Result>();
+    ArrayList<ConditionalMutation> toWrite = new ArrayList<ConditionalMutation>();
+    // fault before the write: report UNKNOWN and never hand the mutation to the wrapped writer
+    while (mutations.hasNext()) {
+      ConditionalMutation cm = mutations.next();
+      boolean dropped = rand.nextDouble() <= up && rand.nextDouble() > wp;
+      if (dropped) {
+        outcomes.add(new Result(Status.UNKNOWN, cm));
+      } else {
+        toWrite.add(cm);
+      }
+    }
+    if (toWrite.isEmpty()) {
+      return outcomes.iterator();
+    }
+
+    // fault after the write: the wrapped writer produced a result, which is sometimes replaced with UNKNOWN
+    Iterator<Result> written = cw.write(toWrite.iterator());
+    while (written.hasNext()) {
+      Result result = written.next();
+      boolean masked = rand.nextDouble() <= up && rand.nextDouble() <= wp;
+      outcomes.add(masked ? new Result(Status.UNKNOWN, result.getMutation()) : result);
+    }
+    return outcomes.iterator();
+  }
+  
+  /** Single mutation form: runs the iterator based write and returns its only result. */
+  public Result write(ConditionalMutation mutation) {
+    return write(Collections.singleton(mutation).iterator()).next();
+  }
+  
+  public void setTimeout(long timeOut, TimeUnit timeUnit) {
+    cw.setTimeout(timeOut, timeUnit);
+  }
+  
+  public long getTimeout(TimeUnit timeUnit) {
+    return cw.getTimeout(timeUnit);
+  }
+}

Modified: accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java?rev=1502726&r1=1502725&r2=1502726&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java (original)
+++ accumulo/branches/ACCUMULO-1000/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java Fri Jul 12 23:29:57 2013
@@ -40,7 +40,9 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.MapFileInfo;
 import org.apache.accumulo.core.data.thrift.MultiScanResult;
 import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TCMResult;
 import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
 import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TMutation;
@@ -200,6 +202,12 @@ public class NullTserver {
     public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveCompaction>();
     }
+
+    @Override
+    public List<TCMResult> conditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations,
+        Map<TKeyExtent,List<TConditionalMutation>> mutations) throws TException {
+      return null;
+    }
   }
   
   static class Opts extends Help {

Added: accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java?rev=1502726&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java (added)
+++ accumulo/branches/ACCUMULO-1000/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java Fri Jul 12 23:29:57 2013
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Result;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner.Type;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+/**
+ * 
+ */
+public class ConditionalWriterTest {
+  
+  private static String secret = "superSecret";
+  public static TemporaryFolder folder = new TemporaryFolder();
+  public static MiniAccumuloCluster cluster;
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    folder.create();
+    MiniAccumuloConfig cfg = new MiniAccumuloConfig(folder.newFolder("miniAccumulo"), secret);
+    cluster = new MiniAccumuloCluster(cfg);
+    cluster.start();
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    // Walks row "99006" of table "foo" through accepted and rejected conditional writes and checks the stored values.
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create("foo");
+
+    ConditionalWriter cw = conn.createConditionalWriter("foo", Authorizations.EMPTY);
+    
+    // mutation conditional on column tx:seq not existing
+    ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
+    cm0.put("name", "last", "doe");
+    cm0.put("name", "first", "john");
+    cm0.put("tx", "seq", "1");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+    Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+
+    // mutation conditional on column tx:seq being 1
+    ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
+    cm1.put("name", "last", "Doe");
+    cm1.put("tx", "seq", "2");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+
+    // test condition where value differs
+    ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
+    cm2.put("name", "last", "DOE");
+    cm2.put("tx", "seq", "2");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+    
+    // test condition where column does not exist
+    ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("txtypo", "seq").setValue("1"));
+    cm3.put("name", "last", "deo");
+    cm3.put("tx", "seq", "2");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
+    
+    // test two conditions, where the second one (name:last is "Doe", not "doe") should fail
+    ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("doe"));
+    cm4.put("name", "last", "deo");
+    cm4.put("tx", "seq", "3");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
+
+    // test two conditions, where the first one (tx:seq is "2", not "1") should fail
+    ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"), new Condition("name", "last").setValue("Doe"));
+    cm5.put("name", "last", "deo");
+    cm5.put("tx", "seq", "3");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
+
+    // ensure rejected mutations did not write
+    Scanner scanner = conn.createScanner("foo", Authorizations.EMPTY);
+    scanner.fetchColumn(new Text("name"), new Text("last"));
+    scanner.setRange(new Range("99006"));
+    Assert.assertEquals("Doe", scanner.iterator().next().getValue().toString());
+
+    // test w/ two conditions that are met
+    ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("Doe"));
+    cm6.put("name", "last", "DOE");
+    cm6.put("tx", "seq", "3");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
+    
+    Assert.assertEquals("DOE", scanner.iterator().next().getValue().toString());
+    
+    // test a conditional mutation that deletes
+    ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("3"));
+    cm7.putDelete("name", "last");
+    cm7.putDelete("name", "first");
+    cm7.putDelete("tx", "seq");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
+    
+    Assert.assertFalse(scanner.iterator().hasNext());
+
+    // add the row back
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+    Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+    
+    Assert.assertEquals("doe", scanner.iterator().next().getValue().toString());
+  }
+  
+  @Test
+  public void testFields() throws Exception {
+    String table = "foo2";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    
+    Authorizations auths = new Authorizations("A", "B");
+    
+    conn.securityOperations().changeUserAuthorizations("root", auths);
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, auths);
+    
+    ColumnVisibility cva = new ColumnVisibility("A");
+    ColumnVisibility cvb = new ColumnVisibility("B");
+    
+    ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva));
+    cm0.put("name", "last", cva, "doe");
+    cm0.put("name", "first", cva, "john");
+    cm0.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
+    
+    Scanner scanner = conn.createScanner(table, auths);
+    scanner.setRange(new Range("99006"));
+    // TODO verify all columns
+    scanner.fetchColumn(new Text("tx"), new Text("seq"));
+    Entry<Key,Value> entry = scanner.iterator().next();
+    Assert.assertEquals("1", entry.getValue().toString());
+    long ts = entry.getKey().getTimestamp();
+    
+    // test wrong colf
+    ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("txA", "seq").setVisibility(cva).setValue("1"));
+    cm1.put("name", "last", cva, "Doe");
+    cm1.put("name", "first", cva, "John");
+    cm1.put("tx", "seq", cva, "2");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
+    
+    // test wrong colq
+    ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seqA").setVisibility(cva).setValue("1"));
+    cm2.put("name", "last", cva, "Doe");
+    cm2.put("name", "first", cva, "John");
+    cm2.put("tx", "seq", cva, "2");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+    
+    // test wrong colv
+    ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
+    cm3.put("name", "last", cva, "Doe");
+    cm3.put("name", "first", cva, "John");
+    cm3.put("tx", "seq", cva, "2");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
+
+    // test wrong timestamp
+    ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1"));
+    cm4.put("name", "last", cva, "Doe");
+    cm4.put("name", "first", cva, "John");
+    cm4.put("tx", "seq", cva, "2");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
+    
+    // test wrong timestamp
+    ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1"));
+    cm5.put("name", "last", cva, "Doe");
+    cm5.put("name", "first", cva, "John");
+    cm5.put("tx", "seq", cva, "2");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
+
+    // ensure no updates were made
+    entry = scanner.iterator().next();
+    Assert.assertEquals("1", entry.getValue().toString());
+
+    // set all columns correctly
+    ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1"));
+    cm6.put("name", "last", cva, "Doe");
+    cm6.put("name", "first", cva, "John");
+    cm6.put("tx", "seq", cva, "2");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
+
+    entry = scanner.iterator().next();
+    Assert.assertEquals("2", entry.getValue().toString());
+    
+    // TODO test each field w/ absence
+
+  }
+
+  @Test
+  public void testBadColVis() throws Exception {
+    // test when a user sets a col vis in a condition that can never be seen
+    String table = "foo3";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    
+    Authorizations auths = new Authorizations("A", "B");
+    
+    conn.securityOperations().changeUserAuthorizations("root", auths);
+
+    Authorizations filteredAuths = new Authorizations("A");
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, filteredAuths);
+    
+    ColumnVisibility cva = new ColumnVisibility("A");
+    ColumnVisibility cvb = new ColumnVisibility("B");
+    ColumnVisibility cvc = new ColumnVisibility("C");
+    
+    // User has authorization, but didn't include it in the writer
+    ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb));
+    cm0.put("name", "last", cva, "doe");
+    cm0.put("name", "first", cva, "john");
+    cm0.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm0).getStatus());
+    
+    ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
+    cm1.put("name", "last", cva, "doe");
+    cm1.put("name", "first", cva, "john");
+    cm1.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm1).getStatus());
+
+    // User does not have the authorization
+    ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc));
+    cm2.put("name", "last", cva, "doe");
+    cm2.put("name", "first", cva, "john");
+    cm2.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm2).getStatus());
+    
+    ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc).setValue("1"));
+    cm3.put("name", "last", cva, "doe");
+    cm3.put("name", "first", cva, "john");
+    cm3.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm3).getStatus());
+
+    // if any visibility is bad, good visibilities don't override
+    ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva));
+
+    cm4.put("name", "last", cva, "doe");
+    cm4.put("name", "first", cva, "john");
+    cm4.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm4).getStatus());
+    
+    ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), new Condition("tx", "seq")
+        .setVisibility(cva).setValue("1"));
+    cm5.put("name", "last", cva, "doe");
+    cm5.put("name", "first", cva, "john");
+    cm5.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm5).getStatus());
+
+    ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"),
+        new Condition("tx", "seq").setVisibility(cva));
+    cm6.put("name", "last", cva, "doe");
+    cm6.put("name", "first", cva, "john");
+    cm6.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm6).getStatus());
+
+    ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)
+        .setValue("1"));
+    cm7.put("name", "last", cva, "doe");
+    cm7.put("name", "first", cva, "john");
+    cm7.put("tx", "seq", cva, "1");
+    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
+  }
+  
+  @Test
+  public void testConstraints() throws Exception {
+    // ensure constraint violations are properly reported
+    String table = "foo5";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    conn.tableOperations().addConstraint(table, AlphaNumKeyConstraint.class.getName());
+    conn.tableOperations().clone(table, table + "_clone", true, new HashMap<String,String>(), new HashSet<String>());
+    
+    Scanner scanner = conn.createScanner(table + "_clone", new Authorizations());
+
+    ConditionalWriter cw = conn.createConditionalWriter(table + "_clone", new Authorizations());
+
+    ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq"));
+    cm0.put("tx", "seq", "1");
+    
+    Assert.assertEquals(Status.VIOLATED, cw.write(cm0).getStatus());
+    Assert.assertFalse(scanner.iterator().hasNext());
+    
+    ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq"));
+    cm1.put("tx", "seq", "1");
+    
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+    Assert.assertTrue(scanner.iterator().hasNext());
+
+  }
+
+  @Test
+  public void testIterators() throws Exception {
+    String table = "foo4";
+    // Checks that a condition is evaluated with the iterators set on it, and without them when none are set.
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    // NOTE(review): the 'false' argument presumably disables version limiting so all three writes below are retained - confirm
+    conn.tableOperations().create(table, false);
+    
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    // Write the same count:comments=1 entry three times.
+    Mutation m = new Mutation("ACCUMULO-1000");
+    m.put("count", "comments", "1");
+    bw.addMutation(m);
+    bw.addMutation(m);
+    bw.addMutation(m);
+    bw.close();
+    // Combiner that sums the string-encoded values in the count family.
+    IteratorSetting iterConfig = new IteratorSetting(10, SummingCombiner.class);
+    SummingCombiner.setEncodingType(iterConfig, Type.STRING);
+    SummingCombiner.setColumns(iterConfig, Collections.singletonList(new IteratorSetting.Column("count")));
+    
+    Scanner scanner = conn.createScanner(table, new Authorizations());
+    scanner.addScanIterator(iterConfig);
+    scanner.setRange(new Range("ACCUMULO-1000"));
+    scanner.fetchColumn(new Text("count"), new Text("comments"));
+    // Read through the combiner, the three entries sum to 3.
+    Assert.assertEquals("3", scanner.iterator().next().getValue().toString());
+
+    ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations());
+    // Condition without the combiner: expected to be rejected even though the scanner above reads 3.
+    ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3"));
+    cm0.put("count", "comments", "1");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
+    Assert.assertEquals("3", scanner.iterator().next().getValue().toString());
+    // Condition with the combiner attached: accepted, and the summed value becomes 4.
+    ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("3"));
+    cm1.put("count", "comments", "1");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+    Assert.assertEquals("4", scanner.iterator().next().getValue().toString());
+    // Again without the combiner, now comparing against 4: write cm2 itself (not cm1 a second time) and expect a rejection.
+    ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("4"));
+    cm2.put("count", "comments", "1");
+    Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
+    Assert.assertEquals("4", scanner.iterator().next().getValue().toString());
+    
+    // TODO test conditions with different iterators
+    // TODO test w/ table that has iterators configured
+  }
+
+  @Test
+  public void testBatch() throws Exception {
+    String table = "foo6";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    
+    conn.securityOperations().changeUserAuthorizations("root", new Authorizations("A", "B"));
+    
+    ColumnVisibility cvab = new ColumnVisibility("A|B");
+    
+    ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
+    
+    ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvab));
+    cm0.put("name", "last", cvab, "doe");
+    cm0.put("name", "first", cvab, "john");
+    cm0.put("tx", "seq", cvab, "1");
+    mutations.add(cm0);
+    
+    ConditionalMutation cm1 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab));
+    cm1.put("name", "last", cvab, "doe");
+    cm1.put("name", "first", cvab, "jane");
+    cm1.put("tx", "seq", cvab, "1");
+    mutations.add(cm1);
+    
+    ConditionalMutation cm2 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvab));
+    cm2.put("name", "last", cvab, "doe");
+    cm2.put("name", "first", cvab, "jack");
+    cm2.put("tx", "seq", cvab, "1");
+    mutations.add(cm2);
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A"));
+    Iterator<Result> results = cw.write(mutations.iterator());
+    int count = 0;
+    while (results.hasNext()) {
+      Result result = results.next();
+      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+      count++;
+    }
+    
+    Assert.assertEquals(3, count);
+
+    Scanner scanner = conn.createScanner(table, new Authorizations("A"));
+    scanner.fetchColumn(new Text("tx"), new Text("seq"));
+    
+    for (String row : new String[] {"99006", "59056", "19059"}) {
+      scanner.setRange(new Range(row));
+      Assert.assertEquals("1", scanner.iterator().next().getValue().toString());
+    }
+
+    TreeSet<Text> splits = new TreeSet<Text>();
+    splits.add(new Text("7"));
+    splits.add(new Text("3"));
+    conn.tableOperations().addSplits(table, splits);
+
+    mutations.clear();
+
+    ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvab).setValue("1"));
+    cm3.put("name", "last", cvab, "Doe");
+    cm3.put("tx", "seq", cvab, "2");
+    mutations.add(cm3);
+    
+    ConditionalMutation cm4 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab));
+    cm4.put("name", "last", cvab, "Doe");
+    cm4.put("tx", "seq", cvab, "1");
+    mutations.add(cm4);
+    
+    ConditionalMutation cm5 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvab).setValue("2"));
+    cm5.put("name", "last", cvab, "Doe");
+    cm5.put("tx", "seq", cvab, "3");
+    mutations.add(cm5);
+
+    results = cw.write(mutations.iterator());
+    int accepted = 0;
+    int rejected = 0;
+    while (results.hasNext()) {
+      Result result = results.next();
+      if (new String(result.getMutation().getRow()).equals("99006")) {
+        Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+        accepted++;
+      } else {
+        Assert.assertEquals(Status.REJECTED, result.getStatus());
+        rejected++;
+      }
+    }
+    
+    Assert.assertEquals(1, accepted);
+    Assert.assertEquals(2, rejected);
+
+    for (String row : new String[] {"59056", "19059"}) {
+      scanner.setRange(new Range(row));
+      Assert.assertEquals("1", scanner.iterator().next().getValue().toString());
+    }
+    
+    scanner.setRange(new Range("99006"));
+    Assert.assertEquals("2", scanner.iterator().next().getValue().toString());
+
+    scanner.clearColumns();
+    scanner.fetchColumn(new Text("name"), new Text("last"));
+    Assert.assertEquals("Doe", scanner.iterator().next().getValue().toString());
+  }
+  
+  @Test
+  public void testBigBatch() throws Exception {
+    // Sends two batches of 100 conditional mutations to a pre-split table and expects every one to be accepted.
+    String table = "foo100";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    conn.tableOperations().addSplits(table, nss("2", "4", "6"));
+    // NOTE(review): presumably gives the splits time to take effect before writing - confirm
+    UtilWaitThread.sleep(2000);
+
+    int num = 100;
+    
+    ArrayList<byte[]> rows = new ArrayList<byte[]>(num);
+    ArrayList<ConditionalMutation> cml = new ArrayList<ConditionalMutation>(num);
+    
+    Random r = new Random();
+    byte[] e = new byte[0];
+    // Clear the sign bit instead of calling Math.abs: Math.abs(Long.MIN_VALUE) is still negative.
+    for (int i = 0; i < num; i++) {
+      rows.add(FastFormat.toZeroPaddedString(r.nextLong() & Long.MAX_VALUE, 16, 16, e));
+    }
+    // First batch: each row must not have a meta:seq column yet.
+    for (int i = 0; i < num; i++) {
+      ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq"));
+      
+      cm.put("meta", "seq", "1");
+      cm.put("meta", "tx", UUID.randomUUID().toString());
+      
+      cml.add(cm);
+    }
+
+    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+
+    Iterator<Result> results = cw.write(cml.iterator());
+
+    int count = 0;
+    
+    // TODO check got each row back
+    while (results.hasNext()) {
+      Result result = results.next();
+      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+      count++;
+    }
+    
+    Assert.assertEquals(num, count);
+
+    ArrayList<ConditionalMutation> cml2 = new ArrayList<ConditionalMutation>(num);
+    // Second batch: each row must now have meta:seq equal to "1".
+    for (int i = 0; i < num; i++) {
+      ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq").setValue("1"));
+      
+      cm.put("meta", "seq", "2");
+      cm.put("meta", "tx", UUID.randomUUID().toString());
+      
+      cml2.add(cm);
+    }
+    
+    count = 0;
+
+    results = cw.write(cml2.iterator());
+    
+    while (results.hasNext()) {
+      Result result = results.next();
+      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+      count++;
+    }
+    
+    Assert.assertEquals(num, count);
+  }
+  
+  @Test
+  public void testBatchErrors() throws Exception {
+    
+    String table = "foo7";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    conn.tableOperations().addConstraint(table, AlphaNumKeyConstraint.class.getName());
+    conn.tableOperations().clone(table, table + "_clone", true, new HashMap<String,String>(), new HashSet<String>());
+
+    conn.securityOperations().changeUserAuthorizations("root", new Authorizations("A", "B"));
+    
+    ColumnVisibility cvaob = new ColumnVisibility("A|B");
+    ColumnVisibility cvaab = new ColumnVisibility("A&B");
+    
+    switch ((new Random()).nextInt(3)) {
+      case 1:
+        conn.tableOperations().addSplits(table, nss("6"));
+        break;
+      case 2:
+        conn.tableOperations().addSplits(table, nss("2", "95"));
+        break;
+    }
+    
+    ArrayList<ConditionalMutation> mutations = new ArrayList<ConditionalMutation>();
+    
+    ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvaob));
+    cm0.put("name+", "last", cvaob, "doe");
+    cm0.put("name", "first", cvaob, "john");
+    cm0.put("tx", "seq", cvaob, "1");
+    mutations.add(cm0);
+    
+    ConditionalMutation cm1 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvaab));
+    cm1.put("name", "last", cvaab, "doe");
+    cm1.put("name", "first", cvaab, "jane");
+    cm1.put("tx", "seq", cvaab, "1");
+    mutations.add(cm1);
+    
+    ConditionalMutation cm2 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvaob));
+    cm2.put("name", "last", cvaob, "doe");
+    cm2.put("name", "first", cvaob, "jack");
+    cm2.put("tx", "seq", cvaob, "1");
+    mutations.add(cm2);
+    
+    ConditionalMutation cm3 = new ConditionalMutation("90909", new Condition("tx", "seq").setVisibility(cvaob).setValue("1"));
+    cm3.put("name", "last", cvaob, "doe");
+    cm3.put("name", "first", cvaob, "john");
+    cm3.put("tx", "seq", cvaob, "2");
+    mutations.add(cm3);
+
+    ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A"));
+    Iterator<Result> results = cw.write(mutations.iterator());
+    HashSet<String> rows = new HashSet<String>();
+    while (results.hasNext()) {
+      Result result = results.next();
+      String row = new String(result.getMutation().getRow());
+      if (row.equals("19059")) {
+        Assert.assertEquals(Status.ACCEPTED, result.getStatus());
+      } else if (row.equals("59056")) {
+        Assert.assertEquals(Status.INVISIBLE_VISIBILITY, result.getStatus());
+      } else if (row.equals("99006")) {
+        Assert.assertEquals(Status.VIOLATED, result.getStatus());
+      } else if (row.equals("90909")) {
+        Assert.assertEquals(Status.REJECTED, result.getStatus());
+      }
+      rows.add(row);
+    }
+    
+    Assert.assertEquals(4, rows.size());
+
+    Scanner scanner = conn.createScanner(table, new Authorizations("A"));
+    scanner.fetchColumn(new Text("tx"), new Text("seq"));
+    
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+    Assert.assertEquals("1", iter.next().getValue().toString());
+    Assert.assertFalse(iter.hasNext());
+
+  }
+  
+  @Test
+  public void testSameRow() throws Exception {
+    // test multiple mutations for same row in same batch
+    
+    String table = "foo8";
+    
+    ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers());
+    Connector conn = zki.getConnector("root", new PasswordToken(secret));
+    
+    conn.tableOperations().create(table);
+    
+    ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY);
+    
+    ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
+    cm1.put("tx", "seq", "1");
+    cm1.put("data", "x", "a");
+    
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
+    
+    ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+    cm2.put("tx", "seq", "2");
+    cm2.put("data", "x", "b");
+    
+    ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+    cm3.put("tx", "seq", "2");
+    cm3.put("data", "x", "c");
+    
+    ConditionalMutation cm4 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
+    cm4.put("tx", "seq", "2");
+    cm4.put("data", "x", "d");
+    
+    Iterator<Result> results = cw.write(Arrays.asList(cm2, cm3, cm4).iterator());
+    
+    int accepted = 0;
+    int rejected = 0;
+    int total = 0;
+    
+    while (results.hasNext()) {
+      Status status = results.next().getStatus();
+      if (status == Status.ACCEPTED)
+        accepted++;
+      if (status == Status.REJECTED)
+        rejected++;
+      total++;
+    }
+    
+    Assert.assertEquals(1, accepted);
+    Assert.assertEquals(2, rejected);
+    Assert.assertEquals(3, total);
+  }
+
+  private SortedSet<Text> nss(String... splits) {
+    TreeSet<Text> ret = new TreeSet<Text>();
+    for (String split : splits)
+      ret.add(new Text(split));
+    
+    return ret;
+  }
+
+  @Test
+  public void testSecurity() {
+    // test against table user does not have read and/or write permissions for
+  }
+
+  @Test
+  public void testTimeout() {
+    // TODO not implemented: this test has an empty body and therefore always passes
+  }
+
+  @Test
+  public void testOffline() {
+    // TODO test against an offline table
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    cluster.stop();
+    folder.delete();
+  }
+}



Mime
View raw message