accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [6/6] git commit: ACCUMULO-1000 added conditional mutations to Accumulo
Date Wed, 24 Jul 2013 19:49:25 GMT
ACCUMULO-1000 added conditional mutations to Accumulo


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9dc24448
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9dc24448
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9dc24448

Branch: refs/heads/master
Commit: 9dc244484b4d35859d4d22b27580a47ae7da0e1a
Parents: 77cac56
Author: Keith Turner <kturner@apache.org>
Authored: Wed Jul 24 15:48:48 2013 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Wed Jul 24 15:48:48 2013 -0400

----------------------------------------------------------------------
 .../accumulo/core/client/ConditionalWriter.java |  138 +
 .../core/client/ConditionalWriterConfig.java    |  118 +
 .../apache/accumulo/core/client/Connector.java  |   15 +
 .../core/client/impl/CompressedIterators.java   |  131 +
 .../core/client/impl/ConditionalWriterImpl.java |  794 ++++
 .../core/client/impl/ConnectorImpl.java         |    8 +-
 .../core/client/impl/RootTabletLocator.java     |    8 +-
 .../accumulo/core/client/impl/ServerClient.java |    4 +-
 .../core/client/impl/TabletLocator.java         |   16 +-
 .../core/client/impl/TabletLocatorImpl.java     |   16 +-
 .../client/impl/TabletServerBatchWriter.java    |   34 +-
 .../core/client/impl/TimeoutTabletLocator.java  |    4 +-
 .../core/client/mock/MockConnector.java         |    8 +
 .../core/client/mock/MockTabletLocator.java     |    8 +-
 .../accumulo/core/data/ArrayByteSequence.java   |   13 +
 .../apache/accumulo/core/data/Condition.java    |  148 +
 .../accumulo/core/data/ConditionalMutation.java |   83 +
 .../org/apache/accumulo/core/data/Mutation.java |  130 +-
 .../accumulo/core/data/thrift/TCMResult.java    |  516 ++
 .../accumulo/core/data/thrift/TCMStatus.java    |   67 +
 .../accumulo/core/data/thrift/TCondition.java   | 1049 +++++
 .../core/data/thrift/TConditionalMutation.java  |  659 +++
 .../core/data/thrift/TConditionalSession.java   |  578 +++
 .../accumulo/core/file/rfile/RelativeKey.java   |   33 +-
 .../iterators/system/ColumnQualifierFilter.java |    5 +-
 .../thrift/TabletClientService.java             | 4395 +++++++++++++++++-
 .../accumulo/core/util/ByteBufferUtil.java      |   13 +
 .../core/util/UnsynchronizedBuffer.java         |  195 +
 core/src/main/thrift/data.thrift                |   36 +
 core/src/main/thrift/tabletserver.thrift        |   10 +-
 .../core/client/impl/TabletLocatorImplTest.java |    6 +-
 .../core/file/rfile/RelativeKeyTest.java        |   31 +-
 .../server/data/ServerConditionalMutation.java  |   58 +
 .../server/security/SecurityOperation.java      |    7 +
 .../tabletserver/ConditionalMutationSet.java    |   91 +
 .../accumulo/server/tabletserver/RowLocks.java  |  162 +
 .../accumulo/server/tabletserver/Tablet.java    |    8 +-
 .../server/tabletserver/TabletServer.java       |  308 +-
 .../server/client/BulkImporterTest.java         |    7 +-
 .../accumulo/test/FaultyConditionalWriter.java  |   81 +
 .../accumulo/test/functional/BadIterator.java   |    5 +
 .../accumulo/test/functional/SlowIterator.java  |   24 +-
 .../test/performance/thrift/NullTserver.java    |   23 +
 .../accumulo/test/ConditionalWriterTest.java    | 1236 +++++
 44 files changed, 10840 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
