Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 36422200CF8 for ; Wed, 26 Jul 2017 01:02:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 34D22167A29; Tue, 25 Jul 2017 23:02:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 418BF167A2D for ; Wed, 26 Jul 2017 01:02:54 +0200 (CEST) Received: (qmail 45627 invoked by uid 500); 25 Jul 2017 23:02:53 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 44477 invoked by uid 99); 25 Jul 2017 23:02:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jul 2017 23:02:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9251DDFF8A; Tue, 25 Jul 2017 23:02:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Tue, 25 Jul 2017 23:03:07 -0000 Message-Id: In-Reply-To: <6fff3a8c435a4995b3f2172478ec7b90@git.apache.org> References: <6fff3a8c435a4995b3f2172478ec7b90@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/24] accumulo git commit: Merge branch '1.7' into 1.8 archived-at: Tue, 25 Jul 2017 23:02:57 -0000 http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java index a83b0e2,0000000..8455a40 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java @@@ -1,382 -1,0 +1,382 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.DiskUsage; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.functional.BadIterator; +import org.apache.accumulo.test.functional.FunctionalTestUtils; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Sets; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class TableOperationsIT extends AccumuloClusterHarness { + + static TabletClientService.Client client; + + private Connector connector; + + @Override + public int defaultTimeoutSeconds() { + return 30; + } + + @Before + public void setup() throws Exception { + connector = getConnector(); + } + + @After + public void checkForDanglingFateLocks() { + FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); + } + + @Test + public void getDiskUsageErrors() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + List diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + assertEquals(1, diskUsage.size()); + assertEquals(0, (long) diskUsage.get(0).getUsage()); + assertEquals(tableName, diskUsage.get(0).getTables().iterator().next()); + + connector.securityOperations().revokeTablePermission(getAdminPrincipal(), tableName, TablePermission.READ); + try { + connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + fail("Should throw securityexception"); + } catch (AccumuloSecurityException e) {} + + connector.tableOperations().delete(tableName); + try { + connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + fail("Should throw tablenotfound"); + } catch (TableNotFoundException e) {} + } + + @Test + public void getDiskUsage() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException { + final String[] names = getUniqueNames(2); + String tableName = names[0]; + connector.tableOperations().create(tableName); + + // verify 0 disk usage + List diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + assertEquals(1, diskUsages.size()); + assertEquals(1, diskUsages.get(0).getTables().size()); + assertEquals(Long.valueOf(0), diskUsages.get(0).getUsage()); + assertEquals(tableName, diskUsages.get(0).getTables().first()); + + // add some data + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", new Value("abcde".getBytes())); + bw.addMutation(m); + bw.flush(); + bw.close(); + + connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true); + + // verify we have usage + diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + assertEquals(1, diskUsages.size()); + assertEquals(1, diskUsages.get(0).getTables().size()); + assertTrue(diskUsages.get(0).getUsage() > 0); + assertEquals(tableName, diskUsages.get(0).getTables().first()); + + String newTable = names[1]; + + // clone table + connector.tableOperations().clone(tableName, newTable, false, null, null); + + // verify tables are exactly the same + Set tables = new HashSet<>(); + tables.add(tableName); + tables.add(newTable); + diskUsages = connector.tableOperations().getDiskUsage(tables); + assertEquals(1, diskUsages.size()); + assertEquals(2, diskUsages.get(0).getTables().size()); + assertTrue(diskUsages.get(0).getUsage() > 0); + + connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true); + connector.tableOperations().compact(newTable, new Text("A"), new Text("z"), true, true); + + // verify tables have differences + diskUsages = connector.tableOperations().getDiskUsage(tables); + assertEquals(2, diskUsages.size()); + assertEquals(1, diskUsages.get(0).getTables().size()); + assertEquals(1, diskUsages.get(1).getTables().size()); + assertTrue(diskUsages.get(0).getUsage() > 0); + assertTrue(diskUsages.get(1).getUsage() > 0); + + connector.tableOperations().delete(tableName); + } + + @Test + public void createTable() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Iterable> itrProps = connector.tableOperations().getProperties(tableName); + Map props = propsToMap(itrProps); + assertEquals(DefaultKeySizeConstraint.class.getName(), props.get(Property.TABLE_CONSTRAINT_PREFIX.toString() + "1")); + connector.tableOperations().delete(tableName); + } + + @Test + public void createMergeClonedTable() throws Exception { + String[] names = getUniqueNames(2); + String originalTable = names[0]; + TableOperations tops = connector.tableOperations(); + + TreeSet splits = Sets.newTreeSet(Arrays.asList(new Text("a"), new Text("b"), new Text("c"), new Text("d"))); + + tops.create(originalTable); + tops.addSplits(originalTable, splits); + + BatchWriter bw = connector.createBatchWriter(originalTable, new BatchWriterConfig()); + for (Text row : splits) { + Mutation m = new Mutation(row); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + m.put(Integer.toString(i), Integer.toString(j), Integer.toString(i + j)); + } + } + + bw.addMutation(m); + } + + bw.close(); + + String clonedTable = names[1]; + + tops.clone(originalTable, clonedTable, true, null, null); + tops.merge(clonedTable, null, new Text("b")); + + Map rowCounts = new HashMap<>(); + Scanner s = connector.createScanner(clonedTable, new Authorizations()); + for (Entry entry : s) { + final Key key = entry.getKey(); + String row = key.getRow().toString(); + String cf = key.getColumnFamily().toString(), cq = key.getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + if (rowCounts.containsKey(row)) { + rowCounts.put(row, rowCounts.get(row) + 1); + } else { + rowCounts.put(row, 1); + } + + Assert.assertEquals(Integer.parseInt(cf) + Integer.parseInt(cq), Integer.parseInt(value)); + } + + Collection clonedSplits = tops.listSplits(clonedTable); + Set expectedSplits = Sets.newHashSet(new Text("b"), new Text("c"), new Text("d")); + for (Text clonedSplit : clonedSplits) { + Assert.assertTrue("Encountered unexpected split on the cloned table: " + clonedSplit, expectedSplits.remove(clonedSplit)); + } + + Assert.assertTrue("Did not find all expected splits on the cloned table: " + expectedSplits, expectedSplits.isEmpty()); + } + + private Map propsToMap(Iterable> props) { + Map map = new HashMap<>(); + for (Map.Entry prop : props) { + map.put(prop.getKey(), prop.getValue()); + } + return map; + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertEquals(HardListIterator.allEntriesToInject, actual); + connector.tableOperations().delete(tableName); + } + + /** Compare only the row, column family and column qualifier. */ + static class KeyRowColFColQComparator implements Comparator { + @Override + public int compare(Key k1, Key k2) { + return k1.compareTo(k2, PartialKey.ROW_COLFAM_COLQUAL); + } + } + + static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator(); + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + SortedSet splitset = new TreeSet<>(); + splitset.add(new Text("f")); + connector.tableOperations().addSplits(tableName, splitset); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertEquals(HardListIterator.allEntriesToInject, actual); + connector.tableOperations().delete(tableName); + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + SortedSet splitset = new TreeSet<>(); + splitset.add(new Text("f")); + connector.tableOperations().addSplits(tableName, splitset); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block + connector.tableOperations().cancelCompaction(tableName); + // depending on timing, compaction will finish or be canceled + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + switch (actual.size()) { + case 3: + // Compaction cancel didn't happen in time + assertTrue(HardListIterator.allEntriesToInject.equals(actual)); + break; + case 2: + // Compacted the first tablet (-inf, f) + assertEquals(HardListIterator.allEntriesToInject.headMap(new Key("f")), actual); + break; + case 1: + // Compacted the second tablet [f, +inf) + assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key("f")), actual); + break; + case 0: + // Cancelled the compaction before it ran. No generated entries. + break; + default: + Assert.fail("Unexpected number of entries"); + break; + } + connector.tableOperations().delete(tableName); + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Text splitRow = new Text("f"); + SortedSet splitset = new TreeSet<>(); + splitset.add(splitRow); + connector.tableOperations().addSplits(tableName, splitset); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + // compact the second tablet, not the first + connector.tableOperations().compact(tableName, splitRow, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + // only expect the entries in the second tablet + assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key(splitRow)), actual); + connector.tableOperations().delete(tableName); + } + + /** Test recovery from bad majc iterator via compaction cancel. */ + @Test + public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, BadIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block + sleepUninterruptibly(2, TimeUnit.SECONDS); // start compaction + connector.tableOperations().cancelCompaction(tableName); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(); + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertTrue("Should be empty. Actual is " + actual, actual.isEmpty()); + connector.tableOperations().delete(tableName); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java index 33c1798,0000000..bad1a55 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java @@@ -1,77 -1,0 +1,76 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.junit.Assert.assertEquals; + +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +// ACCUMULO-2480 +public class TabletServerGivesUpIT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.useMiniDFS(true); + cfg.setNumTservers(1); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES, "15"); + cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s"); + } + + @Test(timeout = 30 * 1000) + public void test() throws Exception { + final Connector conn = this.getConnector(); + // Yes, there's a tabletserver + assertEquals(1, conn.instanceOperations().getTabletServers().size()); + final String tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + // Kill dfs + cluster.getMiniDfs().shutdown(); + // ask the tserver to do something + final AtomicReference ex = new AtomicReference<>(); + Thread splitter = new Thread() { + @Override + public void run() { + try { + TreeSet splits = new TreeSet<>(); + splits.add(new Text("X")); + conn.tableOperations().addSplits(tableName, splits); + } catch (Exception e) { + ex.set(e); + } + } + }; + splitter.start(); + // wait for the tserver to give up on writing to the WAL + while (conn.instanceOperations().getTabletServers().size() == 1) { + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java index be800ad,0000000..ea3f680 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java @@@ -1,131 -1,0 +1,131 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.junit.Assert.assertTrue; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.minicluster.MemoryUnit; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import com.google.common.net.HostAndPort; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +// see ACCUMULO-1950 +public class TotalQueuedIT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE); + cfg.useMiniDFS(); + } + + int SMALL_QUEUE_SIZE = 100000; + int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10; + static final long N = 1000000; + + @Test(timeout = 4 * 60 * 1000) + public void test() throws Exception { + Random random = new Random(); + Connector c = getConnector(); + c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999"); + c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999"); + sleepUninterruptibly(1, TimeUnit.SECONDS); + // get an idea of how fast the syncs occur + byte row[] = new byte[250]; + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setMaxWriteThreads(10); + cfg.setMaxLatency(1, TimeUnit.SECONDS); + cfg.setMaxMemory(1024 * 1024); + long realSyncs = getSyncs(); + BatchWriter bw = c.createBatchWriter(tableName, cfg); + long now = System.currentTimeMillis(); + long bytesSent = 0; + for (int i = 0; i < N; i++) { + random.nextBytes(row); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bytesSent += m.estimatedMemoryUsed(); + } + bw.close(); + long diff = System.currentTimeMillis() - now; + double secs = diff / 1000.; + double syncs = bytesSent / SMALL_QUEUE_SIZE; + double syncsPerSec = syncs / secs; + System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec)); + long update = getSyncs(); + System.out.println("Syncs " + (update - realSyncs)); + realSyncs = update; + + // Now with a much bigger total queue + c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE); + c.tableOperations().flush(tableName, null, null, true); + sleepUninterruptibly(1, TimeUnit.SECONDS); + bw = c.createBatchWriter(tableName, cfg); + now = System.currentTimeMillis(); + bytesSent = 0; + for (int i = 0; i < N; i++) { + random.nextBytes(row); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bytesSent += m.estimatedMemoryUsed(); + } + bw.close(); + diff = System.currentTimeMillis() - now; + secs = diff / 1000.; + syncs = bytesSent / LARGE_QUEUE_SIZE; + syncsPerSec = syncs / secs; + System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec)); + update = getSyncs(); + System.out.println("Syncs " + (update - realSyncs)); + assertTrue(update - realSyncs < realSyncs); + } + + private long getSyncs() throws Exception { + Connector c = getConnector(); + ServerConfigurationFactory confFactory = new ServerConfigurationFactory(c.getInstance()); + AccumuloServerContext context = new AccumuloServerContext(confFactory); + for (String address : c.instanceOperations().getTabletServers()) { + TabletClientService.Client client = ThriftUtil.getTServerClient(HostAndPort.fromString(address), context); + TabletServerStatus status = client.getTabletServerStatus(null, context.rpcCreds()); + return status.syncs; + } + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java index 15609f6,0000000..2c4d970 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java @@@ -1,130 -1,0 +1,129 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.trace.DistributedTrace; +import org.apache.accumulo.core.trace.Span; +import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.tracer.TraceDump; +import org.apache.accumulo.tracer.TraceDump.Printer; +import org.apache.accumulo.tracer.TraceServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +/** + * + */ +public class TracerRecoversAfterOfflineTableIT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(1); + } + + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void test() throws Exception { + Process tracer = null; + Connector conn = getConnector(); + if (!conn.tableOperations().exists("trace")) { + MiniAccumuloClusterImpl mac = cluster; + tracer = mac.exec(TraceServer.class); + while (!conn.tableOperations().exists("trace")) { + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + sleepUninterruptibly(5, TimeUnit.SECONDS); + } + + log.info("Taking table offline"); + conn.tableOperations().offline("trace", true); + + String tableName = getUniqueNames(1)[0]; + conn.tableOperations().create(tableName); + + log.info("Start a distributed trace span"); + + DistributedTrace.enable("localhost", "testTrace", getClientConfig()); + Span root = Trace.on("traceTest"); + BatchWriter bw = conn.createBatchWriter(tableName, null); + Mutation m = new Mutation("m"); + m.put("a", "b", "c"); + bw.addMutation(m); + bw.close(); + root.stop(); + + log.info("Bringing trace table back online"); + conn.tableOperations().online("trace", true); + + log.info("Trace table is online, should be able to find trace"); + + final Scanner scanner = conn.createScanner("trace", Authorizations.EMPTY); + scanner.setRange(new Range(new Text(Long.toHexString(root.traceId())))); + while (true) { + final StringBuilder finalBuffer = new StringBuilder(); + int traceCount = TraceDump.printTrace(scanner, new Printer() { + @Override + public void print(final String line) { + try { + finalBuffer.append(line).append("\n"); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + }); + String traceOutput = finalBuffer.toString(); + log.info("Trace output:" + traceOutput); + if (traceCount > 0) { + int lastPos = 0; + for (String part : "traceTest,close,binMutations".split(",")) { + log.info("Looking in trace output for '" + part + "'"); + int pos = traceOutput.indexOf(part); + assertTrue("Did not find '" + part + "' in output", pos > 0); + assertTrue("'" + part + "' occurred earlier than the previous element unexpectedly", pos > lastPos); + lastPos = pos; + } + break; + } else { + log.info("Ignoring trace output as traceCount not greater than zero: " + traceCount); + Thread.sleep(1000); + } + } + if (tracer != null) { + tracer.destroy(); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java index 4d86dd3,0000000..07a7c40 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java +++ b/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java @@@ -1,161 -1,0 +1,161 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + ++import java.util.Collections; ++import java.util.Iterator; ++import java.util.Map; ++ +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.YieldingIterator; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; - - import java.util.Collections; - import java.util.Iterator; - import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// ACCUMULO-4643 +public class YieldScannersIT extends AccumuloClusterHarness { + Logger log = LoggerFactory.getLogger(YieldScannersIT.class); + private static final char START_ROW = 'a'; + + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + } + + @Test + public void testScan() throws Exception { + // make a table + final String tableName = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + conn.tableOperations().create(tableName); + final BatchWriter writer = conn.createBatchWriter(tableName, new BatchWriterConfig()); + for (int i = 0; i < 10; i++) { + byte[] row = new byte[] {(byte) (START_ROW + i)}; + Mutation m = new Mutation(new Text(row)); + m.put(new Text(), new Text(), new Value()); + writer.addMutation(m); + } + writer.flush(); + writer.close(); + + log.info("Creating scanner"); + // make a scanner for a table with 10 keys + final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); + final IteratorSetting cfg = new IteratorSetting(100, YieldingIterator.class); + scanner.addScanIterator(cfg); + + log.info("iterating"); + Iterator> it = scanner.iterator(); + int keyCount = 0; + int yieldNextCount = 0; + int yieldSeekCount = 0; + while (it.hasNext()) { + Map.Entry next = it.next(); + log.info(Integer.toString(keyCount) + ": Got key " + next.getKey() + " with value " + next.getValue()); + + // verify we got the expected key + char expected = (char) (START_ROW + keyCount); + Assert.assertEquals("Unexpected row", Character.toString(expected), next.getKey().getRow().toString()); + + // determine whether we yielded on a next and seek + if ((keyCount & 1) != 0) { + yieldNextCount++; + yieldSeekCount++; + } + String[] value = StringUtils.split(next.getValue().toString(), ','); + Assert.assertEquals("Unexpected yield next count", Integer.toString(yieldNextCount), value[0]); + Assert.assertEquals("Unexpected yield seek count", Integer.toString(yieldSeekCount), value[1]); + Assert.assertEquals("Unexpected rebuild count", Integer.toString(yieldNextCount + yieldSeekCount), value[2]); + + keyCount++; + } + Assert.assertEquals("Did not get the expected number of results", 10, keyCount); + } + + @Test + public void testBatchScan() throws Exception { + // make a table + final String tableName = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + conn.tableOperations().create(tableName); + final BatchWriter writer = conn.createBatchWriter(tableName, new BatchWriterConfig()); + for (int i = 0; i < 10; i++) { + byte[] row = new byte[] {(byte) (START_ROW + i)}; + Mutation m = new Mutation(new Text(row)); + m.put(new Text(), new Text(), new Value()); + writer.addMutation(m); + } + writer.flush(); + writer.close(); + + log.info("Creating batch scanner"); + // make a scanner for a table with 10 keys + final BatchScanner scanner = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1); + final IteratorSetting cfg = new IteratorSetting(100, YieldingIterator.class); + scanner.addScanIterator(cfg); + scanner.setRanges(Collections.singleton(new Range())); + + log.info("iterating"); + Iterator> it = scanner.iterator(); + int keyCount = 0; + int yieldNextCount = 0; + int yieldSeekCount = 0; + while (it.hasNext()) { + Map.Entry next = it.next(); + log.info(Integer.toString(keyCount) + ": Got key " + next.getKey() + " with value " + next.getValue()); + + // verify we got the expected key + char expected = (char) (START_ROW + keyCount); + Assert.assertEquals("Unexpected row", Character.toString(expected), next.getKey().getRow().toString()); + + // determine whether we yielded on a next and seek + if ((keyCount & 1) != 0) { + yieldNextCount++; + yieldSeekCount++; + } + String[] value = StringUtils.split(next.getValue().toString(), ','); + Assert.assertEquals("Unexpected yield next count", Integer.toString(yieldNextCount), value[0]); + Assert.assertEquals("Unexpected yield seek count", Integer.toString(yieldSeekCount), value[1]); + Assert.assertEquals("Unexpected rebuild count", Integer.toString(yieldNextCount + yieldSeekCount), value[2]); + + keyCount++; + } + Assert.assertEquals("Did not get the expected number of results", 10, keyCount); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java index e08be10,50595d7..73fe806 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java @@@ -16,6 -16,6 +16,8 @@@ */ package org.apache.accumulo.test.continuous; ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; ++ import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java index 63709df,a77de3d..fa53a64 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java @@@ -16,6 -16,6 +16,7 @@@ */ package org.apache.accumulo.test.continuous; ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java index 0558c7f,0000000..7f9ac6d mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java @@@ -1,144 -1,0 +1,143 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.Test; + - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +public class AddSplitIT extends AccumuloClusterHarness { + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void addSplitTest() throws Exception { + + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + c.tableOperations().create(tableName); + + insertData(tableName, 1l); + + TreeSet splits = new TreeSet<>(); + splits.add(new Text(String.format("%09d", 333))); + splits.add(new Text(String.format("%09d", 666))); + + c.tableOperations().addSplits(tableName, splits); + + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + Collection actualSplits = c.tableOperations().listSplits(tableName); + + if (!splits.equals(new TreeSet<>(actualSplits))) { + throw new Exception(splits + " != " + actualSplits); + } + + verifyData(tableName, 1l); + insertData(tableName, 2l); + + // did not clear splits on purpose, it should ignore existing split points + // and still create the three additional split points + + splits.add(new Text(String.format("%09d", 200))); + splits.add(new Text(String.format("%09d", 500))); + splits.add(new Text(String.format("%09d", 800))); + + c.tableOperations().addSplits(tableName, splits); + + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + actualSplits = c.tableOperations().listSplits(tableName); + + if (!splits.equals(new TreeSet<>(actualSplits))) { + throw new Exception(splits + " != " + actualSplits); + } + + verifyData(tableName, 2l); + } + + private void verifyData(String tableName, long ts) throws Exception { + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + + Iterator> iter = scanner.iterator(); + + for (int i = 0; i < 10000; i++) { + if (!iter.hasNext()) { + throw new Exception("row " + i + " not found"); + } + + Entry entry = iter.next(); + + String row = String.format("%09d", i); + + if (!entry.getKey().getRow().equals(new Text(row))) { + throw new Exception("unexpected row " + entry.getKey() + " " + i); + } + + if (entry.getKey().getTimestamp() != ts) { + throw new Exception("unexpected ts " + entry.getKey() + " " + ts); + } + + if (Integer.parseInt(entry.getValue().toString()) != i) { + throw new Exception("unexpected value " + entry + " " + i); + } + } + + if (iter.hasNext()) { + throw new Exception("found more than expected " + iter.next()); + } + + } + + private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException { + BatchWriter bw = getConnector().createBatchWriter(tableName, null); + + for (int i = 0; i < 10000; i++) { + String row = String.format("%09d", i); + + Mutation m = new Mutation(new Text(row)); + m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(UTF_8))); + bw.addMutation(m); + } + + bw.close(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java index c730f9b,0000000..64fc905 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java @@@ -1,108 -1,0 +1,108 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; + +import java.util.EnumSet; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.google.common.collect.Iterators; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class BadIteratorMincIT extends AccumuloClusterHarness { + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + IteratorSetting is = new IteratorSetting(30, BadIterator.class); + c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc)); + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + + Mutation m = new Mutation(new Text("r1")); + m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8))); + + bw.addMutation(m); + bw.close(); + + c.tableOperations().flush(tableName, null, null, false); + sleepUninterruptibly(1, TimeUnit.SECONDS); + + // minc should fail, so there should be no files + FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0); + + // try to scan table + Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY); + int count = Iterators.size(scanner.iterator()); + assertEquals("Did not see expected # entries " + count, 1, count); + + // remove the bad iterator + c.tableOperations().removeIterator(tableName, BadIterator.class.getSimpleName(), EnumSet.of(IteratorScope.minc)); + + sleepUninterruptibly(5, TimeUnit.SECONDS); + + // minc should complete + FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1); + + count = Iterators.size(scanner.iterator()); + + if (count != 1) + throw new Exception("Did not see expected # entries " + count); + + // now try putting bad iterator back and deleting the table + c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc)); + bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + m = new Mutation(new Text("r2")); + m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8))); + bw.addMutation(m); + bw.close(); + + // make sure property is given time to propagate + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + + c.tableOperations().flush(tableName, null, null, false); + + // make sure the flush has time to start + sleepUninterruptibly(1, TimeUnit.SECONDS); + + // this should not hang + c.tableOperations().delete(tableName); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java index 48ce3fe,0000000..528f486 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java @@@ -1,131 -1,0 +1,130 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +public class BatchScanSplitIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(BatchScanSplitIT.class); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_MAJC_DELAY, "0"); + } + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + int numRows = 1 << 18; + + BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + + for (int i = 0; i < numRows; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i))); + m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes(UTF_8))); + bw.addMutation(m); + } + + bw.close(); + + getConnector().tableOperations().flush(tableName, null, null, true); + + getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "4K"); + + Collection splits = getConnector().tableOperations().listSplits(tableName); + while (splits.size() < 2) { + sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + splits = getConnector().tableOperations().listSplits(tableName); + } + + System.out.println("splits : " + splits); + + Random random = new Random(19011230); + HashMap expected = new HashMap<>(); + ArrayList ranges = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + int r = random.nextInt(numRows); + Text row = new Text(String.format("%09x", r)); + expected.put(row, new Value(String.format("%016x", numRows - r).getBytes(UTF_8))); + ranges.add(new Range(row)); + } + + // logger.setLevel(Level.TRACE); + + HashMap found = new HashMap<>(); + + for (int i = 0; i < 20; i++) { + BatchScanner bs = getConnector().createBatchScanner(tableName, Authorizations.EMPTY, 4); + + found.clear(); + + long t1 = System.currentTimeMillis(); + + bs.setRanges(ranges); + + for (Entry entry : bs) { + found.put(entry.getKey().getRow(), entry.getValue()); + } + bs.close(); + + long t2 = System.currentTimeMillis(); + + log.info(String.format("rate : %06.2f%n", ranges.size() / ((t2 - t1) / 1000.0))); + + if (!found.equals(expected)) + throw new Exception("Found and expected differ " + found + " " + expected); + } + + splits = getConnector().tableOperations().listSplits(tableName); + log.info("splits : " + splits); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java index f243562,0000000..21539b2 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java @@@ -1,144 -1,0 +1,143 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.cli.ClientOpts.Password; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +/** + * This test verifies that when a lot of files are bulk imported into a table with one tablet and then splits that not all map files go to the children tablets. + */ + +public class BulkSplitOptimizationIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_MAJC_DELAY, "1s"); + } + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + private String majcDelay; + + @Before + public void alterConfig() throws Exception { + Connector conn = getConnector(); + majcDelay = conn.instanceOperations().getSystemConfiguration().get(Property.TSERV_MAJC_DELAY.getKey()); + if (!"1s".equals(majcDelay)) { + conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1s"); + getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getClusterControl().startAllServers(ServerType.TABLET_SERVER); + } + } + + @After + public void resetConfig() throws Exception { + if (null != majcDelay) { + Connector conn = getConnector(); + conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay); + getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getClusterControl().startAllServers(ServerType.TABLET_SERVER); + } + } + + static final int ROWS = 100000; + static final int SPLITS = 99; + + @Test + public void testBulkSplitOptimization() throws Exception { + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000"); + c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000"); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G"); + FileSystem fs = cluster.getFileSystem(); + Path testDir = new Path(getUsableDir(), "testmf"); + FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8); + FileStatus[] stats = fs.listStatus(testDir); + + System.out.println("Number of generated files: " + stats.length); + FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString()); + FunctionalTestUtils.checkSplits(c, tableName, 0, 0); + FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100); + + // initiate splits + getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K"); + + sleepUninterruptibly(2, TimeUnit.SECONDS); + + // wait until over split threshold -- should be 78 splits + while (getConnector().tableOperations().listSplits(tableName).size() < 75) { + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + } + + FunctionalTestUtils.checkSplits(c, tableName, 50, 100); + VerifyIngest.Opts opts = new VerifyIngest.Opts(); + opts.timestamp = 1; + opts.dataSize = 50; + opts.random = 56; + opts.rows = 100000; + opts.startRow = 0; + opts.cols = 1; + opts.setTableName(tableName); + + AuthenticationToken adminToken = getAdminToken(); + if (adminToken instanceof PasswordToken) { + PasswordToken token = (PasswordToken) getAdminToken(); + opts.setPassword(new Password(new String(token.getPassword(), UTF_8))); + opts.setPrincipal(getAdminPrincipal()); + } else if (adminToken instanceof KerberosToken) { + ClientConfiguration clientConf = cluster.getClientConfig(); + opts.updateKerberosCredentials(clientConf); + } else { + Assert.fail("Unknown token type"); + } + + VerifyIngest.verifyIngest(c, opts, new ScannerOpts()); + + // ensure each tablet does not have all map files, should be ~2.5 files per tablet + FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java index 0988795,5abae1e..0703694 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java @@@ -16,6 -16,6 +16,7 @@@ */ package org.apache.accumulo.test.functional; ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.File; http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java index 50a0b0e,84e55a5..9f1dc67 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java @@@ -16,6 -16,6 +16,7 @@@ */ package org.apache.accumulo.test.functional; ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.File; http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java index c553c14,0000000..9c2b71f mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java @@@ -1,124 -1,0 +1,123 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Combiner; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; - import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.accumulo.test.categories.MiniClusterOnlyTests; ++import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.hamcrest.CoreMatchers; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +@Category(MiniClusterOnlyTests.class) +public class ClassLoaderIT extends AccumuloClusterHarness { + + private static final long ZOOKEEPER_PROPAGATION_TIME = 10 * 1000; + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + private String rootPath; + + @Before + public void checkCluster() { + Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI)); + MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster(); + rootPath = mac.getConfig().getDir().getAbsolutePath(); + } + + private static void copyStreamToFileSystem(FileSystem fs, String jarName, Path path) throws IOException { + byte[] buffer = new byte[10 * 1024]; + try (FSDataOutputStream dest = fs.create(path); InputStream stream = ClassLoaderIT.class.getResourceAsStream(jarName)) { + while (true) { + int n = stream.read(buffer, 0, buffer.length); + if (n <= 0) { + break; + } + dest.write(buffer, 0, n); + } + } + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row1"); + m.put("cf", "col1", "Test"); + bw.addMutation(m); + bw.close(); + scanCheck(c, tableName, "Test"); + FileSystem fs = getCluster().getFileSystem(); + Path jarPath = new Path(rootPath + "/lib/ext/Test.jar"); + copyStreamToFileSystem(fs, "/TestCombinerX.jar", jarPath); + sleepUninterruptibly(1, TimeUnit.SECONDS); + IteratorSetting is = new IteratorSetting(10, "TestCombiner", "org.apache.accumulo.test.functional.TestCombiner"); + Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("cf"))); + c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.scan)); + sleepUninterruptibly(ZOOKEEPER_PROPAGATION_TIME, TimeUnit.MILLISECONDS); + scanCheck(c, tableName, "TestX"); + fs.delete(jarPath, true); + copyStreamToFileSystem(fs, "/TestCombinerY.jar", jarPath); + sleepUninterruptibly(5, TimeUnit.SECONDS); + scanCheck(c, tableName, "TestY"); + fs.delete(jarPath, true); + } + + private void scanCheck(Connector c, String tableName, String expected) throws Exception { + Scanner bs = c.createScanner(tableName, Authorizations.EMPTY); + Iterator> iterator = bs.iterator(); + assertTrue(iterator.hasNext()); + Entry next = iterator.next(); + assertFalse(iterator.hasNext()); + assertEquals(expected, next.getValue().toString()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java index d462b53,0000000..929bb61 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java @@@ -1,159 -1,0 +1,159 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.EnumSet; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.google.common.collect.Iterators; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class ConcurrencyIT extends AccumuloClusterHarness { + + static class ScanTask extends Thread { + + int count = 0; + Scanner scanner; + + ScanTask(Connector conn, String tableName, long time) throws Exception { + scanner = conn.createScanner(tableName, Authorizations.EMPTY); + IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class); + SlowIterator.setSleepTime(slow, time); + scanner.addScanIterator(slow); + } + + @Override + public void run() { + count = Iterators.size(scanner.iterator()); + } + + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map siteConfig = cfg.getSiteConfig(); + siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1"); + cfg.setSiteConfig(siteConfig); + } + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + // @formatter:off + // Below is a diagram of the operations in this test over time. + // + // Scan 0 |------------------------------| + // Scan 1 |----------| + // Minc 1 |-----| + // Scan 2 |----------| + // Scan 3 |---------------| + // Minc 2 |-----| + // Majc 1 |-----| + // @formatter:on + @Test + public void run() throws Exception { + Connector c = getConnector(); + runTest(c, getUniqueNames(1)[0]); + } + + static void runTest(Connector c, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, + MutationsRejectedException, Exception, InterruptedException { + c.tableOperations().create(tableName); + IteratorSetting is = new IteratorSetting(10, SlowIterator.class); + SlowIterator.setSleepTime(is, 50); + c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc, IteratorScope.majc)); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0"); + + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(new Text(String.format("%06d", i))); + m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8))); + bw.addMutation(m); + } + bw.flush(); + + ScanTask st0 = new ScanTask(c, tableName, 300); + st0.start(); + + ScanTask st1 = new ScanTask(c, tableName, 100); + st1.start(); + + sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + c.tableOperations().flush(tableName, null, null, true); + + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(new Text(String.format("%06d", i))); + m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8))); + bw.addMutation(m); + } + + bw.flush(); + + ScanTask st2 = new ScanTask(c, tableName, 100); + st2.start(); + + st1.join(); + st2.join(); + if (st1.count != 50) + throw new Exception("Thread 1 did not see 50, saw " + st1.count); + + if (st2.count != 50) + throw new Exception("Thread 2 did not see 50, saw " + st2.count); + + ScanTask st3 = new ScanTask(c, tableName, 150); + st3.start(); + + sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + c.tableOperations().flush(tableName, null, null, false); + + st3.join(); + if (st3.count != 50) + throw new Exception("Thread 3 did not see 50, saw " + st3.count); + + st0.join(); + if (st0.count != 50) + throw new Exception("Thread 0 did not see 50, saw " + st0.count); + + bw.close(); + } +}