accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhutc...@apache.org
Subject [2/2] accumulo git commit: ACCUMULO-4229 SyncingTabletLocator ensures caching clients align with tserver
Date Thu, 05 May 2016 21:24:07 GMT
ACCUMULO-4229 SyncingTabletLocator ensures caching clients align with tserver

The previously failing test case now passes.

A boolean indicator variable (isValid) in TabletLocator
is set to false when the static TabletLocator cache is cleared by clearLocators().
Client writers and readers should check the indicator
before using a locally cached Locator.

A SyncingTabletLocator automatically fetches the current TabletLocator
when the cached TabletLocator becomes outdated (when isValid flips false).
Clients now stay in sync with the static TabletLocator cache
through the use of SyncingTabletLocator, which
TimeoutTabletLocator now extends.

Fields assumed invariant for SyncingTabletLocator (e.g. ConditionalWriterImpl.tableId
and TabletServerBatchReaderIterator.timeout) are now final.
This does not affect semantics.

The new design allows for TabletServerBatchWriter to use its local Locator cache
more aggressively than it used to.

The invalidateCache() methods of SyncingTabletLocator defensively sync,
guarding against goofy code that might use more than one TabletLocator.

It may be possible to replace the static locators collection with a Guava class.
This is not done here for simplicity.

Closes apache/accumulo#96

Closes apache/accumulo#97


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9aac9c0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9aac9c0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9aac9c0f

Branch: refs/heads/1.8
Commit: 9aac9c0f011e3b5e188a41956b01cfea15ffa9c5
Parents: 5275f33
Author: Dylan Hutchison <dhutchis@cs.washington.edu>
Authored: Sat Apr 23 01:33:24 2016 -0700
Committer: Dylan Hutchison <dhutchis@cs.washington.edu>
Committed: Thu May 5 14:15:24 2016 -0700

----------------------------------------------------------------------
 .../core/client/impl/ConditionalWriterImpl.java |   4 +-
 .../core/client/impl/SyncingTabletLocator.java  | 115 ++++++++
 .../core/client/impl/TabletLocator.java         |  12 +
 .../impl/TabletServerBatchReaderIterator.java   |   4 +-
 .../client/impl/TabletServerBatchWriter.java    |   9 +-
 .../core/client/impl/TimeoutTabletLocator.java  |  48 +---
 .../test/BatchWriterInTabletServerIT.java       | 126 +++++++++
 .../accumulo/test/BatchWriterIterator.java      |  49 ++--
 .../apache/accumulo/test/SerializationUtil.java | 261 -------------------
 .../accumulo/test/util/SerializationUtil.java   | 257 ++++++++++++++++++
 .../test/BatchWriterInTabletServerIT.java       | 118 ---------
 11 files changed, 557 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 00140e9..1c1aaf3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -113,7 +113,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private Map<Text,Boolean> cache = Collections.synchronizedMap(new LRUMap(1000));
   private final ClientContext context;
   private TabletLocator locator;
-  private String tableId;
+  private final String tableId;
   private long timeout;
   private final Durability durability;
   private final String classLoaderContext;
@@ -388,7 +388,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     this.auths = config.getAuthorizations();
     this.ve = new VisibilityEvaluator(config.getAuthorizations());
     this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads(), new NamingThreadFactory(this.getClass().getSimpleName()));
-    this.locator = TabletLocator.getLocator(context, tableId);
+    this.locator = new SyncingTabletLocator(context, tableId);
     this.serverQueues = new HashMap<String,ServerQueue>();
     this.tableId = tableId;
     this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java