new file mode 100644
index 0000000..4ed4d31
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.data.ConditionalMutation;
+
+/**
+ * @since 1.6.0
+ */
+public interface ConditionalWriter {
+  public static class Result {
+    
+    private Status status;
+    private ConditionalMutation mutation;
+    private String server;
+    private Exception exception;
+    
+    public Result(Status s, ConditionalMutation m, String server) {
+      this.status = s;
+      this.mutation = m;
+      this.server = server;
+    }
+    
+    public Result(Exception e, ConditionalMutation cm, String server) {
+      this.exception = e;
+      this.mutation = cm;
+      this.server = server;
+    }
+
+    /**
+     * If this method throws an exception, then its possible the mutation is still being actively processed. Therefore if code chooses to continue after seeing
+     * an exception it should take this into consideration.
+     * 
+     * @return status of a conditional mutation
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     */
+
+    public Status getStatus() throws AccumuloException, AccumuloSecurityException {
+      if (status == null) {
+        if (exception instanceof AccumuloException)
+          throw new AccumuloException(exception);
+        if (exception instanceof AccumuloSecurityException) {
+          AccumuloSecurityException ase = (AccumuloSecurityException) exception;
+          throw new AccumuloSecurityException(ase.getUser(), SecurityErrorCode.valueOf(ase.getSecurityErrorCode().name()), ase.getTableInfo(), ase);
+        }
+        else
+          throw new AccumuloException(exception);
+      }
+
+      return status;
+    }
+    
+    /**
+     * 
+     * @return A copy of the mutation previously submitted by a user. The mutation will reference the same data, but the object may be different.
+     */
+    public ConditionalMutation getMutation() {
+      return mutation;
+    }
+    
+    /**
+     * 
+     * @return The server this mutation was sent to. Returns null if was not sent to a server.
+     */
+    public String getTabletServer() {
+      return server;
+    }
+  }
+  
+  public static enum Status {
+    /**
+     * conditions were met and mutation was written
+     */
+    ACCEPTED,
+    /**
+     * conditions were not met and mutation was not written
+     */
+    REJECTED,
+    /**
+     * mutation violated a constraint and was not written
+     */
+    VIOLATED,
+    /**
+     * error occurred after mutation was sent to server, its unknown if the mutation was written. Although the status of the mutation is unknown, Accumulo
+     * guarantees the mutation will not be written at a later point in time.
+     */
+    UNKNOWN,
+    /**
+     * A condition contained a column visibility that could never be seen
+     */
+    INVISIBLE_VISIBILITY,
+
+  }
+
+  /**
+   * This method returns one result for each mutation passed to it. This method is thread safe. Multiple threads can safely use a single conditional writer.
+   * Sharing a conditional writer between multiple threads may result in batching of request to tablet servers.
+   * 
+   * @param mutations
+   * @return Result for each mutation submitted. The mutations may still be processing in the background when this method returns, if so the iterator will
+   *         block.
+   */
+  public abstract Iterator<Result> write(Iterator<ConditionalMutation> mutations);
+  
+  /**
+   * This method has the same thread safety guarantees as @link {@link #write(Iterator)}
+   * 
+   * 
+   * @param mutation
+   * @return Result for the submitted mutation
+   */
+
+  public abstract Result write(ConditionalMutation mutation);
+
+  /**
+   * release any resources (like threads pools) used by conditional writer
+   */
+  public void close();
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
new file mode 100644
index 0000000..f2a91ea
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ArgumentChecker;
+
+/**
+ * 
+ * @since 1.6.0
+ */
+public class ConditionalWriterConfig {
+  
+  private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
+  private Long timeout = null;
+  
+  private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
+  private Integer maxWriteThreads = null;
+  
+  private Authorizations auths = Authorizations.EMPTY;
+  
+  /**
+   * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in must be
+   * a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are passed, then an
+   * exception will be thrown.
+   * 
+   * <p>
+   * Any condition that is not visible with this set of authorizations will fail.
+   * 
+   * @param auths
+   */
+  public ConditionalWriterConfig setAuthorizations(Authorizations auths) {
+    ArgumentChecker.notNull(auths);
+    this.auths = auths;
+    return this;
+  }
+  
+  /**
+   * Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is exceeded, the {@link ConditionalWriter} should return the
+   * mutation with an exception.<br />
+   * For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}.
+   * 
+   * <p>
+   * {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
+   * If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will
+   * be used.
+   * 
+   * <p>
+   * <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
+   * 
+   * @param timeout
+   *          the timeout, in the unit specified by the value of {@code timeUnit}
+   * @param timeUnit
+   *          determines how {@code timeout} will be interpreted
+   * @throws IllegalArgumentException
+   *           if {@code timeout} is less than 0
+   * @return {@code this} to allow chaining of set methods
+   */
+  public ConditionalWriterConfig setTimeout(long timeout, TimeUnit timeUnit) {
+    if (timeout < 0)
+      throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
+    
+    if (timeout == 0)
+      this.timeout = Long.MAX_VALUE;
+    else
+      // make small, positive values that truncate to 0 when converted use the minimum millis instead
+      this.timeout = Math.max(1, timeUnit.toMillis(timeout));
+    return this;
+  }
+  
+  /**
+   * Sets the maximum number of threads to use for writing data to the tablet servers.
+   * 
+   * <p>
+   * <b>Default:</b> 3
+   * 
+   * @param maxWriteThreads
+   *          the maximum threads to use
+   * @throws IllegalArgumentException
+   *           if {@code maxWriteThreads} is non-positive
+   * @return {@code this} to allow chaining of set methods
+   */
+  public ConditionalWriterConfig setMaxWriteThreads(int maxWriteThreads) {
+    if (maxWriteThreads <= 0)
+      throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
+    
+    this.maxWriteThreads = maxWriteThreads;
+    return this;
+  }
+  
+  public Authorizations getAuthorizations() {
+    return auths;
+  }
+
+  public long getTimeout(TimeUnit timeUnit) {
+    return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+  
+  public int getMaxWriteThreads() {
+    return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/Connector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index d2e7321..bbfa55f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -174,6 +174,21 @@ public abstract class Connector {
   public abstract Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException;
   
   /**
+   * Factory method to create a ConditionalWriter connected to Accumulo.
+   * 
+   * @param tableName
+   *          the name of the table the conditional writer will operate on
+   * @param config
+   *          configuration used to create conditional writer
+   * 
+   * @return ConditionalWriter object for writing ConditionalMutations
+   * @throws TableNotFoundException
+   *           when the specified table doesn't exist
+   * @since 1.6.0
+   */
+  public abstract ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException;
+
+  /**
    * Accessor method for internal instance object.
    * 
    * @return the internal instance object

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java b/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
new file mode 100644
index 0000000..549322e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.util.UnsynchronizedBuffer;
+
+public class CompressedIterators {
+  private Map<String,Integer> symbolMap;
+  private List<String> symbolTable;
+  private Map<ByteSequence,IterConfig> cache;
+  
+  public static class IterConfig {
+    public List<IterInfo> ssiList = new ArrayList<IterInfo>();
+    public Map<String,Map<String,String>> ssio = new HashMap<String,Map<String,String>>();
+  }
+
+  public CompressedIterators() {
+    symbolMap = new HashMap<String,Integer>();
+    symbolTable = new ArrayList<String>();
+  }
+  
+  public CompressedIterators(List<String> symbols) {
+    this.symbolTable = symbols;
+    this.cache = new HashMap<ByteSequence,IterConfig>();
+  }
+
+  private int getSymbolID(String symbol) {
+    Integer id = symbolMap.get(symbol);
+    if (id == null) {
+      id = symbolTable.size();
+      symbolTable.add(symbol);
+      symbolMap.put(symbol, id);
+    }
+    
+    return id;
+  }
+  
+  public ByteBuffer compress(IteratorSetting[] iterators) {
+    
+    UnsynchronizedBuffer.Writer out = new UnsynchronizedBuffer.Writer(iterators.length * 8);
+    
+    out.writeVInt(iterators.length);
+
+    for (IteratorSetting is : iterators) {
+      out.writeVInt(getSymbolID(is.getName()));
+      out.writeVInt(getSymbolID(is.getIteratorClass()));
+      out.writeVInt(is.getPriority());
+      
+      Map<String,String> opts = is.getOptions();
+      out.writeVInt(opts.size());
+      
+      for (Entry<String,String> entry : opts.entrySet()) {
+        out.writeVInt(getSymbolID(entry.getKey()));
+        out.writeVInt(getSymbolID(entry.getValue()));
+      }
+    }
+    
+    return out.toByteBuffer();
+    
+  }
+  
+  public IterConfig decompress(ByteBuffer iterators) {
+    
+    ByteSequence iterKey = new ArrayByteSequence(iterators);
+    IterConfig config = cache.get(iterKey);
+    if (config != null) {
+      return config;
+    }
+
+    config = new IterConfig();
+
+    UnsynchronizedBuffer.Reader in = new UnsynchronizedBuffer.Reader(iterators);
+
+    int num = in.readVInt();
+    
+    for (int i = 0; i < num; i++) {
+      String name = symbolTable.get(in.readVInt());
+      String iterClass = symbolTable.get(in.readVInt());
+      int prio = in.readVInt();
+      
+      config.ssiList.add(new IterInfo(prio, iterClass, name));
+      
+      int numOpts = in.readVInt();
+      
+      HashMap<String,String> opts = new HashMap<String,String>();
+      
+      for (int j = 0; j < numOpts; j++) {
+        String key = symbolTable.get(in.readVInt());
+        String val = symbolTable.get(in.readVInt());
+        
+        opts.put(key, val);
+      }
+      
+      config.ssio.put(name, opts);
+      
+    }
+
+    cache.put(iterKey, config);
+    return config;
+  }
+
+  public List<String> getSymbolTable() {
+    return symbolTable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
new file mode 100644
index 0000000..9d7e257
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -0,0 +1,794 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.data.thrift.TConditionalMutation;
+import org.apache.accumulo.core.data.thrift.TConditionalSession;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.VisibilityEvaluator;
+import org.apache.accumulo.core.security.VisibilityParseException;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.BadArgumentException;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.accumulo.trace.thrift.TInfo;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.transport.TTransportException;
+
+
+class ConditionalWriterImpl implements ConditionalWriter {
+  
+  // Executor with exactly one thread, shared by all instances of this class. Together with allowCoreThreadTimeOut(true) below, the 10 second keep-alive
+  // lets that thread exit when idle. NOTE(review): presumably runs CleanupTask instances -- the submission site is not visible in this part of the file.
+  private static ThreadPoolExecutor cleanupThreadPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+  
+  static {
+    cleanupThreadPool.allowCoreThreadTimeOut(true);
+  }
+
+  private static final Logger log = Logger.getLogger(ConditionalWriterImpl.class);
+
+  // upper bound, in milliseconds, for the retry back-off delay of a QCMutation (see QCMutation.resetDelay())
+  private static final int MAX_SLEEP = 30000;
+
+  private Authorizations auths;
+  private VisibilityEvaluator ve;
+  // synchronized LRU map holding at most 1000 entries
+  // NOTE(review): presumably caches the outcome of evaluating a visibility expression against auths -- confirm against the code that uses it
+  @SuppressWarnings("unchecked")
+  private Map<Text,Boolean> cache = Collections.synchronizedMap(new LRUMap(1000));
+  private Instance instance;
+  private TCredentials credentials;
+  private TabletLocator locator;
+  private String tableId;
+  // retry timeout in milliseconds; queueRetry() treats Long.MAX_VALUE as "never time out"
+  private long timeout;
+
+  // Work queued for one tablet server. taskQueued records whether a SendTask for that server is currently scheduled; queue(String, ...) and reschedule()
+  // read and write it while synchronized on the ServerQueue instance.
+  private static class ServerQueue {
+    BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>();
+    boolean taskQueued = false;
+  }
+  
+  // tablet server location -> its queue; getServerQueue() synchronizes on this map for lookups and inserts
+  private Map<String,ServerQueue> serverQueues;
+  // mutations waiting out their back-off delay before another attempt; filled by queueRetry()
+  private DelayQueue<QCMutation> failedMutations = new DelayQueue<QCMutation>();
+  private ScheduledThreadPoolExecutor threadPool;
+  
+  /**
+   * Iterator that yields a fixed number of results taken from a blocking queue, waiting for each result to arrive.
+   */
+  private class RQIterator implements Iterator<Result> {
+
+    private final BlockingQueue<Result> rq;
+    // number of results not yet handed out
+    private int count;
+
+    /**
+     * @param resultQueue
+     *          queue the results are taken from
+     * @param count
+     *          number of results this iterator will return
+     */
+    public RQIterator(BlockingQueue<Result> resultQueue, int count) {
+      this.rq = resultQueue;
+      this.count = count;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return count > 0;
+    }
+
+    /**
+     * Waits for the next result, polling the queue in one second intervals so that a shut down thread pool is noticed.
+     *
+     * @throws NoSuchElementException
+     *           if all results were already returned, or if the thread pool is shut down while waiting
+     * @throws RuntimeException
+     *           wrapping the {@link InterruptedException} if the waiting thread is interrupted; the interrupt flag is restored before throwing
+     */
+    @Override
+    public Result next() {
+      if (count <= 0)
+        throw new NoSuchElementException();
+
+      try {
+        Result result;
+        while ((result = rq.poll(1, TimeUnit.SECONDS)) == null) {
+          if (threadPool.isShutdown()) {
+            throw new NoSuchElementException("ConditionalWriter closed");
+          }
+        }
+        count--;
+        return result;
+      } catch (InterruptedException e) {
+        // keep the interruption visible to callers further up the stack
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+  private static class QCMutation extends ConditionalMutation implements Delayed {
+    private BlockingQueue<Result> resultQueue;
+    private long resetTime;
+    private long delay = 50;
+    private long entryTime;
+    
+    QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue, long entryTime) {
+      super(cm);
+      this.resultQueue = resultQueue;
+      this.entryTime = entryTime;
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      QCMutation oqcm = (QCMutation) o;
+      return Long.valueOf(resetTime).compareTo(Long.valueOf(oqcm.resetTime));
+    }
+    
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(delay - (System.currentTimeMillis() - resetTime), TimeUnit.MILLISECONDS);
+    }
+    
+    void resetDelay() {
+      delay = Math.min(delay * 2, MAX_SLEEP);
+      resetTime = System.currentTimeMillis();
+    }
+    
+    void queueResult(Result result) {
+      resultQueue.add(result);
+    }
+  }
+  
+  /**
+   * Looks up the queue for a server location, creating and registering an empty one on first use. The lookup and the insert happen while holding the lock
+   * on {@code serverQueues}.
+   */
+  private ServerQueue getServerQueue(String location) {
+    synchronized (serverQueues) {
+      ServerQueue existing = serverQueues.get(location);
+      if (existing != null) {
+        return existing;
+      }
+
+      ServerQueue created = new ServerQueue();
+      serverQueues.put(location, created);
+      return created;
+    }
+  }
+  
+  /**
+   * Best-effort task that asks the server of every still-active session to close its conditional update session.
+   */
+  private class CleanupTask implements Runnable {
+    private List<SessionID> sessions;
+
+    CleanupTask(List<SessionID> activeSessions) {
+      this.sessions = activeSessions;
+    }
+
+    @Override
+    public void run() {
+      for (SessionID sid : sessions) {
+        if (!sid.isActive())
+          continue;
+
+        // scoped to one iteration so a failure to obtain a client never returns the previous iteration's client a second time
+        TabletClientService.Iface client = null;
+
+        TInfo tinfo = Tracer.traceInfo();
+        try {
+          client = getClient(sid.location);
+          client.closeConditionalUpdate(tinfo, sid.sessionID);
+        } catch (Exception e) {
+          // cleanup is best effort; keep going with the remaining sessions but leave a trace of the failure
+          log.debug("Failed to close conditional update session " + sid.sessionID + " at " + sid.location, e);
+        } finally {
+          if (client != null)
+            ThriftUtil.returnClient((TServiceClient) client);
+        }
+
+      }
+    }
+  }
+
+  /**
+   * Schedules mutations for another attempt after their back-off delay. When a finite timeout is configured, a mutation whose next attempt would start
+   * after the timeout has elapsed is not retried; a result carrying a {@link TimedOutException} is queued for it instead.
+   *
+   * @param mutations
+   *          mutations that need to be retried
+   * @param server
+   *          the server the mutations were sent to, or null; used for the timeout exception and the result
+   */
+  private void queueRetry(List<QCMutation> mutations, String server) {
+
+    if (timeout < Long.MAX_VALUE) {
+
+      long time = System.currentTimeMillis();
+
+      ArrayList<QCMutation> mutations2 = new ArrayList<ConditionalWriterImpl.QCMutation>(mutations.size());
+
+      for (QCMutation qcm : mutations) {
+        qcm.resetDelay();
+
+        // compare elapsed time plus back-off against the timeout; computing entryTime + timeout instead overflows for large finite timeouts
+        long elapsed = time - qcm.entryTime;
+        if (elapsed + qcm.getDelay(TimeUnit.MILLISECONDS) > timeout) {
+          TimedOutException toe;
+          if (server != null)
+            toe = new TimedOutException(Collections.singleton(server));
+          else
+            toe = new TimedOutException("Conditional mutation timed out");
+
+          qcm.queueResult(new Result(toe, qcm, server));
+        } else {
+          mutations2.add(qcm);
+        }
+      }
+
+      if (mutations2.size() > 0)
+        failedMutations.addAll(mutations2);
+
+    } else {
+      // no timeout configured: every mutation is retried
+      for (QCMutation qcm : mutations)
+        qcm.resetDelay();
+      failedMutations.addAll(mutations);
+    }
+  }
+
+  /**
+   * Bins the mutations by tablet server and queues each bin for sending. Mutations the locator could not bin are scheduled for retry. If binning throws, or
+   * nothing could be binned and the table is deleted or offline, every mutation gets a result carrying the exception and nothing is queued.
+   */
+  private void queue(List<QCMutation> mutations) {
+    List<QCMutation> unbinned = new ArrayList<QCMutation>();
+    Map<String,TabletServerMutations<QCMutation>> byServer = new HashMap<String,TabletLocator.TabletServerMutations<QCMutation>>();
+
+    try {
+      locator.binMutations(mutations, byServer, unbinned, credentials);
+
+      boolean nothingBinned = unbinned.size() == mutations.size();
+      if (nothingBinned) {
+        if (!Tables.exists(instance, tableId)) {
+          throw new TableDeletedException(tableId);
+        }
+        if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) {
+          throw new TableOfflineException(instance, tableId);
+        }
+      }
+    } catch (Exception e) {
+      for (QCMutation failed : mutations) {
+        failed.queueResult(new Result(e, failed, null));
+      }
+
+      // do not want to queue anything that was put in before binMutations() failed
+      unbinned.clear();
+      byServer.clear();
+    }
+
+    if (!unbinned.isEmpty()) {
+      queueRetry(unbinned, null);
+    }
+
+    for (Entry<String,TabletServerMutations<QCMutation>> serverEntry : byServer.entrySet()) {
+      String location = serverEntry.getKey();
+      queue(location, serverEntry.getValue());
+    }
+  }
+
+  /**
+   * Adds mutations to the queue of the given server and, unless a task for that server is already scheduled, submits a SendTask for it to the thread pool.
+   * The add, the check and the flag update all happen while holding the lock on the server's queue object.
+   */
+  private void queue(String location, TabletServerMutations<QCMutation> mutations) {
+    
+    ServerQueue serverQueue = getServerQueue(location);
+    
+    synchronized (serverQueue) {
+      serverQueue.queue.add(mutations);
+      // never execute more than one task per server
+      if(!serverQueue.taskQueued){
+        threadPool.execute(new LoggingRunnable(log, new SendTask(location)));
+        // set only after execute() returned, so the flag stays false if the submission throws
+        serverQueue.taskQueued = true;
+      }
+    }
+   
+  }
+
+  /**
+   * Resubmits the task to the thread pool if its server's queue holds more work; otherwise clears the server's taskQueued flag so that the next call to
+   * queue(String, ...) submits a new task.
+   */
+  private void reschedule(SendTask task){
+    ServerQueue serverQueue = getServerQueue(task.location);
+    // Just finished processing work for this server. If it has more work, that work could either be processed immediately or rescheduled.
+    // This code reschedules the server for processing later: there may be other queues with more data that need to be processed, and it
+    // gives the current server time to build up more data. The thinking is that rescheduling instead of processing immediately will
+    // result in bigger batches and less RPC overhead.
+    
+    synchronized (serverQueue) {
+      if(serverQueue.queue.size() > 0)
+        threadPool.execute(new LoggingRunnable(log, task));
+      else
+        serverQueue.taskQueued = false;
+    }
+    
+  }
+  
+  /**
+   * Drains everything currently queued for the given tablet server and merges it into a single batch.
+   *
+   * @return the merged batch (the first drained batch, with all later ones folded into it), or null if nothing was queued
+   */
+  private TabletServerMutations<QCMutation> dequeue(String location) {
+    BlockingQueue<TabletServerMutations<QCMutation>> pending = getServerQueue(location).queue;
+
+    ArrayList<TabletServerMutations<QCMutation>> drained = new ArrayList<TabletLocator.TabletServerMutations<QCMutation>>();
+    pending.drainTo(drained);
+
+    if (drained.isEmpty()) {
+      return null;
+    }
+
+    // merge multiple requests to a single tablet server into the first one
+    TabletServerMutations<QCMutation> merged = drained.get(0);
+    Map<KeyExtent,List<QCMutation>> mergedByExtent = merged.getMutations();
+
+    for (TabletServerMutations<QCMutation> other : drained.subList(1, drained.size())) {
+      for (Entry<KeyExtent,List<QCMutation>> entry : other.getMutations().entrySet()) {
+        List<QCMutation> target = mergedByExtent.get(entry.getKey());
+        if (target == null) {
+          target = new ArrayList<QCMutation>();
+          mergedByExtent.put(entry.getKey(), target);
+        }
+
+        target.addAll(entry.getValue());
+      }
+    }
+
+    return merged;
+  }
+
+  /**
+   * Creates a writer for the given table. The authorizations, maximum number of write threads and timeout are taken from {@code config}. Also schedules a
+   * recurring task on the writer's thread pool that re-queues mutations found in {@code failedMutations}.
+   */
+  ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, ConditionalWriterConfig config) {
+    this.instance = instance;
+    this.credentials = credentials;
+    this.auths = config.getAuthorizations();
+    this.ve = new VisibilityEvaluator(config.getAuthorizations());
+    this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads());
+    this.threadPool.setMaximumPoolSize(config.getMaxWriteThreads());
+    this.locator = TabletLocator.getLocator(instance, new Text(tableId));
+    this.serverQueues = new HashMap<String,ServerQueue>();
+    this.tableId = tableId;
+    this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
+
+    // drains whatever is currently in failedMutations and pushes it back through binning/queueing
+    Runnable failureHandler = new Runnable() {
+      
+      @Override
+      public void run() {
+          List<QCMutation> mutations = new ArrayList<QCMutation>();
+          failedMutations.drainTo(mutations);
+          queue(mutations);
+      }
+    };
+    
+    failureHandler = new LoggingRunnable(log, failureHandler);
+    
+    // first run after 250ms, then every 250ms
+    threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Submits the given conditional mutations and returns an iterator over their results. A mutation with any condition whose visibility cannot be seen with this
+   * writer's authorizations is not sent; it immediately gets an {@code INVISIBLE_VISIBILITY} result. All other mutations are copied and queued.
+   */
+  public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+
+    BlockingQueue<Result> results = new LinkedBlockingQueue<Result>();
+    List<QCMutation> toQueue = new ArrayList<QCMutation>();
+
+    long entryTime = System.currentTimeMillis();
+    int total = 0;
+
+    while (mutations.hasNext()) {
+      ConditionalMutation mut = mutations.next();
+      total++;
+
+      boolean allVisible = true;
+      for (Condition cond : mut.getConditions()) {
+        if (!isVisible(cond.getVisibility())) {
+          allVisible = false;
+          break;
+        }
+      }
+
+      if (allVisible) {
+        // copy the mutations so that even if caller changes it, it will not matter
+        toQueue.add(new QCMutation(mut, results, entryTime));
+      } else {
+        results.add(new Result(Status.INVISIBLE_VISIBILITY, mut, null));
+      }
+    }
+
+    queue(toQueue);
+
+    return new RQIterator(results, total);
+  }
+
+  /**
+   * Task that sends whatever is currently queued for one tablet server, then asks to be rescheduled.
+   */
+  private class SendTask implements Runnable {
+
+    String location;
+
+    public SendTask(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public void run() {
+      try {
+        TabletServerMutations<QCMutation> batch = dequeue(location);
+        if (batch == null) {
+          // nothing queued; the finally block still reschedules
+          return;
+        }
+        sendToServer(location, batch);
+      } finally {
+        reschedule(this);
+      }
+    }
+  }
+  
+  /**
+   * Pairs a queued conditional mutation with the key extent it was binned to.
+   */
+  private static class CMK {
+
+    KeyExtent ke;
+    QCMutation cm;
+
+    public CMK(KeyExtent ke, QCMutation cm) {
+      this.cm = cm;
+      this.ke = ke;
+    }
+  }
+
+  private static class SessionID {
+    String location;
+    String lockId;
+    long sessionID;
+    boolean reserved;
+    long lastAccessTime;
+    long ttl;
+    
+    boolean isActive() {
+      return System.currentTimeMillis() - lastAccessTime < ttl * .95;
+    }
+  }
+  
+  private HashMap<String, SessionID> cachedSessionIDs = new HashMap<String, SessionID>();
+  
+  /**
+   * Reserves a conditional update session for the given location. A cached session that is still active is reused, which avoids the cost of repeatedly making
+   * an RPC to create sessions. Otherwise a new session is started on the server and cached.
+   *
+   * @throws IllegalStateException
+   *           if the cached session is already reserved, or a session appeared in the cache while the new one was being created
+   */
+  private SessionID reserveSessionID(String location, TabletClientService.Iface client, TInfo tinfo) throws ThriftSecurityException, TException {
+    synchronized (cachedSessionIDs) {
+      SessionID cached = cachedSessionIDs.get(location);
+      if (cached != null) {
+        if (cached.reserved) {
+          throw new IllegalStateException();
+        }
+
+        if (cached.isActive()) {
+          cached.reserved = true;
+          return cached;
+        }
+
+        // too old to trust, drop it and create a new one below
+        cachedSessionIDs.remove(location);
+      }
+    }
+
+    TConditionalSession tcs = client.startConditionalUpdate(tinfo, credentials, ByteBufferUtil.toByteBuffers(auths.getAuthorizations()), tableId);
+
+    SessionID fresh = new SessionID();
+    fresh.reserved = true;
+    fresh.sessionID = tcs.sessionId;
+    fresh.lockId = tcs.tserverLock;
+    fresh.ttl = tcs.ttl;
+    fresh.location = location;
+
+    synchronized (cachedSessionIDs) {
+      if (cachedSessionIDs.put(location, fresh) != null) {
+        throw new IllegalStateException();
+      }
+    }
+
+    return fresh;
+  }
+  
+  /**
+   * Drops the cached session, if any, for the given location.
+   */
+  private void invalidateSessionID(String location) {
+    synchronized (cachedSessionIDs) {
+      cachedSessionIDs.remove(location);
+    }
+  }
+  
+  /**
+   * Releases the reservation on the cached session for the given location and records the current time as its last access. Does nothing when no session is
+   * cached for the location.
+   *
+   * @throws IllegalStateException
+   *           if the cached session is not currently reserved
+   */
+  private void unreserveSessionID(String location) {
+    synchronized (cachedSessionIDs) {
+      SessionID sid = cachedSessionIDs.get(location);
+      if (sid == null) {
+        return;
+      }
+
+      if (!sid.reserved) {
+        throw new IllegalStateException();
+      }
+
+      sid.reserved = false;
+      sid.lastAccessTime = System.currentTimeMillis();
+    }
+  }
+  
+  /**
+   * Returns a snapshot of the cached sessions that are still active.
+   *
+   * The session cache is a plain HashMap that every other accessor guards with its monitor, so the iteration here takes the same lock; otherwise a concurrent
+   * reserve or invalidate could corrupt the iteration.
+   */
+  List<SessionID> getActiveSessions() {
+    ArrayList<SessionID> activeSessions = new ArrayList<SessionID>();
+    synchronized (cachedSessionIDs) {
+      for (SessionID sid : cachedSessionIDs.values()) {
+        if (sid.isActive()) {
+          activeSessions.add(sid);
+        }
+      }
+    }
+    return activeSessions;
+  }
+
+  /**
+   * Obtains a tablet server client for the given location, using this writer's timeout when it is shorter than the configured general RPC timeout.
+   */
+  private TabletClientService.Iface getClient(String location) throws TTransportException {
+    long rpcTimeout = instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+
+    if (timeout < rpcTimeout) {
+      return ThriftUtil.getTServerClient(location, timeout);
+    }
+
+    return ThriftUtil.getTServerClient(location, instance.getConfiguration());
+  }
+
+  /**
+   * Sends a batch of conditional mutations to one tablet server and queues a result for each mutation.
+   *
+   * Mutations the server reports as IGNORED are queued for retry and the locator cache entries of their extents are invalidated. Transport and other Thrift
+   * failures invalidate the locator cache for the server and go through session invalidation; security, application and unexpected exceptions are queued as the
+   * result of every mutation in the batch.
+   */
+  private void sendToServer(String location, TabletServerMutations<QCMutation> mutations) {
+    TabletClientService.Iface client = null;
+
+    TInfo tinfo = Tracer.traceInfo();
+
+    Map<Long,CMK> cmidToCm = new HashMap<Long,CMK>();
+    MutableLong cmid = new MutableLong(0);
+
+    // stays null until a session has actually been reserved by this call
+    SessionID sessionId = null;
+
+    try {
+
+      client = getClient(location);
+
+      Map<TKeyExtent,List<TConditionalMutation>> tmutations = new HashMap<TKeyExtent,List<TConditionalMutation>>();
+
+      CompressedIterators compressedIters = new CompressedIterators();
+      convertMutations(mutations, cmidToCm, cmid, tmutations, compressedIters);
+
+      List<TCMResult> tresults = null;
+      while (tresults == null) {
+        try {
+          sessionId = reserveSessionID(location, client, tinfo);
+          tresults = client.conditionalUpdate(tinfo, sessionId.sessionID, tmutations, compressedIters.getSymbolTable());
+        } catch (NoSuchScanIDException nssie) {
+          // the server no longer knows the session; forget it and loop to create a new one
+          sessionId = null;
+          invalidateSessionID(location);
+        }
+      }
+
+      HashSet<KeyExtent> extentsToInvalidate = new HashSet<KeyExtent>();
+
+      ArrayList<QCMutation> ignored = new ArrayList<QCMutation>();
+
+      for (TCMResult tcmResult : tresults) {
+        if (tcmResult.status == TCMStatus.IGNORED) {
+          CMK cmk = cmidToCm.get(tcmResult.cmid);
+          ignored.add(cmk.cm);
+          extentsToInvalidate.add(cmk.ke);
+        } else {
+          QCMutation qcm = cmidToCm.get(tcmResult.cmid).cm;
+          qcm.queueResult(new Result(fromThrift(tcmResult.status), qcm, location));
+        }
+      }
+
+      for (KeyExtent ke : extentsToInvalidate) {
+        locator.invalidateCache(ke);
+      }
+
+      queueRetry(ignored, location);
+
+    } catch (ThriftSecurityException tse) {
+      AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance,
+          tableId), tse);
+      queueException(location, cmidToCm, ase);
+    } catch (TTransportException e) {
+      locator.invalidateCache(location);
+      invalidateSession(location, mutations, cmidToCm, sessionId);
+    } catch (TApplicationException tae) {
+      queueException(location, cmidToCm, new AccumuloServerException(location, tae));
+    } catch (TException e) {
+      locator.invalidateCache(location);
+      invalidateSession(location, mutations, cmidToCm, sessionId);
+    } catch (Exception e) {
+      queueException(location, cmidToCm, e);
+    } finally {
+      // Only release a reservation this call actually holds. If the failure happened before a session was reserved (for example while obtaining the
+      // client), a still-cached, unreserved session for this location would make unreserveSessionID() throw IllegalStateException from this finally
+      // block and the client below would never be returned.
+      if (sessionId != null)
+        unreserveSessionID(location);
+      ThriftUtil.returnClient((TServiceClient) client);
+    }
+  }
+
+
+  /**
+   * Queues every mutation in the given id-to-mutation map for retry.
+   */
+  private void queueRetry(Map<Long,CMK> cmidToCm, String location) {
+    ArrayList<QCMutation> toRetry = new ArrayList<QCMutation>();
+    for (CMK cmk : cmidToCm.values()) {
+      toRetry.add(cmk.cm);
+    }
+    queueRetry(toRetry, location);
+  }
+
+  /**
+   * Queues the given exception as the result of every mutation in the map.
+   */
+  private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) {
+    for (CMK cmk : cmidToCm.values()) {
+      QCMutation qcm = cmk.cm;
+      qcm.queueResult(new Result(e, qcm, location));
+    }
+  }
+
+  /**
+   * Handles a batch whose outcome is uncertain after a communication failure. Without a session the mutations are simply retried. With a session, the session
+   * is invalidated and each mutation gets an {@code UNKNOWN} result; if invalidation itself fails, that exception becomes the result of each mutation.
+   */
+  private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, SessionID sessionId) {
+    if (sessionId == null) {
+      queueRetry(cmidToCm, location);
+      return;
+    }
+
+    try {
+      invalidateSession(sessionId, location, mutations);
+      for (CMK cmk : cmidToCm.values()) {
+        cmk.cm.queueResult(new Result(Status.UNKNOWN, cmk.cm, location));
+      }
+    } catch (Exception e2) {
+      queueException(location, cmidToCm, e2);
+    }
+  }
+  
+  /*
+   * The purpose of this code is to ensure that a conditional mutation will not execute when its status is unknown. This allows a user to read the row when the
+   * status is unknown and not have to worry about the tserver applying the mutation after the scan.
+   * 
+   * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds the timeout.
+   *
+   * The loop ends when the tserver no longer holds its lock, when the invalidation RPC succeeds, or with a TimedOutException once the elapsed time plus the
+   * next sleep would exceed the timeout. The sleep between attempts starts at 50ms and doubles up to MAX_SLEEP. The mutations parameter is not used here; it
+   * is kept so that existing callers do not change.
+   */
+  private void invalidateSession(SessionID sessionId, String location, TabletServerMutations<QCMutation> mutations) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+
+    long sleepTime = 50;
+
+    long startTime = System.currentTimeMillis();
+
+    LockID lid = new LockID(ZooUtil.getRoot(instance) + Constants.ZTSERVERS, sessionId.lockId);
+
+    while (true) {
+      if (!ZooLock.isLockHeld(ServerClient.getZooCache(instance), lid)) {
+        // TODO if ACCUMULO-1152 adds a tserver lock check to the tablet location cache, then this invalidation would prevent future attempts to contact the
+        // tserver even if it has gone zombie and is still running w/o a lock
+        locator.invalidateCache(location);
+        return;
+      }
+
+      try {
+        // if the mutation is currently processing, this method will block until it is done or times out
+        invalidateSession(sessionId.sessionID, location);
+
+        return;
+      } catch (TApplicationException tae) {
+        throw new AccumuloServerException(location, tae);
+      } catch (TException e) {
+        // could not reach the tserver; drop its cached locations and try again after sleeping
+        locator.invalidateCache(location);
+      }
+
+      if ((System.currentTimeMillis() - startTime) + sleepTime > timeout)
+        throw new TimedOutException(Collections.singleton(location));
+
+      UtilWaitThread.sleep(sleepTime);
+      sleepTime = Math.min(2 * sleepTime, MAX_SLEEP);
+
+    }
+
+  }
+  
+  /**
+   * Makes the RPC that invalidates the given conditional update session on the tablet server at the given location. The client is always returned afterwards.
+   */
+  private void invalidateSession(long sessionId, String location) throws TException {
+    TInfo tinfo = Tracer.traceInfo();
+    TabletClientService.Iface tserver = null;
+
+    try {
+      tserver = getClient(location);
+      tserver.invalidateConditionalUpdate(tinfo, sessionId);
+    } finally {
+      ThriftUtil.returnClient((TServiceClient) tserver);
+    }
+  }
+
+  /**
+   * Maps a thrift conditional mutation status to the client-facing status.
+   *
+   * @throws IllegalArgumentException
+   *           for any status other than ACCEPTED, REJECTED or VIOLATED
+   */
+  private Status fromThrift(TCMStatus status) {
+    Status converted;
+
+    switch (status) {
+      case ACCEPTED:
+        converted = Status.ACCEPTED;
+        break;
+      case REJECTED:
+        converted = Status.REJECTED;
+        break;
+      case VIOLATED:
+        converted = Status.VIOLATED;
+        break;
+      default:
+        throw new IllegalArgumentException(status.toString());
+    }
+
+    return converted;
+  }
+
+  /**
+   * Converts a batch of queued mutations to their thrift form, grouped by extent. Each mutation is assigned the next value of {@code cmid}, and
+   * {@code cmidToCm} records which mutation and extent each id belongs to.
+   */
+  private void convertMutations(TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, MutableLong cmid,
+      Map<TKeyExtent,List<TConditionalMutation>> tmutations, CompressedIterators compressedIters) {
+
+    for (Entry<KeyExtent,List<QCMutation>> extentEntry : mutations.getMutations().entrySet()) {
+      KeyExtent extent = extentEntry.getKey();
+      TKeyExtent tke = extent.toThrift();
+
+      ArrayList<TConditionalMutation> converted = new ArrayList<TConditionalMutation>();
+
+      for (QCMutation qcm : extentEntry.getValue()) {
+        TMutation tm = qcm.toThrift();
+        List<TCondition> conditions = convertConditions(qcm, compressedIters);
+
+        long id = cmid.longValue();
+        cmidToCm.put(id, new CMK(extent, qcm));
+        converted.add(new TConditionalMutation(conditions, tm, id));
+        cmid.increment();
+      }
+
+      tmutations.put(tke, converted);
+    }
+  }
+
+  /**
+   * Converts the conditions of a conditional mutation to their thrift form. A condition without a timestamp is sent with timestamp 0 and the has-timestamp
+   * flag cleared; its iterators are passed through the given compressor.
+   */
+  private List<TCondition> convertConditions(ConditionalMutation cm, CompressedIterators compressedIters) {
+    List<TCondition> converted = new ArrayList<TCondition>(cm.getConditions().size());
+
+    for (Condition cond : cm.getConditions()) {
+      boolean hasTs = cond.getTimestamp() != null;
+      long ts = 0;
+      if (hasTs) {
+        ts = cond.getTimestamp();
+      }
+
+      ByteBuffer iters = compressedIters.compress(cond.getIterators());
+
+      converted.add(new TCondition(ByteBufferUtil.toByteBuffers(cond.getFamily()), ByteBufferUtil.toByteBuffers(cond.getQualifier()), ByteBufferUtil
+          .toByteBuffers(cond.getVisibility()), ts, hasTs, ByteBufferUtil.toByteBuffers(cond.getValue()), iters));
+    }
+
+    return converted;
+  }
+
+  /**
+   * Tells whether the given column visibility can be seen with this writer's authorizations. An empty visibility is always visible. Evaluation results are
+   * cached; an expression that cannot be parsed or evaluated is treated as not visible and is not cached.
+   */
+  private boolean isVisible(ByteSequence cv) {
+    Text testVis = new Text(cv.toArray());
+    if (testVis.getLength() == 0) {
+      return true;
+    }
+
+    Boolean cached = cache.get(testVis);
+    if (cached != null) {
+      return cached;
+    }
+
+    try {
+      Boolean visible = ve.evaluate(new ColumnVisibility(testVis));
+      cache.put(new Text(testVis), visible);
+      return visible;
+    } catch (VisibilityParseException e) {
+      return false;
+    } catch (BadArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Convenience form of {@link #write(Iterator)} for a single mutation; returns the first result produced for it.
+   */
+  public Result write(ConditionalMutation mutation) {
+    Iterator<Result> results = write(Collections.singleton(mutation).iterator());
+    return results.next();
+  }
+  
+  /**
+   * Shuts down this writer's thread pool immediately, then hands the still-active sessions to the cleanup thread pool.
+   */
+  @Override
+  public void close() {
+    threadPool.shutdownNow();
+
+    List<SessionID> stillActive = getActiveSessions();
+    cleanupThreadPool.execute(new CleanupTask(stillActive));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 3858cdc..57e36fd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -24,6 +24,8 @@ import org.apache.accumulo.core.client.BatchDeleter;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -130,6 +132,11 @@ public class ConnectorImpl extends Connector {
   }
   
   @Override
+  public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
+    return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), config);
+  }
+  
+  @Override
   public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException {
     ArgumentChecker.notNull(tableName, authorizations);
     return new ScannerImpl(instance, credentials, getTableId(tableName), authorizations);
@@ -162,5 +169,4 @@ public class ConnectorImpl extends Connector {
     
     return instanceops;
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
index 18b2a27..88e5c3a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/RootTabletLocator.java
@@ -42,12 +42,12 @@ public class RootTabletLocator extends TabletLocator {
   }
   
   @Override
-  public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures, TCredentials credentials)
-      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+  public <T extends Mutation> void binMutations(List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures,
+      TCredentials credentials) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     String rootTabletLocation = instance.getRootTabletLocation();
     if (rootTabletLocation != null) {
-      TabletServerMutations tsm = new TabletServerMutations();
-      for (Mutation mutation : mutations) {
+      TabletServerMutations<T> tsm = new TabletServerMutations<T>();
+      for (T mutation : mutations) {
         tsm.addMutation(RootTable.EXTENT, mutation);
       }
       binnedMutations.put(rootTabletLocation, tsm);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
index b933c2b..218bd36 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
@@ -25,8 +25,8 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.ArgumentChecker;
@@ -45,7 +45,7 @@ public class ServerClient {
   private static final Logger log = Logger.getLogger(ServerClient.class);
   private static final Map<String,ZooCache> zooCaches = new HashMap<String,ZooCache>();
   
-  private synchronized static ZooCache getZooCache(Instance instance) {
+  synchronized static ZooCache getZooCache(Instance instance) {
     ZooCache result = zooCaches.get(instance.getZooKeepers());
     if (result == null) {
       result = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), null);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
index de8e053..f9110b2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
@@ -43,7 +43,7 @@ public abstract class TabletLocator {
   public abstract TabletLocation locateTablet(Text row, boolean skipRow, boolean retry, TCredentials credentials) throws AccumuloException,
       AccumuloSecurityException, TableNotFoundException;
   
-  public abstract void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures,
+  public abstract <T extends Mutation> void binMutations(List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures,
       TCredentials credentials) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
   
   public abstract List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, TCredentials credentials)
@@ -187,24 +187,24 @@ public abstract class TabletLocator {
     }
   }
   
-  public static class TabletServerMutations {
-    private Map<KeyExtent,List<Mutation>> mutations;
+  public static class TabletServerMutations<T extends Mutation> {
+    private Map<KeyExtent,List<T>> mutations;
     
     public TabletServerMutations() {
-      mutations = new HashMap<KeyExtent,List<Mutation>>();
+      mutations = new HashMap<KeyExtent,List<T>>();
     }
     
-    public void addMutation(KeyExtent ke, Mutation m) {
-      List<Mutation> mutList = mutations.get(ke);
+    public void addMutation(KeyExtent ke, T m) {
+      List<T> mutList = mutations.get(ke);
       if (mutList == null) {
-        mutList = new ArrayList<Mutation>();
+        mutList = new ArrayList<T>();
         mutations.put(ke, mutList);
       }
       
       mutList.add(m);
     }
     
-    public Map<KeyExtent,List<Mutation>> getMutations() {
+    public Map<KeyExtent,List<T>> getMutations() {
       return mutations;
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
index df5d66b..4b2e1d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
@@ -113,14 +113,14 @@ public class TabletLocatorImpl extends TabletLocator {
   }
   
   @Override
-  public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures, TCredentials credentials)
-      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+  public <T extends Mutation> void binMutations(List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures,
+      TCredentials credentials) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     
     OpTimer opTimer = null;
     if (log.isTraceEnabled())
       opTimer = new OpTimer(log, Level.TRACE).start("Binning " + mutations.size() + " mutations for table " + tableId);
     
-    ArrayList<Mutation> notInCache = new ArrayList<Mutation>();
+    ArrayList<T> notInCache = new ArrayList<T>();
     Text row = new Text();
     
     rLock.lock();
@@ -133,7 +133,7 @@ public class TabletLocatorImpl extends TabletLocator {
       // For this to be efficient, need to avoid fine grained synchronization and fine grained logging.
       // Therefore methods called by this are not synchronized and should not log.
       
-      for (Mutation mutation : mutations) {
+      for (T mutation : mutations) {
         row.set(mutation.getRow());
         TabletLocation tl = locateTabletInCache(row);
         if (tl == null)
@@ -157,7 +157,7 @@ public class TabletLocatorImpl extends TabletLocator {
       wLock.lock();
       try {
         boolean failed = false;
-        for (Mutation mutation : notInCache) {
+        for (T mutation : notInCache) {
           if (failed) {
             // when one table does not return a location, something is probably
             // screwy, go ahead and fail everything.
@@ -185,11 +185,11 @@ public class TabletLocatorImpl extends TabletLocator {
       opTimer.stop("Binned " + mutations.size() + " mutations for table " + tableId + " to " + binnedMutations.size() + " tservers in %DURATION%");
   }
   
-  private void addMutation(Map<String,TabletServerMutations> binnedMutations, Mutation mutation, TabletLocation tl) {
-    TabletServerMutations tsm = binnedMutations.get(tl.tablet_location);
+  private <T extends Mutation> void addMutation(Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl) {
+    TabletServerMutations<T> tsm = binnedMutations.get(tl.tablet_location);
     
     if (tsm == null) {
-      tsm = new TabletServerMutations();
+      tsm = new TabletServerMutations<T>();
       binnedMutations.put(tl.tablet_location, tsm);
     }
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 766cea9..12f5243 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -415,13 +415,13 @@ public class TabletServerBatchWriter {
     totalSendTime.addAndGet(time);
   }
   
-  public void updateBinningStats(int count, long time, Map<String,TabletServerMutations> binnedMutations) {
+  public void updateBinningStats(int count, long time, Map<String,TabletServerMutations<Mutation>> binnedMutations) {
     totalBinTime.addAndGet(time);
     totalBinned.addAndGet(count);
     updateBatchStats(binnedMutations);
   }
   
-  private synchronized void updateBatchStats(Map<String,TabletServerMutations> binnedMutations) {
+  private synchronized void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
     tabletServersBatchSum += binnedMutations.size();
     
     minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
@@ -429,8 +429,8 @@ public class TabletServerBatchWriter {
     
     int numTablets = 0;
     
-    for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
-      TabletServerMutations tsm = entry.getValue();
+    for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) {
+      TabletServerMutations<Mutation> tsm = entry.getValue();
       numTablets += tsm.getMutations().size();
     }
     
@@ -577,7 +577,7 @@ public class TabletServerBatchWriter {
       init().addAll(failures);
     }
     
-    synchronized void add(String location, TabletServerMutations tsm) {
+    synchronized void add(String location, TabletServerMutations<Mutation> tsm) {
       init();
       for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) {
         recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
@@ -617,12 +617,12 @@ public class TabletServerBatchWriter {
     
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
     private ExecutorService sendThreadPool;
-    private Map<String,TabletServerMutations> serversMutations;
+    private Map<String,TabletServerMutations<Mutation>> serversMutations;
     private Set<String> queued;
     private Map<String,TabletLocator> locators;
     
     public MutationWriter(int numSendThreads) {
-      serversMutations = new HashMap<String,TabletServerMutations>();
+      serversMutations = new HashMap<String,TabletServerMutations<Mutation>>();
       queued = new HashSet<String>();
       sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
       locators = new HashMap<String,TabletLocator>();
@@ -639,7 +639,7 @@ public class TabletServerBatchWriter {
       return ret;
     }
     
-    private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
+    private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations<Mutation>> binnedMutations) {
       try {
         Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet();
         for (Entry<String,List<Mutation>> entry : es) {
@@ -687,7 +687,7 @@ public class TabletServerBatchWriter {
     }
     
     void addMutations(MutationSet mutationsToSend) {
-      Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>();
+      Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>();
       Span span = Trace.start("binMutations");
       try {
         long t1 = System.currentTimeMillis();
@@ -700,15 +700,15 @@ public class TabletServerBatchWriter {
       addMutations(binnedMutations);
     }
     
-    private synchronized void addMutations(Map<String,TabletServerMutations> binnedMutations) {
+    private synchronized void addMutations(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
       
       int count = 0;
       
       // merge mutations into existing mutations for a tablet server
-      for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
+      for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) {
         String server = entry.getKey();
         
-        TabletServerMutations currentMutations = serversMutations.get(server);
+        TabletServerMutations<Mutation> currentMutations = serversMutations.get(server);
         
         if (currentMutations == null) {
           serversMutations.put(server, entry.getValue());
@@ -740,8 +740,8 @@ public class TabletServerBatchWriter {
         }
     }
     
-    private synchronized TabletServerMutations getMutationsToSend(String server) {
-      TabletServerMutations tsmuts = serversMutations.remove(server);
+    private synchronized TabletServerMutations<Mutation> getMutationsToSend(String server) {
+      TabletServerMutations<Mutation> tsmuts = serversMutations.remove(server);
       if (tsmuts == null)
         queued.remove(server);
       
@@ -759,7 +759,7 @@ public class TabletServerBatchWriter {
       @Override
       public void run() {
         try {
-          TabletServerMutations tsmuts = getMutationsToSend(location);
+          TabletServerMutations<Mutation> tsmuts = getMutationsToSend(location);
           
           while (tsmuts != null) {
             send(tsmuts);
@@ -772,7 +772,7 @@ public class TabletServerBatchWriter {
         }
       }
       
-      public void send(TabletServerMutations tsm) throws AccumuloServerException, AccumuloSecurityException {
+      public void send(TabletServerMutations<Mutation> tsm) throws AccumuloServerException, AccumuloSecurityException {
         
         MutationSet failures = null;
         
@@ -846,6 +846,8 @@ public class TabletServerBatchWriter {
         return new MutationSet();
       }
       TInfo tinfo = Tracer.traceInfo();
+      
+      // TODO remove this
       TTransport transport = null;
       
       timeoutTracker.startingWrite();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
index 62518ec..e8cd678 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
@@ -76,8 +76,8 @@ public class TimeoutTabletLocator extends TabletLocator {
   }
   
   @Override
-  public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures, TCredentials credentials)
-      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+  public <T extends Mutation> void binMutations(List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures,
+      TCredentials credentials) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     try {
       locator.binMutations(mutations, binnedMutations, failures, credentials);
       

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
index 1179559..4af2ea5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
@@ -22,6 +22,8 @@ import org.apache.accumulo.core.client.BatchDeleter;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -128,4 +130,10 @@ public class MockConnector extends Connector {
     return new MockInstanceOperations(acu);
   }
   
+  @Override
+  public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException {
+    // TODO add implementation; until then fail with a message naming the unsupported operation
+    throw new UnsupportedOperationException("MockConnector does not support createConditionalWriter");
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java
index aca3ba4..8d91d5d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTabletLocator.java
@@ -40,10 +40,10 @@ public class MockTabletLocator extends TabletLocator {
   }
   
   @Override
-  public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures, TCredentials credentials) throws AccumuloException,
-      AccumuloSecurityException, TableNotFoundException {
-    TabletServerMutations tsm = new TabletServerMutations();
-    for (Mutation m : mutations)
+  public <T extends Mutation> void binMutations(List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures,
+      TCredentials credentials) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    TabletServerMutations<T> tsm = new TabletServerMutations<T>();
+    for (T m : mutations)
       tsm.addMutation(new KeyExtent(), m);
     binnedMutations.put("", tsm);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/data/ArrayByteSequence.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/ArrayByteSequence.java b/core/src/main/java/org/apache/accumulo/core/data/ArrayByteSequence.java
index d44a7a6..eaa61b9 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/ArrayByteSequence.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/ArrayByteSequence.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.core.data;
 
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 
 public class ArrayByteSequence extends ByteSequence implements Serializable {
   
@@ -48,6 +49,18 @@ public class ArrayByteSequence extends ByteSequence implements Serializable {
     this(s.getBytes());
   }
   
+  public ArrayByteSequence(ByteBuffer buffer) { // covers exactly the bytes between the buffer's position and limit
+    this.length = buffer.remaining(); // limit() alone would also count bytes before position; set for both branches
+    if (buffer.hasArray()) {
+      this.data = buffer.array(); // shares the backing array instead of copying it
+      this.offset = buffer.arrayOffset() + buffer.position(); // array index of the first readable byte
+    } else {
+      this.data = new byte[length];
+      this.offset = 0;
+      buffer.duplicate().get(data); // read through a duplicate so the caller's position is left untouched, as in the array branch
+    }
+  }
+
   @Override
   public byte byteAt(int i) {
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/data/Condition.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/Condition.java b/core/src/main/java/org/apache/accumulo/core/data/Condition.java
new file mode 100644
index 0000000..97df7e0
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/data/Condition.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.data;
+
+import java.util.HashSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A condition on one column, identified by family and qualifier, with an optional visibility, timestamp, value and iterators.
+ * @since 1.6.0
+ */
+public class Condition {
+  
+  private ByteSequence cf;
+  private ByteSequence cq;
+  private ByteSequence cv;
+  private ByteSequence val;
+  private Long ts;
+  private IteratorSetting iterators[] = new IteratorSetting[0];
+  private static final ByteSequence EMPTY = new ArrayByteSequence(new byte[0]);
+  
+  /** Creates a condition on the given column; family and qualifier are encoded with Constants.UTF8 and the visibility starts empty. */
+  public Condition(CharSequence cf, CharSequence cq) {
+    ArgumentChecker.notNull(cf, cq);
+    this.cf = new ArrayByteSequence(cf.toString().getBytes(Constants.UTF8));
+    this.cq = new ArrayByteSequence(cq.toString().getBytes(Constants.UTF8));
+    this.cv = EMPTY;
+  }
+  /** Creates a condition on the given column from raw bytes; the visibility starts empty. */
+  public Condition(byte[] cf, byte[] cq) {
+    ArgumentChecker.notNull(cf, cq);
+    this.cf = new ArrayByteSequence(cf);
+    this.cq = new ArrayByteSequence(cq);
+    this.cv = EMPTY;
+  }
+  /** Creates a condition on the given column using the first getLength() bytes of each Text; the visibility starts empty. */
+  public Condition(Text cf, Text cq) {
+    ArgumentChecker.notNull(cf, cq);
+    this.cf = new ArrayByteSequence(cf.getBytes(), 0, cf.getLength());
+    this.cq = new ArrayByteSequence(cq.getBytes(), 0, cq.getLength());
+    this.cv = EMPTY;
+  }
+  /** Creates a condition on the given column, keeping references to the supplied sequences; the visibility starts empty. */
+  public Condition(ByteSequence cf, ByteSequence cq) {
+    ArgumentChecker.notNull(cf, cq);
+    this.cf = cf;
+    this.cq = cq;
+    this.cv = EMPTY;
+  }
+  /** Returns the column family supplied at construction. */
+  public ByteSequence getFamily() {
+    return cf;
+  }
+  /** Returns the column qualifier supplied at construction. */
+  public ByteSequence getQualifier() {
+    return cq;
+  }
+  /** Sets the timestamp of this condition and returns this condition so calls can be chained. */
+  public Condition setTimestamp(long ts) {
+    this.ts = ts;
+    return this;
+  }
+  /** Returns the timestamp, or null when setTimestamp has not been called. */
+  public Long getTimestamp() {
+    return ts;
+  }
+  /** Sets the value from a character sequence encoded with Constants.UTF8; returns this condition. */
+  public Condition setValue(CharSequence value) {
+    ArgumentChecker.notNull(value);
+    this.val = new ArrayByteSequence(value.toString().getBytes(Constants.UTF8));
+    return this;
+  }
+  /** Sets the value from raw bytes; returns this condition. */
+  public Condition setValue(byte[] value) {
+    ArgumentChecker.notNull(value);
+    this.val = new ArrayByteSequence(value);
+    return this;
+  }
+  /** Sets the value from the first getLength() bytes of the given Text; returns this condition. */
+  public Condition setValue(Text value) {
+    ArgumentChecker.notNull(value);
+    this.val = new ArrayByteSequence(value.getBytes(), 0, value.getLength());
+    return this;
+  }
+  /** Sets the value to the given sequence, keeping a reference to it; returns this condition. */
+  public Condition setValue(ByteSequence value) {
+    ArgumentChecker.notNull(value);
+    this.val = value;
+    return this;
+  }
+  /** Returns the value, or null when no setValue method has been called. */
+  public ByteSequence getValue() {
+    return val;
+  }
+  /** Sets the visibility to the expression bytes of the given ColumnVisibility; returns this condition. */
+  public Condition setVisibility(ColumnVisibility cv) {
+    ArgumentChecker.notNull(cv);
+    this.cv = new ArrayByteSequence(cv.getExpression());
+    return this;
+  }
+  /** Returns the visibility, which is an empty sequence unless setVisibility has been called. */
+  public ByteSequence getVisibility() {
+    return cv;
+  }
+  /** Sets the iterators; with more than one, every name and every priority must be unique or IllegalArgumentException is thrown. */
+  public Condition setIterators(IteratorSetting... iterators) {
+    ArgumentChecker.notNull(iterators);
+    IteratorSetting[] copy = iterators.clone(); // defensive copy: later changes to the caller's array cannot bypass the checks below
+    if (copy.length > 1) {
+      HashSet<String> names = new HashSet<String>();
+      HashSet<Integer> prios = new HashSet<Integer>();
+      
+      for (IteratorSetting iteratorSetting : copy) {
+        if (!names.add(iteratorSetting.getName()))
+          throw new IllegalArgumentException("iterator name used more than once " + iteratorSetting.getName());
+        if (!prios.add(iteratorSetting.getPriority()))
+          throw new IllegalArgumentException("iterator priority used more than once " + iteratorSetting.getPriority());
+      }
+    }
+    
+    this.iterators = copy;
+    return this;
+  }
+  /** Returns a copy of the iterators (empty by default), so callers cannot alter this condition through the returned array. */
+  public IteratorSetting[] getIterators() {
+    return iterators.clone();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
new file mode 100644
index 0000000..23bf7d0
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.data;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.hadoop.io.Text;
+
+/** A {@link Mutation} that additionally carries a list of {@link Condition}s.
+ * @since 1.6.0
+ */
+public class ConditionalMutation extends Mutation {
+  // conditions in the order they were supplied; every constructor leaves at least one entry in the list
+  private List<Condition> conditions = new ArrayList<Condition>();
+  /** Creates a conditional mutation for the given row with one required condition and any number of further ones. */
+  public ConditionalMutation(byte[] row, Condition condition, Condition... conditions) {
+    super(row);
+    init(condition, conditions);
+  }
+  /** Same as the byte[] constructor, but start and length are passed on to the Mutation constructor to select part of the array. */
+  public ConditionalMutation(byte[] row, int start, int length, Condition condition, Condition... conditions) {
+    super(row, start, length);
+    init(condition, conditions);
+  }
+  /** Creates a conditional mutation for the row given as Text with one required condition and any number of further ones. */
+  public ConditionalMutation(Text row, Condition condition, Condition... conditions) {
+    super(row);
+    init(condition, conditions);
+  }
+  /** Creates a conditional mutation for the row given as a character sequence with one required condition and any number of further ones. */
+  public ConditionalMutation(CharSequence row, Condition condition, Condition... conditions) {
+    super(row);
+    init(condition, conditions);
+  }
+  /** Creates a conditional mutation for the row given as a ByteSequence, which is converted with toArray(). */
+  public ConditionalMutation(ByteSequence row, Condition condition, Condition... conditions) {
+    // TODO add ByteSequence methods to mutations
+    super(row.toArray());
+    init(condition, conditions);
+  }
+  /** Copy constructor; the new mutation gets its own list holding the same Condition objects as cm. */
+  public ConditionalMutation(ConditionalMutation cm) {
+    super(cm);
+    this.conditions = new ArrayList<Condition>(cm.conditions);
+  }
+  // Validates every supplied condition before storing any, so a null entry is rejected here just as addCondition rejects it.
+  private void init(Condition condition, Condition... conditions) {
+    ArgumentChecker.notNull(condition);
+    for (Condition extra : conditions)
+      ArgumentChecker.notNull(extra);
+    this.conditions.add(condition);
+    this.conditions.addAll(Arrays.asList(conditions));
+  }
+  /** Appends one more condition to this mutation; the argument is checked with ArgumentChecker.notNull. */
+  public void addCondition(Condition condition) {
+    ArgumentChecker.notNull(condition);
+    this.conditions.add(condition);
+  }
+  /** Returns an unmodifiable view of the conditions in the order they were added. */
+  public List<Condition> getConditions() {
+    return Collections.unmodifiableList(conditions);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dc24448/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
index 00cefbf..4ac3f0c 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -65,121 +66,8 @@ public class Mutation implements Writable {
   private byte[] data;
   private int entries;
   private List<byte[]> values;
-
-  // created this little class instead of using ByteArrayOutput stream and DataOutputStream
-  // because both are synchronized... lots of small syncs slow things down
-  private static class ByteBuffer {
-    
-    int offset;
-    byte data[] = new byte[64];
-    
-    private void reserve(int l) {
-      if (offset + l > data.length) {
-        int newSize = data.length * 2;
-        while (newSize <= offset + l)
-          newSize = newSize * 2;
-        
-        byte[] newData = new byte[newSize];
-        System.arraycopy(data, 0, newData, 0, offset);
-        data = newData;
-      }
-      
-    }
-    
-    public void add(byte[] bytes, int off, int length) {
-      reserve(length);
-      System.arraycopy(bytes, off, data, offset, length);
-      offset += length;
-    }
-    
-    void add(boolean b) {
-      reserve(1);
-      if (b)
-        data[offset++] = 1;
-      else
-        data[offset++] = 0;
-    }
-    
-    public byte[] toArray() {
-      byte ret[] = new byte[offset];
-      System.arraycopy(data, 0, ret, 0, offset);
-      return ret;
-    }
-    
-    public void writeVLong(long i) {
-      reserve(9);
-      if (i >= -112 && i <= 127) {
-        data[offset++] = (byte)i;
-        return;
-      }
-        
-      int len = -112;
-      if (i < 0) {
-        i ^= -1L; // take one's complement'
-        len = -120;
-      }
-        
-      long tmp = i;
-      while (tmp != 0) {
-        tmp = tmp >> 8;
-        len--;
-      }
-        
-      data[offset++] = (byte)len;
-        
-      len = (len < -120) ? -(len + 120) : -(len + 112);
-        
-      for (int idx = len; idx != 0; idx--) {
-        int shiftbits = (idx - 1) * 8;
-        long mask = 0xFFL << shiftbits;
-        data[offset++] = (byte)((i & mask) >> shiftbits);
-      }
-    }
-  }
-  
-  private static class SimpleReader {
-    int offset;
-    byte data[];
-    
-    SimpleReader(byte b[]) {
-      this.data = b;
-    }
-
-    int readInt() {
-      return (data[offset++] << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0);
-    }
-    
-    long readLong() {
-      return (((long) data[offset++] << 56) + ((long) (data[offset++] & 255) << 48) + ((long) (data[offset++] & 255) << 40)
-          + ((long) (data[offset++] & 255) << 32) + ((long) (data[offset++] & 255) << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0));
-    }
-    
-    void readBytes(byte b[]) {
-      System.arraycopy(data, offset, b, 0, b.length);
-      offset += b.length;
-    }
-    
-    boolean readBoolean() {
-      return (data[offset++] == 1);
-    }
-    
-    long readVLong() {
-      byte firstByte = data[offset++];
-      int len =  WritableUtils.decodeVIntSize(firstByte);
-      if (len == 1) {
-        return firstByte;
-      }
-      long i = 0;
-      for (int idx = 0; idx < len-1; idx++) {
-        byte b = data[offset++];
-        i = i << 8;
-        i = i | (b & 0xFF);
-      }
-      return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
-    }
-  }
   
-  private ByteBuffer buffer;
+  private UnsynchronizedBuffer.Writer buffer;
   
   private List<ColumnUpdate> updates;
   
@@ -205,7 +93,7 @@ public class Mutation implements Writable {
   public Mutation(byte[] row, int start, int length) {
     this.row = new byte[length];
     System.arraycopy(row, start, this.row, 0, length);
-    buffer = new ByteBuffer();
+    buffer = new UnsynchronizedBuffer.Writer();
   }
   
   public Mutation(Text row) {
@@ -445,7 +333,7 @@ public class Mutation implements Writable {
     put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, true, EMPTY_BYTES);
   }
 
-  private byte[] oldReadBytes(SimpleReader in) {
+  private byte[] oldReadBytes(UnsynchronizedBuffer.Reader in) {
     int len = in.readInt();
     if (len == 0)
       return EMPTY_BYTES;
@@ -455,7 +343,7 @@ public class Mutation implements Writable {
     return bytes;
   }
   
-  private byte[] readBytes(SimpleReader in) {
+  private byte[] readBytes(UnsynchronizedBuffer.Reader in) {
     int len = (int)in.readVLong();
     if (len == 0)
       return EMPTY_BYTES;
@@ -468,7 +356,7 @@ public class Mutation implements Writable {
   public List<ColumnUpdate> getUpdates() {
     serialize();
     
-    SimpleReader in = new SimpleReader(data);
+    UnsynchronizedBuffer.Reader in = new UnsynchronizedBuffer.Reader(data);
     
     if (updates == null) {
       if (entries == 1) {
@@ -490,7 +378,7 @@ public class Mutation implements Writable {
     return new ColumnUpdate(cf, cq, cv, hasts, ts, deleted, val);
   }
 
-  private ColumnUpdate deserializeColumnUpdate(SimpleReader in) {
+  private ColumnUpdate deserializeColumnUpdate(UnsynchronizedBuffer.Reader in) {
     byte[] cf = readBytes(in);
     byte[] cq = readBytes(in);
     byte[] cv = readBytes(in);
@@ -623,8 +511,8 @@ public class Mutation implements Writable {
     }
     
     // convert data to new format
-    SimpleReader din = new SimpleReader(localData);
-    buffer = new ByteBuffer();
+    UnsynchronizedBuffer.Reader din = new UnsynchronizedBuffer.Reader(localData);
+    buffer = new UnsynchronizedBuffer.Writer();
     for (int i = 0; i < localEntries; i++) {
       byte[] cf = oldReadBytes(din);
       byte[] cq = oldReadBytes(din);


Mime
View raw message