Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B7C992009F2 for ; Thu, 5 May 2016 23:26:23 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B65EF1609F9; Thu, 5 May 2016 21:26:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C2131160A04 for ; Thu, 5 May 2016 23:26:21 +0200 (CEST) Received: (qmail 89826 invoked by uid 500); 5 May 2016 21:26:21 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 89745 invoked by uid 99); 5 May 2016 21:26:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 May 2016 21:26:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C464FDFDE3; Thu, 5 May 2016 21:26:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhutchis@apache.org To: commits@accumulo.apache.org Date: Thu, 05 May 2016 21:26:21 -0000 Message-Id: In-Reply-To: <437ce595d9fa40e18819a0dffe6af190@git.apache.org> References: <437ce595d9fa40e18819a0dffe6af190@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] accumulo git commit: ACCUMULO-4229 SyncingTabletLocator ensures caching clients align with tserver archived-at: Thu, 05 May 2016 21:26:23 -0000 ACCUMULO-4229 SyncingTabletLocator ensures caching clients align with tserver The previously failing test case now passes. A boolean indicator variable in TabletLocator is set to false when the static TabletLocator cache is cleared. Client writers and readers should check the indicator before using a locally cached Locator. A SyncingTabletLocator automatically fetches the current TabletLocator when the cached TabletLocator becomes outdated (when isValid flips false). Clients now stay in sync with the static TabletLocator cache through the use of SyncingTabletLocator, which TimeoutTabletLocator now extends. Variables assumed invariant for SyncingTabletLocator are now final. This does not affect semantics. The new design allows for TabletServerBatchWriter to use its local Locator cache more aggressively than it used to. The invalidateCache() methods of SyncingTabletLocator defensively sync, guarding against goofy code that might use more than one TabletLocator. It may be possible to replace the static locators collection with a Guava class. This is not done here for simplicity. Closes apache/accumulo#96 Closes apache/accumulo#97 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9aac9c0f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9aac9c0f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9aac9c0f Branch: refs/heads/master Commit: 9aac9c0f011e3b5e188a41956b01cfea15ffa9c5 Parents: 5275f33 Author: Dylan Hutchison Authored: Sat Apr 23 01:33:24 2016 -0700 Committer: Dylan Hutchison Committed: Thu May 5 14:15:24 2016 -0700 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 4 +- .../core/client/impl/SyncingTabletLocator.java | 115 ++++++++ .../core/client/impl/TabletLocator.java | 12 + .../impl/TabletServerBatchReaderIterator.java | 4 +- .../client/impl/TabletServerBatchWriter.java | 9 +- .../core/client/impl/TimeoutTabletLocator.java | 48 +--- .../test/BatchWriterInTabletServerIT.java | 126 +++++++++ .../accumulo/test/BatchWriterIterator.java | 49 ++-- .../apache/accumulo/test/SerializationUtil.java | 261 ------------------- .../accumulo/test/util/SerializationUtil.java | 257 ++++++++++++++++++ .../test/BatchWriterInTabletServerIT.java | 118 --------- 11 files changed, 557 insertions(+), 446 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index 00140e9..1c1aaf3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -113,7 +113,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private Map cache = Collections.synchronizedMap(new LRUMap(1000)); private final ClientContext context; private TabletLocator locator; - private String tableId; + private final String tableId; private long timeout; private final Durability durability; private final String classLoaderContext; @@ -388,7 +388,7 @@ class ConditionalWriterImpl implements ConditionalWriter { this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads(), new NamingThreadFactory(this.getClass().getSimpleName())); - this.locator = TabletLocator.getLocator(context, tableId); + this.locator = new SyncingTabletLocator(context, tableId); this.serverQueues = new HashMap(); this.tableId = tableId; this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java new file mode 100644 index 0000000..6e7e072 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/SyncingTabletLocator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +/** + * Syncs itself with the static collection of TabletLocators, so that when the server clears it, it will automatically get the most up-to-date version. Caching + * TabletLocators locally is safe when using SyncingTabletLocator. + */ +public class SyncingTabletLocator extends TabletLocator { + private static final Logger log = Logger.getLogger(SyncingTabletLocator.class); + + private volatile TabletLocator locator; + private final Callable getLocatorFunction; + + public SyncingTabletLocator(Callable getLocatorFunction) { + this.getLocatorFunction = getLocatorFunction; + try { + this.locator = getLocatorFunction.call(); + } catch (Exception e) { + log.error("Problem obtaining TabletLocator", e); + throw new RuntimeException(e); + } + } + + public SyncingTabletLocator(final ClientContext context, final String tableId) { + this(new Callable() { + @Override + public TabletLocator call() throws Exception { + return TabletLocator.getLocator(context, tableId); + } + }); + } + + private TabletLocator syncLocator() { + TabletLocator loc = this.locator; + if (!loc.isValid()) + synchronized (this) { + if (locator == loc) + try { + loc = locator = getLocatorFunction.call(); + } catch (Exception e) { + log.error("Problem obtaining TabletLocator", e); + throw new RuntimeException(e); + } + } + return loc; + } + + @Override + public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + return syncLocator().locateTablet(context, row, skipRow, retry); + } + + @Override + public void binMutations(ClientContext context, List mutations, Map> binnedMutations, List failures) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + syncLocator().binMutations(context, mutations, binnedMutations, failures); + } + + @Override + public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException { + return syncLocator().binRanges(context, ranges, binnedRanges); + } + + @Override + public void invalidateCache(KeyExtent failedExtent) { + syncLocator().invalidateCache(failedExtent); + } + + @Override + public void invalidateCache(Collection keySet) { + syncLocator().invalidateCache(keySet); + } + + @Override + public void invalidateCache() { + syncLocator().invalidateCache(); + } + + @Override + public void invalidateCache(Instance instance, String server) { + syncLocator().invalidateCache(instance, server); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java index 4a28bff..6054c19 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocator.java @@ -40,6 +40,15 @@ import org.apache.hadoop.io.Text; public abstract class TabletLocator { + /** + * Flipped false on call to {@link #clearLocators}. Checked by client classes that locally cache Locators. + */ + private volatile boolean isValid = true; + + boolean isValid() { + return isValid; + } + public abstract TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException; @@ -93,6 +102,9 @@ public abstract class TabletLocator { private static HashMap locators = new HashMap(); public static synchronized void clearLocators() { + for (TabletLocator locator : locators.values()) { + locator.isValid = false; + } locators.clear(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java index 13e52c0..baeb2c9 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java @@ -99,7 +99,7 @@ public class TabletServerBatchReaderIterator implements Iterator timeoutTrackers; private Set timedoutServers; - private long timeout; + private final long timeout; private TabletLocator locator; @@ -119,7 +119,7 @@ public class TabletServerBatchReaderIterator implements Iterator>>(numThreads); - this.locator = new TimeoutTabletLocator(TabletLocator.getLocator(context, tableId), timeout); + this.locator = new TimeoutTabletLocator(timeout, context, tableId); timeoutTrackers = Collections.synchronizedMap(new HashMap()); timedoutServers = Collections.synchronizedSet(new HashSet()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java index 2880047..92f90a3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java @@ -674,8 +674,7 @@ public class TabletServerBatchWriter { private synchronized TabletLocator getLocator(String tableId) { TabletLocator ret = locators.get(tableId); if (ret == null) { - ret = TabletLocator.getLocator(context, tableId); - ret = new TimeoutTabletLocator(ret, timeout); + ret = new TimeoutTabletLocator(timeout, context, tableId); locators.put(tableId, ret); } @@ -899,7 +898,7 @@ public class TabletServerBatchWriter { tables.add(ke.getTableId()); for (String table : tables) - TabletLocator.getLocator(context, table).invalidateCache(context.getInstance(), location); + getLocator(table).invalidateCache(context.getInstance(), location); failedMutations.add(location, tsm); } finally { @@ -936,7 +935,7 @@ public class TabletServerBatchWriter { client.update(tinfo, context.rpcCreds(), entry.getKey().toThrift(), entry.getValue().get(0).toThrift(), DurabilityImpl.toThrift(durability)); } catch (NotServingTabletException e) { allFailures.addAll(entry.getKey().getTableId(), entry.getValue()); - TabletLocator.getLocator(context, entry.getKey().getTableId()).invalidateCache(entry.getKey()); + getLocator(entry.getKey().getTableId()).invalidateCache(entry.getKey()); } catch (ConstraintViolationException e) { updatedConstraintViolations(Translator.translate(e.violationSummaries, Translators.TCVST)); } @@ -977,7 +976,7 @@ public class TabletServerBatchWriter { String tableId = failedExtent.getTableId(); - TabletLocator.getLocator(context, tableId).invalidateCache(failedExtent); + getLocator(tableId).invalidateCache(failedExtent); ArrayList mutations = (ArrayList) tabMuts.get(failedExtent); allFailures.addAll(tableId, mutations.subList(numCommitted, mutations.size())); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java index c0cb219..6e92b68 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TimeoutTabletLocator.java @@ -16,13 +16,8 @@ */ package org.apache.accumulo.core.client.impl; -import java.util.Collection; -import java.util.List; -import java.util.Map; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.data.Mutation; @@ -30,12 +25,16 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.hadoop.io.Text; +import java.util.List; +import java.util.Map; + /** - * + * Throws a {@link TimedOutException} if the specified timeout duration elapses between two failed TabletLocator calls. + *