new file mode 100644
index 0000000..6e7e072
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Syncs itself with the static collection of TabletLocators, so that when the server clears it, it will automatically get the most up-to-date version. Caching
+ * TabletLocators locally is safe when using SyncingTabletLocator.
+ */
+public class SyncingTabletLocator extends TabletLocator {
+  private static final Logger log = Logger.getLogger(SyncingTabletLocator.class);
+
+  private volatile TabletLocator locator;
+  private final Callable<TabletLocator> getLocatorFunction;
+
+  public SyncingTabletLocator(Callable<TabletLocator> getLocatorFunction) {
+    this.getLocatorFunction = getLocatorFunction;
+    try {
+      this.locator = getLocatorFunction.call();
+    } catch (Exception e) {
+      log.error("Problem obtaining TabletLocator", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public SyncingTabletLocator(final ClientContext context, final String tableId) {
+    this(new Callable<TabletLocator>() {
+      @Override
+      public TabletLocator call() throws Exception {
+        return TabletLocator.getLocator(context, tableId);
+      }
+    });
+  }
+
+  private TabletLocator syncLocator() {
+    TabletLocator loc = this.locator;
+    if (!loc.isValid())
+      synchronized (this) {
+        if (locator == loc)
+          try {
+            loc = locator = getLocatorFunction.call();
+          } catch (Exception e) {
+            log.error("Problem obtaining TabletLocator", e);
+            throw new RuntimeException(e);
+          }
+      }
+    return loc;
+  }
+
+  @Override
+  public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    return syncLocator().locateTablet(context, row, skipRow, retry);
+  }
+
+  @Override
+  public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    syncLocator().binMutations(context, mutations, binnedMutations, failures);
+  }
+
+  @Override
+  public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+    return syncLocator().binRanges(context, ranges, binnedRanges);
+  }
+
+  @Override
+  public void invalidateCache(KeyExtent failedExtent) {
+    syncLocator().invalidateCache(failedExtent);
+  }
+
+  @Override
+  public void invalidateCache(Collection<KeyExtent> keySet) {
+    syncLocator().invalidateCache(keySet);
+  }
+
+  @Override
+  public void invalidateCache() {
+    syncLocator().invalidateCache();
+  }
+
+  @Override
+  public void invalidateCache(Instance instance, String server) {
+    syncLocator().invalidateCache(instance, server);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
index 4a28bff..6054c19 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java
@@ -40,6 +40,15 @@ import org.apache.hadoop.io.Text;
 
 public abstract class TabletLocator {
 
+  /**
+   * Flipped false on call to {@link #clearLocators}. Checked by client classes that locally cache Locators.
+   */
+  private volatile boolean isValid = true;
+
+  boolean isValid() {
+    return isValid;
+  }
+
   public abstract TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException,
       AccumuloSecurityException, TableNotFoundException;
 
@@ -93,6 +102,9 @@ public abstract class TabletLocator {
   private static HashMap<LocatorKey,TabletLocator> locators = new HashMap<LocatorKey,TabletLocator>();
 
   public static synchronized void clearLocators() {
+    for (TabletLocator locator : locators.values()) {
+      locator.isValid = false;
+    }
     locators.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index 13e52c0..baeb2c9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -99,7 +99,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
 
   private Map<String,TimeoutTracker> timeoutTrackers;
   private Set<String> timedoutServers;
-  private long timeout;
+  private final long timeout;
 
   private TabletLocator locator;
 
@@ -119,7 +119,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
     this.options = new ScannerOptions(scannerOptions);
     resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
 
-    this.locator = new TimeoutTabletLocator(TabletLocator.getLocator(context, tableId), timeout);
+    this.locator = new TimeoutTabletLocator(timeout, context, tableId);
 
     timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
     timedoutServers = Collections.synchronizedSet(new HashSet<String>());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index 2880047..92f90a3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -674,8 +674,7 @@ public class TabletServerBatchWriter {
     private synchronized TabletLocator getLocator(String tableId) {
       TabletLocator ret = locators.get(tableId);
       if (ret == null) {
-        ret = TabletLocator.getLocator(context, tableId);
-        ret = new TimeoutTabletLocator(ret, timeout);
+        ret = new TimeoutTabletLocator(timeout, context, tableId);
         locators.put(tableId, ret);
       }
 
@@ -899,7 +898,7 @@ public class TabletServerBatchWriter {
             tables.add(ke.getTableId());
 
           for (String table : tables)
-            TabletLocator.getLocator(context, table).invalidateCache(context.getInstance(), location);
+            getLocator(table).invalidateCache(context.getInstance(), location);
 
           failedMutations.add(location, tsm);
         } finally {
@@ -936,7 +935,7 @@ public class TabletServerBatchWriter {
               client.update(tinfo, context.rpcCreds(), entry.getKey().toThrift(), entry.getValue().get(0).toThrift(), DurabilityImpl.toThrift(durability));
             } catch (NotServingTabletException e) {
               allFailures.addAll(entry.getKey().getTableId(), entry.getValue());
-              TabletLocator.getLocator(context, entry.getKey().getTableId()).invalidateCache(entry.getKey());
+              getLocator(entry.getKey().getTableId()).invalidateCache(entry.getKey());
             } catch (ConstraintViolationException e) {
               updatedConstraintViolations(Translator.translate(e.violationSummaries, Translators.TCVST));
             }
@@ -977,7 +976,7 @@ public class TabletServerBatchWriter {
 
               String tableId = failedExtent.getTableId();
 
-              TabletLocator.getLocator(context, tableId).invalidateCache(failedExtent);
+              getLocator(tableId).invalidateCache(failedExtent);
 
               ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
               allFailures.addAll(tableId, mutations.subList(numCommitted, mutations.size()));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
index c0cb219..6e92b68 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java
@@ -16,13 +16,8 @@
  */
 package org.apache.accumulo.core.client.impl;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.data.Mutation;
@@ -30,12 +25,16 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.hadoop.io.Text;
 
+import java.util.List;
+import java.util.Map;
+
 /**
- *
+ * Throws a {@link TimedOutException} if the specified timeout duration elapses between two failed TabletLocator calls.
+ * <p>
+ * This class is safe to cache locally.
  */
-public class TimeoutTabletLocator extends TabletLocator {
+public class TimeoutTabletLocator extends SyncingTabletLocator {
 
-  private TabletLocator locator;
   private long timeout;
   private Long firstFailTime = null;
 
@@ -51,17 +50,16 @@ public class TimeoutTabletLocator extends TabletLocator {
     firstFailTime = null;
   }
 
-  public TimeoutTabletLocator(TabletLocator locator, long timeout) {
-    this.locator = locator;
+  public TimeoutTabletLocator(long timeout, final ClientContext context, final String table) {
+    super(context, table);
     this.timeout = timeout;
   }
 
   @Override
   public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
       TableNotFoundException {
-
     try {
-      TabletLocation ret = locator.locateTablet(context, row, skipRow, retry);
+      TabletLocation ret = super.locateTablet(context, row, skipRow, retry);
 
       if (ret == null)
         failed();
@@ -79,7 +77,7 @@ public class TimeoutTabletLocator extends TabletLocator {
   public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
       throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     try {
-      locator.binMutations(context, mutations, binnedMutations, failures);
+      super.binMutations(context, mutations, binnedMutations, failures);
 
       if (failures.size() == mutations.size())
         failed();
@@ -95,9 +93,8 @@ public class TimeoutTabletLocator extends TabletLocator {
   @Override
   public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
       AccumuloSecurityException, TableNotFoundException {
-
     try {
-      List<Range> ret = locator.binRanges(context, ranges, binnedRanges);
+      List<Range> ret = super.binRanges(context, ranges, binnedRanges);
 
       if (ranges.size() == ret.size())
         failed();
@@ -110,25 +107,4 @@ public class TimeoutTabletLocator extends TabletLocator {
       throw ae;
     }
   }
-
-  @Override
-  public void invalidateCache(KeyExtent failedExtent) {
-    locator.invalidateCache(failedExtent);
-  }
-
-  @Override
-  public void invalidateCache(Collection<KeyExtent> keySet) {
-    locator.invalidateCache(keySet);
-  }
-
-  @Override
-  public void invalidateCache() {
-    locator.invalidateCache();
-  }
-
-  @Override
-  public void invalidateCache(Instance instance, String server) {
-    locator.invalidateCache(instance, server);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
new file mode 100644
index 0000000..6bd5da4
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Test writing to another table from inside an iterator.
+ *
+ * @see BatchWriterIterator
+ */
+public class BatchWriterInTabletServerIT extends AccumuloClusterHarness {
+  private static final Logger log = Logger.getLogger(BatchWriterInTabletServerIT.class);
+
+  @Override
+  public boolean canRunTest(ClusterType type) {
+    return ClusterType.MINI == type;
+  }
+
+  /**
+   * This test should succeed.
+   */
+  @Test
+  public void testNormalWrite() throws Exception {
+    String[] uniqueNames = getUniqueNames(2);
+    String t1 = uniqueNames[0], t2 = uniqueNames[1];
+    Connector c = getConnector();
+    int numEntriesToWritePerEntry = 50;
+    IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getAdminToken(), false, false);
+    test(t1, t2, c, itset, numEntriesToWritePerEntry);
+  }
+
+  /**
+   * Fixed by ACCUMULO-4229.
+   * <p>
+   * This tests a situation that a client which shares a LocatorCache with the tablet server may fall into. Before the problem was fixed, adding a split after
+   * the Locator cache falls out of sync caused the BatchWriter to continuously attempt to write to an old, closed tablet. It would do so for 15 seconds until a
+   * timeout on the BatchWriter.
+   */
+  @Test
+  public void testClearLocatorAndSplitWrite() throws Exception {
+    String[] uniqueNames = getUniqueNames(2);
+    String t1 = uniqueNames[0], t2 = uniqueNames[1];
+    Connector c = getConnector();
+    int numEntriesToWritePerEntry = 50;
+    IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getAdminToken(), true, true);
+    test(t1, t2, c, itset, numEntriesToWritePerEntry);
+  }
+
+  private void test(String t1, String t2, Connector c, IteratorSetting itset, int numEntriesToWritePerEntry) throws Exception {
+    // Write an entry to t1
+    c.tableOperations().create(t1);
+    Key k = new Key(new Text("row"), new Text("cf"), new Text("cq"));
+    Value v = new Value("1".getBytes());
+    {
+      BatchWriterConfig config = new BatchWriterConfig();
+      config.setMaxMemory(0);
+      BatchWriter writer = c.createBatchWriter(t1, config);
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), v);
+      writer.addMutation(m);
+      writer.close();
+    }
+
+    // Create t2 with a combiner to count entries written to it
+    c.tableOperations().create(t2);
+    IteratorSetting summer = new IteratorSetting(2, "summer", SummingCombiner.class);
+    LongCombiner.setEncodingType(summer, LongCombiner.Type.STRING);
+    LongCombiner.setCombineAllColumns(summer, true);
+    c.tableOperations().attachIterator(t2, summer);
+
+    Map.Entry<Key,Value> actual;
+    // Scan t1 with an iterator that writes to table t2
+    Scanner scanner = c.createScanner(t1, Authorizations.EMPTY);
+    scanner.addScanIterator(itset);
+    actual = Iterators.getOnlyElement(scanner.iterator());
+    Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL));
+    Assert.assertEquals(BatchWriterIterator.SUCCESS_VALUE, actual.getValue());
+    scanner.close();
+
+    // ensure entries correctly wrote to table t2
+    scanner = c.createScanner(t2, Authorizations.EMPTY);
+    actual = Iterators.getOnlyElement(scanner.iterator());
+    log.debug("t2 entry is " + actual.getKey().toStringNoTime() + " -> " + actual.getValue());
+    Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL));
+    Assert.assertEquals(numEntriesToWritePerEntry, Integer.parseInt(actual.getValue().toString()));
+    scanner.close();
+
+    c.tableOperations().delete(t1);
+    c.tableOperations().delete(t2);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
index 7d44729..b772f66 100644
--- a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.test;
 
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.ClientConfiguration;
@@ -26,7 +24,6 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.TabletLocator;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -38,12 +35,14 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.test.util.SerializationUtil;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -61,6 +60,8 @@ import java.util.concurrent.TimeUnit;
 public class BatchWriterIterator extends WrappingIterator {
   private static final Logger log = LoggerFactory.getLogger(BatchWriterIterator.class);
 
+  private Map<String,String> originalOptions; // remembered for deepCopy
+
   private int sleepAfterFirstWrite = 0;
   private int numEntriesToWritePerEntry = 10;
   private long batchWriterTimeout = 0;
@@ -128,6 +129,8 @@ public class BatchWriterIterator extends WrappingIterator {
   }
 
   private void parseOptions(Map<String,String> options) {
+    this.originalOptions = new HashMap<>(options);
+
     if (options.containsKey(OPT_numEntriesToWritePerEntry))
       numEntriesToWritePerEntry = Integer.parseInt(options.get(OPT_numEntriesToWritePerEntry));
     if (options.containsKey(OPT_sleepAfterFirstWrite))
@@ -157,10 +160,7 @@ public class BatchWriterIterator extends WrappingIterator {
     Instance instance = new ZooKeeperInstance(cc);
     try {
       connector = instance.getConnector(username, auth);
-    } catch (AccumuloException e) {
-      log.error("failed to connect to Accumulo instance " + instanceName, e);
-      throw new RuntimeException(e);
-    } catch (AccumuloSecurityException e) {
+    } catch (Exception e) {
       log.error("failed to connect to Accumulo instance " + instanceName, e);
       throw new RuntimeException(e);
     }
@@ -210,20 +210,9 @@ public class BatchWriterIterator extends WrappingIterator {
       }
 
       batchWriter.flush();
-    } catch (MutationsRejectedException e) {
-      log.error("", e);
-      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
-    } catch (TimedOutException e) {
-      log.error("", e);
-      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
-    } catch (AccumuloSecurityException e) {
-      log.error("", e);
-      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
-    } catch (TableNotFoundException e) {
-      log.error("", e);
-      failure = e.getClass().getSimpleName() + ": " + e.getMessage();
-    } catch (AccumuloException e) {
-      log.error("", e);
+    } catch (Exception e) {
+      // in particular: watching for TimedOutException
+      log.error("Problem while BatchWriting to target table " + tableName, e);
       failure = e.getClass().getSimpleName() + ": " + e.getMessage();
     }
     topValue = failure == null ? SUCCESS_VALUE : new Value(failure.getBytes());
@@ -232,7 +221,11 @@ public class BatchWriterIterator extends WrappingIterator {
   @Override
   protected void finalize() throws Throwable {
     super.finalize();
-    batchWriter.close();
+    try {
+      batchWriter.close();
+    } catch (MutationsRejectedException e) {
+      log.error("Failed to close BatchWriter; some mutations may not be applied", e);
+    }
   }
 
   @Override
@@ -253,4 +246,16 @@ public class BatchWriterIterator extends WrappingIterator {
   public Value getTopValue() {
     return topValue;
   }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    BatchWriterIterator newInstance;
+    try {
+      newInstance = this.getClass().newInstance();
+      newInstance.init(getSource().deepCopy(env), originalOptions, env);
+      return newInstance;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java b/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java
deleted file mode 100644
index a093f44..0000000
--- a/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * Partially based from {@link org.apache.commons.lang3.SerializationUtils}.
- *
- * <p>
- * For serializing and de-serializing objects.
- * </p>
- */
-public class SerializationUtil {
-  private static final Logger log = LoggerFactory.getLogger(SerializationUtil.class);
-
-  private SerializationUtil() {}
-
-  /**
-   * Create a new instance of a class whose name is given, as a descendent of a given subclass.
-   */
-  public static <E> E subclassNewInstance(String classname, Class<E> parentClass) {
-    Class<?> c;
-    try {
-      c = Class.forName(classname);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException("Can't find class: " + classname, e);
-    }
-    Class<? extends E> cm;
-    try {
-      cm = c.asSubclass(parentClass);
-    } catch (ClassCastException e) {
-      throw new IllegalArgumentException(classname + " is not a subclass of " + parentClass.getName(), e);
-    }
-    try {
-      return cm.newInstance();
-    } catch (InstantiationException e) {
-      throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e);
-    }
-  }
-
-  public static String serializeWritableBase64(Writable writable) {
-    byte[] b = serializeWritable(writable);
-    return org.apache.accumulo.core.util.Base64.encodeBase64String(b);
-  }
-
-  public static void deserializeWritableBase64(Writable writable, String str) {
-    byte[] b = Base64.decodeBase64(str);
-    deserializeWritable(writable, b);
-  }
-
-  public static String serializeBase64(Serializable obj) {
-    byte[] b = serialize(obj);
-    return org.apache.accumulo.core.util.Base64.encodeBase64String(b);
-  }
-
-  public static Object deserializeBase64(String str) {
-    byte[] b = Base64.decodeBase64(str);
-    return deserialize(b);
-  }
-
-  // Interop with Hadoop Writable
-  // -----------------------------------------------------------------------
-
-  public static byte[] serializeWritable(Writable writable) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
-    serializeWritable(writable, baos);
-    return baos.toByteArray();
-  }
-
-  public static void serializeWritable(Writable obj, OutputStream outputStream) {
-    Preconditions.checkNotNull(obj);
-    Preconditions.checkNotNull(outputStream);
-    DataOutputStream out = null;
-    try {
-      out = new DataOutputStream(outputStream);
-      obj.write(out);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    } finally {
-      if (out != null)
-        try {
-          out.close();
-        } catch (IOException e) {
-          log.error("cannot close", e);
-        }
-    }
-  }
-
-  public static void deserializeWritable(Writable writable, InputStream inputStream) {
-    Preconditions.checkNotNull(writable);
-    Preconditions.checkNotNull(inputStream);
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(inputStream);
-      writable.readFields(in);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    } finally {
-      if (in != null)
-        try {
-          in.close();
-        } catch (IOException e) {
-          log.error("cannot close", e);
-        }
-    }
-  }
-
-  public static void deserializeWritable(Writable writable, byte[] objectData) {
-    Preconditions.checkNotNull(objectData);
-    deserializeWritable(writable, new ByteArrayInputStream(objectData));
-  }
-
-  // Serialize
-  // -----------------------------------------------------------------------
-
-  /**
-   * <p>
-   * Serializes an {@code Object} to the specified stream.
-   * </p>
-   * <p/>
-   * <p>
-   * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application
-   * code.
-   * </p>
-   * <p/>
-   * <p>
-   * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired.
-   * </p>
-   *
-   * @param obj
-   *          the object to serialize to bytes, may be null
-   * @param outputStream
-   *          the stream to write to, must not be null
-   * @throws IllegalArgumentException
-   *           if {@code outputStream} is {@code null}
-   */
-  public static void serialize(Serializable obj, OutputStream outputStream) {
-    Preconditions.checkNotNull(outputStream);
-    ObjectOutputStream out = null;
-    try {
-      out = new ObjectOutputStream(outputStream);
-      out.writeObject(obj);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    } finally {
-      if (out != null)
-        try {
-          out.close();
-        } catch (IOException e) {
-          log.error("cannot close", e);
-        }
-    }
-  }
-
-  /**
-   * <p>
-   * Serializes an {@code Object} to a byte array for storage/serialization.
-   * </p>
-   *
-   * @param obj
-   *          the object to serialize to bytes
-   * @return a byte[] with the converted Serializable
-   */
-  public static byte[] serialize(Serializable obj) {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
-    serialize(obj, baos);
-    return baos.toByteArray();
-  }
-
-  // Deserialize
-  // -----------------------------------------------------------------------
-
-  /**
-   * <p>
-   * Deserializes an {@code Object} from the specified stream.
-   * </p>
-   * <p/>
-   * <p>
-   * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application
-   * code.
-   * </p>
-   * <p/>
-   * <p>
-   * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired.
-   * </p>
-   *
-   * @param inputStream
-   *          the serialized object input stream, must not be null
-   * @return the deserialized object
-   * @throws IllegalArgumentException
-   *           if {@code inputStream} is {@code null}
-   */
-  public static Object deserialize(InputStream inputStream) {
-    Preconditions.checkNotNull(inputStream);
-    ObjectInputStream in = null;
-    try {
-      in = new ObjectInputStream(inputStream);
-      return in.readObject();
-    } catch (ClassNotFoundException ex) {
-      throw new RuntimeException(ex);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    } finally {
-      if (in != null)
-        try {
-          in.close();
-        } catch (IOException e) {
-          log.error("cannot close", e);
-        }
-    }
-  }
-
-  /**
-   * <p>
-   * Deserializes a single {@code Object} from an array of bytes.
-   * </p>
-   *
-   * @param objectData
-   *          the serialized object, must not be null
-   * @return the deserialized object
-   * @throws IllegalArgumentException
-   *           if {@code objectData} is {@code null}
-   */
-  public static Object deserialize(byte[] objectData) {
-    Preconditions.checkNotNull(objectData);
-    return deserialize(new ByteArrayInputStream(objectData));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java b/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java
new file mode 100644
index 0000000..b683139
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.util;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Partially based on {@link org.apache.commons.lang3.SerializationUtils}.
+ *
+ * <p>
+ * For serializing and de-serializing objects.
+ * </p>
+ */
+public class SerializationUtil {
+  private static final Logger log = LoggerFactory.getLogger(SerializationUtil.class);
+
+  private SerializationUtil() {}
+
+  /**
+   * Create a new instance of a class whose name is given, as a descendant of a given parent class.
+   */
+  public static <E> E subclassNewInstance(String classname, Class<E> parentClass) {
+    Class<?> c;
+    try {
+      c = Class.forName(classname);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class: " + classname, e);
+    }
+    Class<? extends E> cm;
+    try {
+      cm = c.asSubclass(parentClass);
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(classname + " is not a subclass of " + parentClass.getName(), e);
+    }
+    try {
+      return cm.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e);
+    }
+  }
+
+  public static String serializeWritableBase64(Writable writable) {
+    byte[] b = serializeWritable(writable);
+    return org.apache.accumulo.core.util.Base64.encodeBase64String(b);
+  }
+
+  public static void deserializeWritableBase64(Writable writable, String str) {
+    byte[] b = Base64.decodeBase64(str);
+    deserializeWritable(writable, b);
+  }
+
+  public static String serializeBase64(Serializable obj) {
+    byte[] b = serialize(obj);
+    return org.apache.accumulo.core.util.Base64.encodeBase64String(b);
+  }
+
+  public static Object deserializeBase64(String str) {
+    byte[] b = Base64.decodeBase64(str);
+    return deserialize(b);
+  }
+
+  // Interop with Hadoop Writable
+  // -----------------------------------------------------------------------
+
+  public static byte[] serializeWritable(Writable writable) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+    serializeWritable(writable, baos);
+    return baos.toByteArray();
+  }
+
+  public static void serializeWritable(Writable obj, OutputStream outputStream) {
+    Objects.requireNonNull(obj);
+    Objects.requireNonNull(outputStream);
+    DataOutputStream out = null;
+    try {
+      out = new DataOutputStream(outputStream);
+      obj.write(out);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (out != null)
+        try {
+          out.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  public static void deserializeWritable(Writable writable, InputStream inputStream) {
+    Objects.requireNonNull(writable);
+    Objects.requireNonNull(inputStream);
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(inputStream);
+      writable.readFields(in);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (in != null)
+        try {
+          in.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  public static void deserializeWritable(Writable writable, byte[] objectData) {
+    Objects.requireNonNull(objectData);
+    deserializeWritable(writable, new ByteArrayInputStream(objectData));
+  }
+
+  // Serialize
+  // -----------------------------------------------------------------------
+
+  /**
+   * <p>
+   * Serializes an {@code Object} to the specified stream.
+   * </p>
+   * <p/>
+   * <p>
+   * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application
+   * code.
+   * </p>
+   * <p/>
+   * <p>
+   * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired.
+   * </p>
+   *
+   * @param obj
+   *          the object to serialize to bytes, may be null
+   * @param outputStream
+   *          the stream to write to, must not be null
+   * @throws NullPointerException
+   *           if {@code outputStream} is {@code null}
+   */
+  public static void serialize(Serializable obj, OutputStream outputStream) {
+    Objects.requireNonNull(outputStream);
+    ObjectOutputStream out = null;
+    try {
+      out = new ObjectOutputStream(outputStream);
+      out.writeObject(obj);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (out != null)
+        try {
+          out.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  /**
+   * <p>
+   * Serializes an {@code Object} to a byte array for storage/serialization.
+   * </p>
+   *
+   * @param obj
+   *          the object to serialize to bytes
+   * @return a byte[] with the converted Serializable
+   */
+  public static byte[] serialize(Serializable obj) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+    serialize(obj, baos);
+    return baos.toByteArray();
+  }
+
+  // Deserialize
+  // -----------------------------------------------------------------------
+
+  /**
+   * <p>
+   * Deserializes an {@code Object} from the specified stream.
+   * </p>
+   * <p/>
+   * <p>
+   * The stream will be closed once the object is read. This avoids the need for a finally clause, and maybe also exception handling, in the application
+   * code.
+   * </p>
+   * <p/>
+   * <p>
+   * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired.
+   * </p>
+   *
+   * @param inputStream
+   *          the serialized object input stream, must not be null
+   * @return the deserialized object
+   * @throws NullPointerException
+   *           if {@code inputStream} is {@code null}
+   */
+  public static Object deserialize(InputStream inputStream) {
+    Objects.requireNonNull(inputStream);
+    ObjectInputStream in = null;
+    try {
+      in = new ObjectInputStream(inputStream);
+      return in.readObject();
+    } catch (ClassNotFoundException | IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      if (in != null)
+        try {
+          in.close();
+        } catch (IOException e) {
+          log.error("cannot close", e);
+        }
+    }
+  }
+
+  /**
+   * <p>
+   * Deserializes a single {@code Object} from an array of bytes.
+   * </p>
+   *
+   * @param objectData
+   *          the serialized object, must not be null
+   * @return the deserialized object
+   * @throws NullPointerException
+   *           if {@code objectData} is {@code null}
+   */
+  public static Object deserialize(byte[] objectData) {
+    Objects.requireNonNull(objectData);
+    return deserialize(new ByteArrayInputStream(objectData));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java b/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
deleted file mode 100644
index 2551c8e..0000000
--- a/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import com.google.common.collect.Iterators;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.LongCombiner;
-import org.apache.accumulo.core.iterators.user.SummingCombiner;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.harness.AccumuloClusterIT;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Map;
-
-/**
- * Test writing to another table from inside an iterator.
- *
- * @see BatchWriterIterator
- */
-public class BatchWriterInTabletServerIT extends AccumuloClusterIT {
-
-  /**
-   * This test should succeed.
-   */
-  @Test
-  public void testNormalWrite() throws Exception {
-    String[] uniqueNames = getUniqueNames(2);
-    String t1 = uniqueNames[0], t2 = uniqueNames[1];
-    Connector c = getConnector();
-    int numEntriesToWritePerEntry = 50;
-    IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), false, false);
-    test(t1, t2, c, itset, numEntriesToWritePerEntry);
-  }
-
-  /**
-   * ACCUMULO-4229
-   * <p>
-   * This test should fail because the client shares a LocatorCache with the tablet server. Adding a split after the Locator cache falls out of sync causes the
-   * BatchWriter to continuously attempt to write to an old, closed tablet. It will only do so for 15 seconds because we set a timeout on the BatchWriter.
-   */
-  @Test
-  public void testClearLocatorAndSplitWrite() throws Exception {
-    String[] uniqueNames = getUniqueNames(2);
-    String t1 = uniqueNames[0], t2 = uniqueNames[1];
-    Connector c = getConnector();
-    int numEntriesToWritePerEntry = 50;
-    IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), true, true);
-    test(t1, t2, c, itset, numEntriesToWritePerEntry);
-  }
-
-  private void test(String t1, String t2, Connector c, IteratorSetting itset, int numEntriesToWritePerEntry) throws Exception {
-    // Write an entry to t1
-    c.tableOperations().create(t1);
-    Key k = new Key(new Text("row"), new Text("cf"), new Text("cq"));
-    Value v = new Value("1".getBytes());
-    {
-      BatchWriterConfig config = new BatchWriterConfig();
-      config.setMaxMemory(0);
-      BatchWriter writer = c.createBatchWriter(t1, config);
-      Mutation m = new Mutation(k.getRow());
-      m.put(k.getColumnFamily(), k.getColumnQualifier(), v);
-      writer.addMutation(m);
-      writer.close();
-    }
-
-    // Create t2 with a combiner to count entries written to it
-    c.tableOperations().create(t2);
-    IteratorSetting summer = new IteratorSetting(2, "summer", SummingCombiner.class);
-    LongCombiner.setEncodingType(summer, LongCombiner.Type.STRING);
-    LongCombiner.setCombineAllColumns(summer, true);
-    c.tableOperations().attachIterator(t2, summer);
-
-    Map.Entry<Key,Value> actual;
-    // Scan t1 with an iterator that writes to table t2
-    Scanner scanner = c.createScanner(t1, Authorizations.EMPTY);
-    scanner.addScanIterator(itset);
-    actual = Iterators.getOnlyElement(scanner.iterator());
-    Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL));
-    Assert.assertEquals(BatchWriterIterator.SUCCESS_VALUE, actual.getValue());
-    scanner.close();
-
-    // ensure entries correctly wrote to table t2
-    scanner = c.createScanner(t2, Authorizations.EMPTY);
-    actual = Iterators.getOnlyElement(scanner.iterator());
-    // System.out.println("t2 entry is " + actual.getKey().toStringNoTime() + " -> " + actual.getValue());
-    Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL));
-    Assert.assertEquals(numEntriesToWritePerEntry, Integer.parseInt(actual.getValue().toString()));
-    scanner.close();
-
-    c.tableOperations().delete(t1);
-    c.tableOperations().delete(t2);
-  }
-
-}


Mime
View raw message