+ * This class is safe to cache locally. */ -public class TimeoutTabletLocator extends TabletLocator { +public class TimeoutTabletLocator extends SyncingTabletLocator { - private TabletLocator locator; private long timeout; private Long firstFailTime = null; @@ -51,17 +50,16 @@ public class TimeoutTabletLocator extends TabletLocator { firstFailTime = null; } - public TimeoutTabletLocator(TabletLocator locator, long timeout) { - this.locator = locator; + public TimeoutTabletLocator(long timeout, final ClientContext context, final String table) { + super(context, table); this.timeout = timeout; } @Override public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - try { - TabletLocation ret = locator.locateTablet(context, row, skipRow, retry); + TabletLocation ret = super.locateTablet(context, row, skipRow, retry); if (ret == null) failed(); @@ -79,7 +77,7 @@ public class TimeoutTabletLocator extends TabletLocator { public void binMutations(ClientContext context, List mutations, Map> binnedMutations, List failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { try { - locator.binMutations(context, mutations, binnedMutations, failures); + super.binMutations(context, mutations, binnedMutations, failures); if (failures.size() == mutations.size()) failed(); @@ -95,9 +93,8 @@ public class TimeoutTabletLocator extends TabletLocator { @Override public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - try { - List ret = locator.binRanges(context, ranges, binnedRanges); + List ret = super.binRanges(context, ranges, binnedRanges); if (ranges.size() == ret.size()) failed(); @@ -110,25 +107,4 @@ public class TimeoutTabletLocator extends TabletLocator { throw ae; } } - - @Override - public void invalidateCache(KeyExtent failedExtent) { - locator.invalidateCache(failedExtent); - } - - @Override - public void invalidateCache(Collection keySet) { - locator.invalidateCache(keySet); - } - - @Override - public void invalidateCache() { - locator.invalidateCache(); - } - - @Override - public void invalidateCache(Instance instance, String server) { - locator.invalidateCache(instance, server); - } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java new file mode 100644 index 0000000..6bd5da4 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import com.google.common.collect.Iterators; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.LongCombiner; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +/** + * Test writing to another table from inside an iterator. + * + * @see BatchWriterIterator + */ +public class BatchWriterInTabletServerIT extends AccumuloClusterHarness { + private static final Logger log = Logger.getLogger(BatchWriterInTabletServerIT.class); + + @Override + public boolean canRunTest(ClusterType type) { + return ClusterType.MINI == type; + } + + /** + * This test should succeed. + */ + @Test + public void testNormalWrite() throws Exception { + String[] uniqueNames = getUniqueNames(2); + String t1 = uniqueNames[0], t2 = uniqueNames[1]; + Connector c = getConnector(); + int numEntriesToWritePerEntry = 50; + IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getAdminToken(), false, false); + test(t1, t2, c, itset, numEntriesToWritePerEntry); + } + + /** + * Fixed by ACCUMULO-4229. + *

+ * This tests a situation that a client which shares a LocatorCache with the tablet server may fall into. Before the problem was fixed, adding a split after + * the Locator cache falls out of sync caused the BatchWriter to continuously attempt to write to an old, closed tablet. It would do so for 15 seconds until a + * timeout on the BatchWriter. + */ + @Test + public void testClearLocatorAndSplitWrite() throws Exception { + String[] uniqueNames = getUniqueNames(2); + String t1 = uniqueNames[0], t2 = uniqueNames[1]; + Connector c = getConnector(); + int numEntriesToWritePerEntry = 50; + IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getAdminToken(), true, true); + test(t1, t2, c, itset, numEntriesToWritePerEntry); + } + + private void test(String t1, String t2, Connector c, IteratorSetting itset, int numEntriesToWritePerEntry) throws Exception { + // Write an entry to t1 + c.tableOperations().create(t1); + Key k = new Key(new Text("row"), new Text("cf"), new Text("cq")); + Value v = new Value("1".getBytes()); + { + BatchWriterConfig config = new BatchWriterConfig(); + config.setMaxMemory(0); + BatchWriter writer = c.createBatchWriter(t1, config); + Mutation m = new Mutation(k.getRow()); + m.put(k.getColumnFamily(), k.getColumnQualifier(), v); + writer.addMutation(m); + writer.close(); + } + + // Create t2 with a combiner to count entries written to it + c.tableOperations().create(t2); + IteratorSetting summer = new IteratorSetting(2, "summer", SummingCombiner.class); + LongCombiner.setEncodingType(summer, LongCombiner.Type.STRING); + LongCombiner.setCombineAllColumns(summer, true); + c.tableOperations().attachIterator(t2, summer); + + Map.Entry actual; + // Scan t1 with an iterator that writes to table t2 + Scanner scanner = c.createScanner(t1, Authorizations.EMPTY); + scanner.addScanIterator(itset); + actual = Iterators.getOnlyElement(scanner.iterator()); + Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL)); + Assert.assertEquals(BatchWriterIterator.SUCCESS_VALUE, actual.getValue()); + scanner.close(); + + // ensure entries correctly wrote to table t2 + scanner = c.createScanner(t2, Authorizations.EMPTY); + actual = Iterators.getOnlyElement(scanner.iterator()); + log.debug("t2 entry is " + actual.getKey().toStringNoTime() + " -> " + actual.getValue()); + Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL)); + Assert.assertEquals(numEntriesToWritePerEntry, Integer.parseInt(actual.getValue().toString())); + scanner.close(); + + c.tableOperations().delete(t1); + c.tableOperations().delete(t2); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java index 7d44729..b772f66 100644 --- a/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIterator.java @@ -16,8 +16,6 @@ */ package org.apache.accumulo.test; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientConfiguration; @@ -26,7 +24,6 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.impl.TabletLocator; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; @@ -38,12 +35,14 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.test.util.SerializationUtil; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; @@ -61,6 +60,8 @@ import java.util.concurrent.TimeUnit; public class BatchWriterIterator extends WrappingIterator { private static final Logger log = LoggerFactory.getLogger(BatchWriterIterator.class); + private Map originalOptions; // remembered for deepCopy + private int sleepAfterFirstWrite = 0; private int numEntriesToWritePerEntry = 10; private long batchWriterTimeout = 0; @@ -128,6 +129,8 @@ public class BatchWriterIterator extends WrappingIterator { } private void parseOptions(Map options) { + this.originalOptions = new HashMap<>(options); + if (options.containsKey(OPT_numEntriesToWritePerEntry)) numEntriesToWritePerEntry = Integer.parseInt(options.get(OPT_numEntriesToWritePerEntry)); if (options.containsKey(OPT_sleepAfterFirstWrite)) @@ -157,10 +160,7 @@ public class BatchWriterIterator extends WrappingIterator { Instance instance = new ZooKeeperInstance(cc); try { connector = instance.getConnector(username, auth); - } catch (AccumuloException e) { - log.error("failed to connect to Accumulo instance " + instanceName, e); - throw new RuntimeException(e); - } catch (AccumuloSecurityException e) { + } catch (Exception e) { log.error("failed to connect to Accumulo instance " + instanceName, e); throw new RuntimeException(e); } @@ -210,20 +210,9 @@ public class BatchWriterIterator extends WrappingIterator { } batchWriter.flush(); - } catch (MutationsRejectedException e) { - log.error("", e); - failure = e.getClass().getSimpleName() + ": " + e.getMessage(); - } catch (TimedOutException e) { - log.error("", e); - failure = e.getClass().getSimpleName() + ": " + e.getMessage(); - } catch (AccumuloSecurityException e) { - log.error("", e); - failure = e.getClass().getSimpleName() + ": " + e.getMessage(); - } catch (TableNotFoundException e) { - log.error("", e); - failure = e.getClass().getSimpleName() + ": " + e.getMessage(); - } catch (AccumuloException e) { - log.error("", e); + } catch (Exception e) { + // in particular: watching for TimedOutException + log.error("Problem while BatchWriting to target table " + tableName, e); failure = e.getClass().getSimpleName() + ": " + e.getMessage(); } topValue = failure == null ? SUCCESS_VALUE : new Value(failure.getBytes()); @@ -232,7 +221,11 @@ public class BatchWriterIterator extends WrappingIterator { @Override protected void finalize() throws Throwable { super.finalize(); - batchWriter.close(); + try { + batchWriter.close(); + } catch (MutationsRejectedException e) { + log.error("Failed to close BatchWriter; some mutations may not be applied", e); + } } @Override @@ -253,4 +246,16 @@ public class BatchWriterIterator extends WrappingIterator { public Value getTopValue() { return topValue; } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + BatchWriterIterator newInstance; + try { + newInstance = this.getClass().newInstance(); + newInstance.init(getSource().deepCopy(env), originalOptions, env); + return newInstance; + } catch (Exception e) { + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java b/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java deleted file mode 100644 index a093f44..0000000 --- a/test/src/main/java/org/apache/accumulo/test/SerializationUtil.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test; - -import com.google.common.base.Preconditions; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Writable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; -import java.io.Serializable; - -/** - * Partially based from {@link org.apache.commons.lang3.SerializationUtils}. - * - *

- * For serializing and de-serializing objects. - *

- */ -public class SerializationUtil { - private static final Logger log = LoggerFactory.getLogger(SerializationUtil.class); - - private SerializationUtil() {} - - /** - * Create a new instance of a class whose name is given, as a descendent of a given subclass. - */ - public static E subclassNewInstance(String classname, Class parentClass) { - Class c; - try { - c = Class.forName(classname); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Can't find class: " + classname, e); - } - Class cm; - try { - cm = c.asSubclass(parentClass); - } catch (ClassCastException e) { - throw new IllegalArgumentException(classname + " is not a subclass of " + parentClass.getName(), e); - } - try { - return cm.newInstance(); - } catch (InstantiationException e) { - throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e); - } catch (IllegalAccessException e) { - throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e); - } - } - - public static String serializeWritableBase64(Writable writable) { - byte[] b = serializeWritable(writable); - return org.apache.accumulo.core.util.Base64.encodeBase64String(b); - } - - public static void deserializeWritableBase64(Writable writable, String str) { - byte[] b = Base64.decodeBase64(str); - deserializeWritable(writable, b); - } - - public static String serializeBase64(Serializable obj) { - byte[] b = serialize(obj); - return org.apache.accumulo.core.util.Base64.encodeBase64String(b); - } - - public static Object deserializeBase64(String str) { - byte[] b = Base64.decodeBase64(str); - return deserialize(b); - } - - // Interop with Hadoop Writable - // ----------------------------------------------------------------------- - - public static byte[] serializeWritable(Writable writable) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - serializeWritable(writable, baos); - return baos.toByteArray(); - } - - public static void serializeWritable(Writable obj, OutputStream outputStream) { - Preconditions.checkNotNull(obj); - Preconditions.checkNotNull(outputStream); - DataOutputStream out = null; - try { - out = new DataOutputStream(outputStream); - obj.write(out); - } catch (IOException ex) { - throw new RuntimeException(ex); - } finally { - if (out != null) - try { - out.close(); - } catch (IOException e) { - log.error("cannot close", e); - } - } - } - - public static void deserializeWritable(Writable writable, InputStream inputStream) { - Preconditions.checkNotNull(writable); - Preconditions.checkNotNull(inputStream); - DataInputStream in = null; - try { - in = new DataInputStream(inputStream); - writable.readFields(in); - } catch (IOException ex) { - throw new RuntimeException(ex); - } finally { - if (in != null) - try { - in.close(); - } catch (IOException e) { - log.error("cannot close", e); - } - } - } - - public static void deserializeWritable(Writable writable, byte[] objectData) { - Preconditions.checkNotNull(objectData); - deserializeWritable(writable, new ByteArrayInputStream(objectData)); - } - - // Serialize - // ----------------------------------------------------------------------- - - /** - *

- * Serializes an {@code Object} to the specified stream. - *

- *

- *

- * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application - * code. - *

- *

- *

- * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired. - *

- * - * @param obj - * the object to serialize to bytes, may be null - * @param outputStream - * the stream to write to, must not be null - * @throws IllegalArgumentException - * if {@code outputStream} is {@code null} - */ - public static void serialize(Serializable obj, OutputStream outputStream) { - Preconditions.checkNotNull(outputStream); - ObjectOutputStream out = null; - try { - out = new ObjectOutputStream(outputStream); - out.writeObject(obj); - } catch (IOException ex) { - throw new RuntimeException(ex); - } finally { - if (out != null) - try { - out.close(); - } catch (IOException e) { - log.error("cannot close", e); - } - } - } - - /** - *

- * Serializes an {@code Object} to a byte array for storage/serialization. - *

- * - * @param obj - * the object to serialize to bytes - * @return a byte[] with the converted Serializable - */ - public static byte[] serialize(Serializable obj) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - serialize(obj, baos); - return baos.toByteArray(); - } - - // Deserialize - // ----------------------------------------------------------------------- - - /** - *

- * Deserializes an {@code Object} from the specified stream. - *

- *

- *

- * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application - * code. - *

- *

- *

- * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired. - *

- * - * @param inputStream - * the serialized object input stream, must not be null - * @return the deserialized object - * @throws IllegalArgumentException - * if {@code inputStream} is {@code null} - */ - public static Object deserialize(InputStream inputStream) { - Preconditions.checkNotNull(inputStream); - ObjectInputStream in = null; - try { - in = new ObjectInputStream(inputStream); - return in.readObject(); - } catch (ClassNotFoundException ex) { - throw new RuntimeException(ex); - } catch (IOException ex) { - throw new RuntimeException(ex); - } finally { - if (in != null) - try { - in.close(); - } catch (IOException e) { - log.error("cannot close", e); - } - } - } - - /** - *

- * Deserializes a single {@code Object} from an array of bytes. - *

- * - * @param objectData - * the serialized object, must not be null - * @return the deserialized object - * @throws IllegalArgumentException - * if {@code objectData} is {@code null} - */ - public static Object deserialize(byte[] objectData) { - Preconditions.checkNotNull(objectData); - return deserialize(new ByteArrayInputStream(objectData)); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java b/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java new file mode 100644 index 0000000..b683139 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/util/SerializationUtil.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.util; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Objects; + +/** + * Partially based from {@link org.apache.commons.lang3.SerializationUtils}. + * + *

+ * For serializing and de-serializing objects. + *

+ */ +public class SerializationUtil { + private static final Logger log = LoggerFactory.getLogger(SerializationUtil.class); + + private SerializationUtil() {} + + /** + * Create a new instance of a class whose name is given, as a descendent of a given subclass. + */ + public static E subclassNewInstance(String classname, Class parentClass) { + Class c; + try { + c = Class.forName(classname); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Can't find class: " + classname, e); + } + Class cm; + try { + cm = c.asSubclass(parentClass); + } catch (ClassCastException e) { + throw new IllegalArgumentException(classname + " is not a subclass of " + parentClass.getName(), e); + } + try { + return cm.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("can't instantiate new instance of " + cm.getName(), e); + } + } + + public static String serializeWritableBase64(Writable writable) { + byte[] b = serializeWritable(writable); + return org.apache.accumulo.core.util.Base64.encodeBase64String(b); + } + + public static void deserializeWritableBase64(Writable writable, String str) { + byte[] b = Base64.decodeBase64(str); + deserializeWritable(writable, b); + } + + public static String serializeBase64(Serializable obj) { + byte[] b = serialize(obj); + return org.apache.accumulo.core.util.Base64.encodeBase64String(b); + } + + public static Object deserializeBase64(String str) { + byte[] b = Base64.decodeBase64(str); + return deserialize(b); + } + + // Interop with Hadoop Writable + // ----------------------------------------------------------------------- + + public static byte[] serializeWritable(Writable writable) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + serializeWritable(writable, baos); + return baos.toByteArray(); + } + + public static void serializeWritable(Writable obj, OutputStream outputStream) { + Objects.requireNonNull(obj); + Objects.requireNonNull(outputStream); + DataOutputStream out = null; + try { + out = new DataOutputStream(outputStream); + obj.write(out); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (out != null) + try { + out.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + public static void deserializeWritable(Writable writable, InputStream inputStream) { + Objects.requireNonNull(writable); + Objects.requireNonNull(inputStream); + DataInputStream in = null; + try { + in = new DataInputStream(inputStream); + writable.readFields(in); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (in != null) + try { + in.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + public static void deserializeWritable(Writable writable, byte[] objectData) { + Objects.requireNonNull(objectData); + deserializeWritable(writable, new ByteArrayInputStream(objectData)); + } + + // Serialize + // ----------------------------------------------------------------------- + + /** + *

+ * Serializes an {@code Object} to the specified stream. + *

+ *

+ *

+ * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application + * code. + *

+ *

+ *

+ * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired. + *

+ * + * @param obj + * the object to serialize to bytes, may be null + * @param outputStream + * the stream to write to, must not be null + * @throws IllegalArgumentException + * if {@code outputStream} is {@code null} + */ + public static void serialize(Serializable obj, OutputStream outputStream) { + Objects.requireNonNull(outputStream); + ObjectOutputStream out = null; + try { + out = new ObjectOutputStream(outputStream); + out.writeObject(obj); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (out != null) + try { + out.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + /** + *

+ * Serializes an {@code Object} to a byte array for storage/serialization. + *

+ * + * @param obj + * the object to serialize to bytes + * @return a byte[] with the converted Serializable + */ + public static byte[] serialize(Serializable obj) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + serialize(obj, baos); + return baos.toByteArray(); + } + + // Deserialize + // ----------------------------------------------------------------------- + + /** + *

+ * Deserializes an {@code Object} from the specified stream. + *

+ *

+ *

+ * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also exception handling, in the application + * code. + *

+ *

+ *

+ * The stream passed in is not buffered internally within this method. This is the responsibility of your application if desired. + *

+ * + * @param inputStream + * the serialized object input stream, must not be null + * @return the deserialized object + * @throws IllegalArgumentException + * if {@code inputStream} is {@code null} + */ + public static Object deserialize(InputStream inputStream) { + Objects.requireNonNull(inputStream); + ObjectInputStream in = null; + try { + in = new ObjectInputStream(inputStream); + return in.readObject(); + } catch (ClassNotFoundException | IOException ex) { + throw new RuntimeException(ex); + } finally { + if (in != null) + try { + in.close(); + } catch (IOException e) { + log.error("cannot close", e); + } + } + } + + /** + *

+ * Deserializes a single {@code Object} from an array of bytes. + *

+ * + * @param objectData + * the serialized object, must not be null + * @return the deserialized object + * @throws IllegalArgumentException + * if {@code objectData} is {@code null} + */ + public static Object deserialize(byte[] objectData) { + Objects.requireNonNull(objectData); + return deserialize(new ByteArrayInputStream(objectData)); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9aac9c0f/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java b/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java deleted file mode 100644 index 2551c8e..0000000 --- a/test/src/test/java/org/apache/accumulo/test/BatchWriterInTabletServerIT.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test; - -import com.google.common.collect.Iterators; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.LongCombiner; -import org.apache.accumulo.core.iterators.user.SummingCombiner; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.harness.AccumuloClusterIT; -import org.apache.hadoop.io.Text; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Map; - -/** - * Test writing to another table from inside an iterator. - * - * @see BatchWriterIterator - */ -public class BatchWriterInTabletServerIT extends AccumuloClusterIT { - - /** - * This test should succeed. - */ - @Test - public void testNormalWrite() throws Exception { - String[] uniqueNames = getUniqueNames(2); - String t1 = uniqueNames[0], t2 = uniqueNames[1]; - Connector c = getConnector(); - int numEntriesToWritePerEntry = 50; - IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), false, false); - test(t1, t2, c, itset, numEntriesToWritePerEntry); - } - - /** - * ACCUMULO-4229 - *

- * This test should fail because the client shares a LocatorCache with the tablet server. Adding a split after the Locator cache falls out of sync causes the - * BatchWriter to continuously attempt to write to an old, closed tablet. It will only do so for 15 seconds because we set a timeout on the BatchWriter. - */ - @Test - public void testClearLocatorAndSplitWrite() throws Exception { - String[] uniqueNames = getUniqueNames(2); - String t1 = uniqueNames[0], t2 = uniqueNames[1]; - Connector c = getConnector(); - int numEntriesToWritePerEntry = 50; - IteratorSetting itset = BatchWriterIterator.iteratorSetting(6, 0, 15, 1000, numEntriesToWritePerEntry, t2, c, getToken(), true, true); - test(t1, t2, c, itset, numEntriesToWritePerEntry); - } - - private void test(String t1, String t2, Connector c, IteratorSetting itset, int numEntriesToWritePerEntry) throws Exception { - // Write an entry to t1 - c.tableOperations().create(t1); - Key k = new Key(new Text("row"), new Text("cf"), new Text("cq")); - Value v = new Value("1".getBytes()); - { - BatchWriterConfig config = new BatchWriterConfig(); - config.setMaxMemory(0); - BatchWriter writer = c.createBatchWriter(t1, config); - Mutation m = new Mutation(k.getRow()); - m.put(k.getColumnFamily(), k.getColumnQualifier(), v); - writer.addMutation(m); - writer.close(); - } - - // Create t2 with a combiner to count entries written to it - c.tableOperations().create(t2); - IteratorSetting summer = new IteratorSetting(2, "summer", SummingCombiner.class); - LongCombiner.setEncodingType(summer, LongCombiner.Type.STRING); - LongCombiner.setCombineAllColumns(summer, true); - c.tableOperations().attachIterator(t2, summer); - - Map.Entry actual; - // Scan t1 with an iterator that writes to table t2 - Scanner scanner = c.createScanner(t1, Authorizations.EMPTY); - scanner.addScanIterator(itset); - actual = Iterators.getOnlyElement(scanner.iterator()); - Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL)); - Assert.assertEquals(BatchWriterIterator.SUCCESS_VALUE, actual.getValue()); - scanner.close(); - - // ensure entries correctly wrote to table t2 - scanner = c.createScanner(t2, Authorizations.EMPTY); - actual = Iterators.getOnlyElement(scanner.iterator()); - // System.out.println("t2 entry is " + actual.getKey().toStringNoTime() + " -> " + actual.getValue()); - Assert.assertTrue(actual.getKey().equals(k, PartialKey.ROW_COLFAM_COLQUAL)); - Assert.assertEquals(numEntriesToWritePerEntry, Integer.parseInt(actual.getValue().toString())); - scanner.close(); - - c.tableOperations().delete(t1); - c.tableOperations().delete(t2); - } - -}