Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 94D78200C00 for ; Tue, 3 Jan 2017 21:55:54 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 93874160B43; Tue, 3 Jan 2017 20:55:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 19853160B4A for ; Tue, 3 Jan 2017 21:55:51 +0100 (CET) Received: (qmail 84633 invoked by uid 500); 3 Jan 2017 20:55:51 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 84324 invoked by uid 99); 3 Jan 2017 20:55:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Jan 2017 20:55:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ED870DFC64; Tue, 3 Jan 2017 20:55:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mwalch@apache.org To: commits@accumulo.apache.org Date: Tue, 03 Jan 2017 20:55:54 -0000 Message-Id: <167ddd485e0a4a88ae0784b5b24a4f31@git.apache.org> In-Reply-To: <2c3e82421dc74374849af337d13930d0@git.apache.org> References: <2c3e82421dc74374849af337d13930d0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/7] accumulo-testing git commit: ACCUMULO-4510 Adding Randomwalk code from Accumulo archived-at: Tue, 03 Jan 2017 20:55:54 -0000 http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java new file mode 100644 index 0000000..87a48f9 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +public class Merge extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + Connector conn = env.getConnector(); + + Random rand = (Random) state.get("rand"); + + @SuppressWarnings("unchecked") + List tableNames = (List) state.get("tables"); + tableNames = new ArrayList<>(tableNames); + tableNames.add(MetadataTable.NAME); + String tableName = tableNames.get(rand.nextInt(tableNames.size())); + + List range = ConcurrentFixture.generateRange(rand); + + try { + conn.tableOperations().merge(tableName, range.get(0), range.get(1)); + log.debug("merged " + tableName + " from " + range.get(0) + " to " + range.get(1)); + } catch (TableOfflineException toe) { + log.debug("merge " + tableName + " from " + range.get(0) + " to " + range.get(1) + " failed, table is not online"); + } catch (TableNotFoundException tne) { + log.debug("merge " + tableName + " from " + range.get(0) + " to " + range.get(1) + " failed, doesnt exist"); + } + + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java 
b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java new file mode 100644 index 0000000..fd01d98 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class OfflineTable extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + Connector conn = env.getConnector(); + + Random rand = (Random) state.get("rand"); + + @SuppressWarnings("unchecked") + List tableNames = (List) state.get("tables"); + + String tableName = tableNames.get(rand.nextInt(tableNames.size())); + + try { + conn.tableOperations().offline(tableName, rand.nextBoolean()); + log.debug("Offlined " + tableName); + sleepUninterruptibly(rand.nextInt(200), TimeUnit.MILLISECONDS); + conn.tableOperations().online(tableName, rand.nextBoolean()); + log.debug("Onlined " + tableName); + } catch (TableNotFoundException tne) { + log.debug("offline or online failed " + tableName + ", doesnt exist"); + } + + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java new file mode 100644 index 0000000..dab41bf --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java @@ -0,0 +1,53 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.NamespaceExistsException; +import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +public class RenameNamespace extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + Connector conn = env.getConnector(); + + Random rand = (Random) state.get("rand"); + + @SuppressWarnings("unchecked") + List namespaces = (List) state.get("namespaces"); + + String srcName = namespaces.get(rand.nextInt(namespaces.size())); + String newName = namespaces.get(rand.nextInt(namespaces.size())); + + try { + conn.namespaceOperations().rename(srcName, newName); + log.debug("Renamed namespace " + srcName + " " + newName); + } catch (NamespaceExistsException e) { + log.debug("Rename namespace " + srcName + " failed, 
" + newName + " exists"); + } catch (NamespaceNotFoundException e) { + log.debug("Rename namespace " + srcName + " failed, doesn't exist"); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java new file mode 100644 index 0000000..4c5a52f --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +public class RenameTable extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + Connector conn = env.getConnector(); + + Random rand = (Random) state.get("rand"); + + @SuppressWarnings("unchecked") + List tableNames = (List) state.get("tables"); + + String srcTableName = tableNames.get(rand.nextInt(tableNames.size())); + String newTableName = tableNames.get(rand.nextInt(tableNames.size())); + + String srcNamespace = "", newNamespace = ""; + + int index = srcTableName.indexOf('.'); + if (-1 != index) { + srcNamespace = srcTableName.substring(0, index); + } + + index = newTableName.indexOf('.'); + if (-1 != index) { + newNamespace = newTableName.substring(0, index); + } + + try { + conn.tableOperations().rename(srcTableName, newTableName); + log.debug("Renamed table " + srcTableName + " " + newTableName); + } catch (TableExistsException e) { + log.debug("Rename " + srcTableName + " failed, " + newTableName + " exists"); + } catch (TableNotFoundException e) { + Throwable cause = e.getCause(); + if (null != cause) { + // Rename has to have failed on the destination namespace, because the source namespace + // couldn't be deleted with our table in it + if (cause.getClass().isAssignableFrom(NamespaceNotFoundException.class)) { + log.debug("Rename failed because new namespace 
doesn't exist: " + newNamespace, cause); + // Avoid the final src/dest namespace check + return; + } + } + + log.debug("Rename " + srcTableName + " failed, doesnt exist"); + } catch (IllegalArgumentException e) { + log.debug("Rename: " + e.toString()); + } catch (AccumuloException e) { + // Catch the expected failure when we try to rename a table into a new namespace + if (!srcNamespace.equals(newNamespace)) { + return; + } + log.debug("Rename " + srcTableName + " failed.", e); + } + + if (!srcNamespace.equals(newNamespace)) { + log.error("RenameTable operation should have failed when renaming across namespaces."); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java new file mode 100644 index 0000000..189d743 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import static org.apache.accumulo.core.conf.Property.MASTER_REPLICATION_SCAN_INTERVAL; +import static org.apache.accumulo.core.conf.Property.REPLICATION_NAME; +import static org.apache.accumulo.core.conf.Property.REPLICATION_PEERS; +import static org.apache.accumulo.core.conf.Property.REPLICATION_PEER_PASSWORD; +import static org.apache.accumulo.core.conf.Property.REPLICATION_PEER_USER; +import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_ASSIGNMENT_SLEEP; +import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_PROCESSOR_DELAY; +import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_PROCESSOR_PERIOD; +import static org.apache.accumulo.core.conf.Property.TABLE_REPLICATION; +import static org.apache.accumulo.core.conf.Property.TABLE_REPLICATION_TARGET; +import static org.apache.accumulo.server.replication.ReplicaSystemFactory.getPeerConfigurationValue; + +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import 
org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.hadoop.io.Text; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class Replication extends Test { + + final int ROWS = 1000; + final int COLS = 50; + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + final Connector c = env.getConnector(); + final Instance inst = c.getInstance(); + final String instName = inst.getInstanceName(); + final InstanceOperations iOps = c.instanceOperations(); + final TableOperations tOps = c.tableOperations(); + + // Replicate to ourselves + iOps.setProperty(REPLICATION_NAME.getKey(), instName); + iOps.setProperty(REPLICATION_PEERS.getKey() + instName, getPeerConfigurationValue(AccumuloReplicaSystem.class, instName + "," + inst.getZooKeepers())); + iOps.setProperty(REPLICATION_PEER_USER.getKey() + instName, env.getUserName()); + iOps.setProperty(REPLICATION_PEER_PASSWORD.getKey() + instName, env.getPassword()); + // Tweak some replication parameters to make the replication go faster + iOps.setProperty(MASTER_REPLICATION_SCAN_INTERVAL.getKey(), "1s"); + iOps.setProperty(REPLICATION_WORK_ASSIGNMENT_SLEEP.getKey(), "1s"); + iOps.setProperty(REPLICATION_WORK_PROCESSOR_DELAY.getKey(), "1s"); + iOps.setProperty(REPLICATION_WORK_PROCESSOR_PERIOD.getKey(), "1s"); + + // Ensure the replication table is online + ReplicationTable.setOnline(c); + boolean online = ReplicationTable.isOnline(c); + for (int i = 0; i < 10; i++) { + if (online) + break; + sleepUninterruptibly(2, TimeUnit.SECONDS); + online = ReplicationTable.isOnline(c); + } + assertTrue("Replication table was not online", online); + + // Make a source and destination table + final String sourceTable = ("repl-source-" + UUID.randomUUID()).replace('-', '_'); + final String destTable = 
("repl-dest-" + UUID.randomUUID()).replace('-', '_'); + final String tables[] = new String[] {sourceTable, destTable}; + + for (String tableName : tables) { + log.debug("creating " + tableName); + tOps.create(tableName); + } + + // Point the source to the destination + final String destID = tOps.tableIdMap().get(destTable); + tOps.setProperty(sourceTable, TABLE_REPLICATION.getKey(), "true"); + tOps.setProperty(sourceTable, TABLE_REPLICATION_TARGET.getKey() + instName, destID); + + // zookeeper propagation wait + sleepUninterruptibly(5, TimeUnit.SECONDS); + + // Maybe split the tables + Random rand = new Random(System.currentTimeMillis()); + for (String tableName : tables) { + if (rand.nextBoolean()) { + splitTable(tOps, tableName); + } + } + + // write some checkable data + BatchWriter bw = c.createBatchWriter(sourceTable, null); + for (int row = 0; row < ROWS; row++) { + Mutation m = new Mutation(itos(row)); + for (int col = 0; col < COLS; col++) { + m.put("", itos(col), ""); + } + bw.addMutation(m); + } + bw.close(); + + // attempt to force the WAL to roll so replication begins + final Set origRefs = c.replicationOperations().referencedFiles(sourceTable); + // write some data we will ignore + while (true) { + final Set updatedFileRefs = c.replicationOperations().referencedFiles(sourceTable); + updatedFileRefs.retainAll(origRefs); + log.debug("updateFileRefs size " + updatedFileRefs.size()); + if (updatedFileRefs.isEmpty()) { + break; + } + bw = c.createBatchWriter(sourceTable, null); + for (int row = 0; row < ROWS; row++) { + Mutation m = new Mutation(itos(row)); + for (int col = 0; col < COLS; col++) { + m.put("ignored", itos(col), ""); + } + bw.addMutation(m); + } + bw.close(); + } + + // wait a little while for replication to take place + sleepUninterruptibly(30, TimeUnit.SECONDS); + + // check the data + Scanner scanner = c.createScanner(destTable, Authorizations.EMPTY); + scanner.fetchColumnFamily(new Text("")); + int row = 0; + int col = 0; + for (Entry 
entry : scanner) { + assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString())); + assertEquals(col, Integer.parseInt(entry.getKey().getColumnQualifier().toString())); + col++; + if (col == COLS) { + row++; + col = 0; + } + } + assertEquals(ROWS, row); + assertEquals(0, col); + + // cleanup + for (String tableName : tables) { + log.debug("Deleting " + tableName); + tOps.delete(tableName); + } + } + + // junit isn't a dependency + private void assertEquals(int expected, int actual) { + if (expected != actual) + throw new RuntimeException(String.format("%d fails to match expected value %d", actual, expected)); + } + + // junit isn't a dependency + private void assertTrue(String string, boolean test) { + if (!test) + throw new RuntimeException(string); + } + + private static String itos(int i) { + return String.format("%08d", i); + } + + private void splitTable(TableOperations tOps, String tableName) throws Exception { + SortedSet splits = new TreeSet<>(); + for (int i = 1; i <= 9; i++) { + splits.add(new Text(itos(i * (ROWS / 10)))); + } + log.debug("Adding splits to " + tableName); + tOps.addSplits(tableName, splits); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java new file mode 100644 index 0000000..ab89bea --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +public class ScanTable extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + Connector conn = env.getConnector(); + + Random rand = (Random) state.get("rand"); + + @SuppressWarnings("unchecked") + List tableNames = (List) state.get("tables"); + + String tableName = tableNames.get(rand.nextInt(tableNames.size())); + + try { + Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); + Iterator> iter = scanner.iterator(); + while 
(iter.hasNext()) { + iter.next(); + } + log.debug("Scanned " + tableName); + } catch (TableDeletedException e) { + log.debug("Scan " + tableName + " failed, table deleted"); + } catch (TableNotFoundException e) { + log.debug("Scan " + tableName + " failed, doesnt exist"); + } catch (TableOfflineException e) { + log.debug("Scan " + tableName + " failed, offline"); + } catch (RuntimeException e) { + if (e.getCause() instanceof AccumuloSecurityException) { + log.debug("BatchScan " + tableName + " failed, permission error"); + } else { + throw e; + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java new file mode 100644 index 0000000..164fd4f --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +public class Setup extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + Random rand = new Random(); + state.set("rand", rand); + + int numTables = Integer.parseInt(props.getProperty("numTables", "9")); + int numNamespaces = Integer.parseInt(props.getProperty("numNamespaces", "2")); + log.debug("numTables = " + numTables); + log.debug("numNamespaces = " + numNamespaces); + List tables = new ArrayList<>(); + List namespaces = new ArrayList<>(); + + for (int i = 0; i < numNamespaces; i++) { + namespaces.add(String.format("nspc_%03d", i)); + } + + // Make tables in the default namespace + double tableCeil = Math.ceil((double) numTables / (numNamespaces + 1)); + for (int i = 0; i < tableCeil; i++) { + tables.add(String.format("ctt_%03d", i)); + } + + // Make tables in each namespace + double tableFloor = Math.floor(numTables / (numNamespaces + 1)); + for (String n : namespaces) { + for (int i = 0; i < tableFloor; i++) { + tables.add(String.format(n + ".ctt_%03d", i)); + } + } + + state.set("tables", tables); + state.set("namespaces", namespaces); + + int numUsers = Integer.parseInt(props.getProperty("numUsers", "5")); + log.debug("numUsers = " + numUsers); + List users = new ArrayList<>(); + for (int i = 0; i < numUsers; i++) + users.add(String.format("user%03d", i)); + state.set("users", users); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java new file mode 100644 index 0000000..dc2e670 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.master.thrift.MasterClientService.Client; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.master.state.SetGoalState; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class Shutdown extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + log.info("shutting down"); + SetGoalState.main(new String[] {MasterGoalState.CLEAN_STOP.name()}); + + while (!env.getConnector().instanceOperations().getTabletServers().isEmpty()) { + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + while (true) { + try { + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())); + Client client = MasterClient.getConnection(context); + client.getMasterStats(Tracer.traceInfo(), context.rpcCreds()); + } catch (Exception e) { + // assume this is due to server shutdown + break; + } + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + log.info("servers stopped"); + sleepUninterruptibly(10, TimeUnit.SECONDS); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java ---------------------------------------------------------------------- diff 
--git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java new file mode 100644 index 0000000..df30487 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.master.thrift.MasterClientService.Client; +import org.apache.accumulo.core.master.thrift.MasterGoalState; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.master.state.SetGoalState; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class StartAll extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + log.info("Starting all servers"); + SetGoalState.main(new String[] {MasterGoalState.NORMAL.name()}); + Process exec = Runtime.getRuntime().exec(new String[] {System.getenv().get("ACCUMULO_HOME") + "/bin/start-all.sh"}); + exec.waitFor(); + while (true) { + try { + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())); + Client client = MasterClient.getConnection(context); + MasterMonitorInfo masterStats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds()); + if (!masterStats.tServerInfo.isEmpty()) + break; + } catch (Exception ex) { + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java new file mode 100644 index 0000000..8210dc4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.concurrent; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +public class StopTabletServer extends Test { + + Set getTServers(Instance instance) throws KeeperException, InterruptedException { + Set result = new HashSet<>(); + ZooReader rdr = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + String base = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + for (String child : rdr.getChildren(base)) { + try { + List children = rdr.getChildren(base + "/" + child); + if (children.size() > 0) { + Collections.sort(children); + Stat stat = new Stat(); + byte[] data = rdr.getData(base + "/" + child + "/" + children.get(0), stat); + if (!"master".equals(new String(data, UTF_8))) { + result.add(new TServerInstance(AddressUtil.parseAddress(child, false), stat.getEphemeralOwner())); + } + } + } catch (KeeperException.NoNodeException ex) { + // someone beat us too it + } + } + return result; + } + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + + Instance instance = env.getInstance(); + + List currentServers = new ArrayList<>(getTServers(instance)); + Collections.shuffle(currentServers); 
+ Runtime runtime = Runtime.getRuntime(); + if (currentServers.size() > 1) { + TServerInstance victim = currentServers.get(0); + log.info("Stopping " + victim.hostPort()); + Process exec = runtime.exec(new String[] {System.getenv("ACCUMULO_HOME") + "/bin/accumulo", "admin", "stop", victim.hostPort()}); + if (exec.waitFor() != 0) + throw new RuntimeException("admin stop returned a non-zero response: " + exec.exitValue()); + Set set = getTServers(instance); + if (set.contains(victim)) + throw new RuntimeException("Failed to stop " + victim); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java new file mode 100644 index 0000000..b0aa7e1 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +/** + * + */ +public class Compact extends Test { + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + String table = state.getString("tableName"); + Random rand = (Random) state.get("rand"); + Connector conn = env.getConnector(); + Text row1 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + Text row2 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + + if (row1.compareTo(row2) >= 0) { + row1 = null; + row2 = null; + } + + log.debug("compacting " + row1 + " " + row2); + conn.tableOperations().compact(table, row1, row2, rand.nextBoolean(), rand.nextBoolean()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java new file mode 100644 index 0000000..2c5448d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +/** + * + */ +public class Flush extends Test { + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + String table = state.getString("tableName"); + Random rand = (Random) state.get("rand"); + Connector conn = env.getConnector(); + Text row1 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + Text row2 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + + if (row1.compareTo(row2) >= 0) { + row1 = null; + row2 = null; + } + + log.debug("flushing " + row1 + " " + row2); + conn.tableOperations().flush(table, row1, row2, rand.nextBoolean()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java 
b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java new file mode 100644 index 0000000..50a1e52 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Properties; +import java.util.Random; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriter.Status; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +/** + * + */ +public class Init extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + + int numBanks = (Integer) state.get("numBanks"); + int numAccts = (Integer) state.get("numAccts"); + + // add some splits to spread ingest out a little + TreeSet splits = new TreeSet<>(); + for (int i = 1; i < 10; i++) + splits.add(new Text(Utils.getBank((int) (numBanks * .1 * i)))); + env.getConnector().tableOperations().addSplits((String) state.get("tableName"), splits); + log.debug("Added splits " + splits); + + ArrayList banks = new ArrayList<>(); + for (int i = 0; i < numBanks; i++) + banks.add(i); + // shuffle for case when multiple threads are adding banks + Collections.shuffle(banks, (Random) state.get("rand")); + + ConditionalWriter cw = (ConditionalWriter) state.get("cw"); + + for (int i : banks) { + ConditionalMutation m = new ConditionalMutation(Utils.getBank(i)); + int acceptedCount = 0; + for (int j = 0; j < numAccts; j++) { + String cf = Utils.getAccount(j); + m.addCondition(new Condition(cf, "seq")); + m.put(cf, "bal", "100"); + m.put(cf, "seq", Utils.getSeq(0)); + + if (j % 1000 == 0 && j > 0) { + Status status = cw.write(m).getStatus(); + + while (status == Status.UNKNOWN) + status = cw.write(m).getStatus(); + + if (status == 
Status.ACCEPTED) + acceptedCount++; + m = new ConditionalMutation(Utils.getBank(i)); + } + + } + if (m.getConditions().size() > 0) { + Status status = cw.write(m).getStatus(); + while (status == Status.UNKNOWN) + status = cw.write(m).getStatus(); + + if (status == Status.ACCEPTED) + acceptedCount++; + } + + log.debug("Added bank " + Utils.getBank(i) + " " + acceptedCount); + } + + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java new file mode 100644 index 0000000..2f5d52b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +/** + * + */ +public class Merge extends Test { + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + String table = state.getString("tableName"); + Random rand = (Random) state.get("rand"); + Connector conn = env.getConnector(); + Text row1 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + Text row2 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + + if (row1.compareTo(row2) >= 0) { + row1 = null; + row2 = null; + } + + log.debug("merging " + row1 + " " + row2); + conn.tableOperations().merge(table, row1, row2); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java new file mode 100644 index 0000000..1e4ad01 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +public class Setup extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + Random rand = new Random(); + state.set("rand", rand); + + int numBanks = Integer.parseInt(props.getProperty("numBanks", "1000")); + log.debug("numBanks = " + numBanks); + state.set("numBanks", numBanks); + + int numAccts = Integer.parseInt(props.getProperty("numAccts", "10000")); + log.debug("numAccts = " + numAccts); + state.set("numAccts", numAccts); + + String tableName = "banks"; + state.set("tableName", tableName); + + try { + env.getConnector().tableOperations().create(tableName); + log.debug("created table " + tableName); + boolean blockCache = rand.nextBoolean(); + env.getConnector().tableOperations().setProperty(tableName, Property.TABLE_BLOCKCACHE_ENABLED.getKey(), blockCache + ""); + log.debug("set " + Property.TABLE_BLOCKCACHE_ENABLED.getKey() + " " + blockCache); + } catch (TableExistsException tee) {} + + ConditionalWriter cw = 
env.getConnector().createConditionalWriter(tableName, new ConditionalWriterConfig().setMaxWriteThreads(1)); + state.set("cw", cw); + + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java new file mode 100644 index 0000000..8ea9aab --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Arrays; +import java.util.Properties; +import java.util.Random; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +/** + * + */ +public class Split extends Test { + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + String table = state.getString("tableName"); + Random rand = (Random) state.get("rand"); + Connector conn = env.getConnector(); + String row = Utils.getBank(rand.nextInt((Integer) state.get("numBanks"))); + + log.debug("adding split " + row); + conn.tableOperations().addSplits(table, new TreeSet<>(Arrays.asList(new Text(row)))); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java new file mode 100644 index 0000000..cf72607 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Properties; + +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +/** + * + */ +public class TearDown extends Test { + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + ConditionalWriter cw = (ConditionalWriter) state.get("cw"); + cw.close(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java new file mode 100644 index 0000000..73a7d91 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Random; + +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriter.Status; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.commons.math3.distribution.ZipfDistribution; +import org.apache.hadoop.io.Text; + +/** + * + */ +public class Transfer extends Test { + + private static class Account { + int seq; + int bal; + + void setBal(String s) { + bal = Integer.parseInt(s); + } + + void setSeq(String s) { + seq = Integer.parseInt(s); + } + + @Override + public String toString() { + return seq + " " + bal; + } + } + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + String table = state.getString("tableName"); + Random rand = (Random) state.get("rand"); + Connector conn = env.getConnector(); + + int 
numAccts = (Integer) state.get("numAccts"); + // note: non integer exponents are slow + + ZipfDistribution zdiBanks = new ZipfDistribution((Integer) state.get("numBanks"), 1); + String bank = Utils.getBank(zdiBanks.inverseCumulativeProbability(rand.nextDouble())); + ZipfDistribution zdiAccts = new ZipfDistribution(numAccts, 1); + String acct1 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble())); + String acct2 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble())); + while (acct2.equals(acct1)) { + // intentionally not using zipf distribution to pick on retry + acct2 = Utils.getAccount(rand.nextInt(numAccts)); + } + + // TODO document how data should be read when using ConditionalWriter + try (Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) { + + scanner.setRange(new Range(bank)); + scanner.fetchColumnFamily(new Text(acct1)); + scanner.fetchColumnFamily(new Text(acct2)); + + Account a1 = new Account(); + Account a2 = new Account(); + Account a; + + for (Entry entry : scanner) { + String cf = entry.getKey().getColumnFamilyData().toString(); + String cq = entry.getKey().getColumnQualifierData().toString(); + + if (cf.equals(acct1)) + a = a1; + else if (cf.equals(acct2)) + a = a2; + else + throw new Exception("Unexpected column fam: " + cf); + + if (cq.equals("bal")) + a.setBal(entry.getValue().toString()); + else if (cq.equals("seq")) + a.setSeq(entry.getValue().toString()); + else + throw new Exception("Unexpected column qual: " + cq); + } + + int amt = rand.nextInt(50); + + log.debug("transfer req " + bank + " " + amt + " " + acct1 + " " + a1 + " " + acct2 + " " + a2); + + if (a1.bal >= amt) { + ConditionalMutation cm = new ConditionalMutation(bank, new Condition(acct1, "seq").setValue(Utils.getSeq(a1.seq)), + new Condition(acct2, "seq").setValue(Utils.getSeq(a2.seq))); + cm.put(acct1, "bal", (a1.bal - amt) + ""); + cm.put(acct2, "bal", (a2.bal + amt) + ""); + cm.put(acct1, "seq", 
Utils.getSeq(a1.seq + 1)); + cm.put(acct2, "seq", Utils.getSeq(a2.seq + 1)); + + ConditionalWriter cw = (ConditionalWriter) state.get("cw"); + Status status = cw.write(cm).getStatus(); + while (status == Status.UNKNOWN) { + log.debug("retrying transfer " + status); + status = cw.write(cm).getStatus(); + } + log.debug("transfer result " + bank + " " + status + " " + a1 + " " + a2); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java new file mode 100644 index 0000000..5436c22 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Formatting helpers that build the zero-padded bank, account and sequence strings shared by the
 * conditional randomwalk tests.
 */
public class Utils {

  /**
   * Formats a bank number as a bank identifier.
   *
   * @param b
   *          bank number
   * @return {@code "b"} followed by {@code b} zero-padded to at least three digits, e.g. {@code "b007"}
   */
  static String getBank(int b) {
    // Locale.ROOT keeps the digits ASCII no matter what the JVM's default locale is, so the
    // generated identifiers are identical on every host.
    return String.format(java.util.Locale.ROOT, "b%03d", b);
  }

  /**
   * Formats an account number as an account identifier.
   *
   * @param a
   *          account number
   * @return {@code "acct"} followed by {@code a} zero-padded to at least six digits, e.g. {@code "acct000042"}
   */
  static String getAccount(int a) {
    return "acct" + String.format(java.util.Locale.ROOT, "%06d", a);
  }

  /**
   * Formats a sequence number as a fixed-width string.
   *
   * @param s
   *          sequence number
   * @return {@code s} zero-padded to at least six digits, e.g. {@code "000005"}
   */
  static String getSeq(int s) {
    return String.format(java.util.Locale.ROOT, "%06d", s);
  }
}
+ */ +package org.apache.accumulo.testing.core.randomwalk.conditional; + +import java.util.Map.Entry; +import java.util.Properties; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.ColumnSliceFilter; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +/** + * + */ +public class Verify extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + String table = state.getString("tableName"); + Connector conn = env.getConnector(); + + int numAccts = (Integer) state.get("numAccts"); + + for (int i = 0; i < (Integer) state.get("numBanks"); i++) + verifyBank(table, conn, Utils.getBank(i), numAccts); + + } + + private void verifyBank(String table, Connector conn, String row, int numAccts) throws TableNotFoundException, Exception { + log.debug("Verifying bank " + row); + + int count = 0; + int sum = 0; + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + + // TODO do not use IsolatedScanner, just enable isolation on scanner + try (Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) { + + scanner.setRange(new Range(row)); + IteratorSetting iterConf = new IteratorSetting(100, "cqsl", ColumnSliceFilter.class); + ColumnSliceFilter.setSlice(iterConf, "bal", true, "bal", true); + scanner.clearScanIterators(); + scanner.addScanIterator(iterConf); + + for (Entry entry : scanner) { + int 
bal = Integer.parseInt(entry.getValue().toString()); + sum += bal; + if (bal > max) + max = bal; + if (bal < min) + min = bal; + count++; + } + + } + + if (count > 0 && sum != numAccts * 100) { + throw new Exception("Sum is off " + sum); + } + + log.debug("Verified " + row + " count = " + count + " sum = " + sum + " min = " + min + " max = " + max); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java new file mode 100644 index 0000000..09774ff --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.image; + +import java.util.Properties; + +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; + +public class Commit extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + env.getMultiTableBatchWriter().flush(); + + log.debug("Committed " + state.getLong("numWrites") + " writes. Total writes: " + state.getLong("totalWrites")); + state.set("numWrites", Long.valueOf(0)); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java new file mode 100644 index 0000000..687b2d1 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.testing.core.randomwalk.image; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.Fixture; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.hadoop.io.Text; + +public class ImageFixture extends Fixture { + + String imageTableName; + String indexTableName; + + @Override + public void setUp(State state, Environment env) throws Exception { + + Connector conn = env.getConnector(); + Instance instance = env.getInstance(); + + SortedSet splits = new TreeSet<>(); + for (int i = 1; i < 256; i++) { + splits.add(new Text(String.format("%04x", i << 8))); + } + + String hostname = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_"); + String pid = env.getPid(); + + imageTableName = String.format("img_%s_%s_%d", hostname, pid, System.currentTimeMillis()); + state.set("imageTableName", imageTableName); + + indexTableName = String.format("img_ndx_%s_%s_%d", hostname, pid, System.currentTimeMillis()); + state.set("indexTableName", indexTableName); + + try { + conn.tableOperations().create(imageTableName); + conn.tableOperations().addSplits(imageTableName, splits); + log.debug("Created table " + imageTableName + " (id:" + Tables.getNameToIdMap(instance).get(imageTableName) + ")"); + } 
catch (TableExistsException e) { + log.error("Table " + imageTableName + " already exists."); + throw e; + } + + try { + conn.tableOperations().create(indexTableName); + log.debug("Created table " + indexTableName + " (id:" + Tables.getNameToIdMap(instance).get(indexTableName) + ")"); + } catch (TableExistsException e) { + log.error("Table " + imageTableName + " already exists."); + throw e; + } + + Random rand = new Random(); + if (rand.nextInt(10) < 5) { + // setup locality groups + Map> groups = getLocalityGroups(); + + conn.tableOperations().setLocalityGroups(imageTableName, groups); + log.debug("Configured locality groups for " + imageTableName + " groups = " + groups); + } + + state.set("numWrites", Long.valueOf(0)); + state.set("totalWrites", Long.valueOf(0)); + state.set("verified", Integer.valueOf(0)); + state.set("lastIndexRow", new Text("")); + } + + static Map> getLocalityGroups() { + Map> groups = new HashMap<>(); + + HashSet lg1 = new HashSet<>(); + lg1.add(Write.CONTENT_COLUMN_FAMILY); + groups.put("lg1", lg1); + + HashSet lg2 = new HashSet<>(); + lg2.add(Write.META_COLUMN_FAMILY); + groups.put("lg2", lg2); + return groups; + } + + @Override + public void tearDown(State state, Environment env) throws Exception { + // We have resources we need to clean up + if (env.isMultiTableBatchWriterInitialized()) { + MultiTableBatchWriter mtbw = env.getMultiTableBatchWriter(); + try { + mtbw.close(); + } catch (MutationsRejectedException e) { + log.error("Ignoring mutations that weren't flushed", e); + } + + // Reset the MTBW on the state to null + env.resetMultiTableBatchWriter(); + } + + // Now we can safely delete the tables + log.debug("Dropping tables: " + imageTableName + " " + indexTableName); + + Connector conn = env.getConnector(); + + conn.tableOperations().delete(imageTableName); + conn.tableOperations().delete(indexTableName); + + log.debug("Final total of writes: " + state.getLong("totalWrites")); + } +} 
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java new file mode 100644 index 0000000..dbd89e8 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.image; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +public class ScanMeta extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + + // scan just the metadata of the images table to find N hashes... use the batch scanner to lookup those N hashes in the index table + // this scan will test locality groups.... 
+ + String indexTableName = state.getString("indexTableName"); + String imageTableName = state.getString("imageTableName"); + + String uuid = UUID.randomUUID().toString(); + + Connector conn = env.getConnector(); + + Scanner imageScanner = conn.createScanner(imageTableName, new Authorizations()); + + imageScanner.setRange(new Range(new Text(uuid), null)); + imageScanner.fetchColumn(Write.META_COLUMN_FAMILY, Write.SHA1_COLUMN_QUALIFIER); + + int minScan = Integer.parseInt(props.getProperty("minScan")); + int maxScan = Integer.parseInt(props.getProperty("maxScan")); + + Random rand = new Random(); + int numToScan = rand.nextInt(maxScan - minScan) + minScan; + + Map hashes = new HashMap<>(); + + Iterator> iter = imageScanner.iterator(); + + while (iter.hasNext() && numToScan > 0) { + + Entry entry = iter.next(); + + hashes.put(new Text(entry.getValue().get()), entry.getKey().getRow()); + + numToScan--; + } + + log.debug("Found " + hashes.size() + " hashes starting at " + uuid); + + if (hashes.isEmpty()) { + return; + } + + // use batch scanner to verify all of these exist in index + BatchScanner indexScanner = conn.createBatchScanner(indexTableName, Authorizations.EMPTY, 3); + ArrayList ranges = new ArrayList<>(); + for (Text row : hashes.keySet()) { + ranges.add(new Range(row)); + } + + indexScanner.setRanges(ranges); + + Map hashes2 = new HashMap<>(); + + for (Entry entry : indexScanner) + hashes2.put(entry.getKey().getRow(), new Text(entry.getValue().get())); + + log.debug("Looked up " + ranges.size() + " ranges, found " + hashes2.size()); + + if (!hashes.equals(hashes2)) { + log.error("uuids from doc table : " + hashes.values()); + log.error("uuids from index : " + hashes2.values()); + throw new Exception("Mismatch between document table and index " + indexTableName + " " + imageTableName); + } + + indexScanner.close(); + + } + +} 
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java new file mode 100644 index 0000000..1d14a90 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.core.randomwalk.image; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.testing.core.randomwalk.Environment; +import org.apache.accumulo.testing.core.randomwalk.State; +import org.apache.accumulo.testing.core.randomwalk.Test; +import org.apache.hadoop.io.Text; + +public class TableOp extends Test { + + @Override + public void visit(State state, Environment env, Properties props) throws Exception { + + // choose a table + Random rand = new Random(); + String tableName; + if (rand.nextInt(10) < 8) { + tableName = state.getString("imageTableName"); + } else { + tableName = state.getString("indexTableName"); + } + + // check if chosen table exists + Connector conn = env.getConnector(); + TableOperations tableOps = conn.tableOperations(); + if (tableOps.exists(tableName) == false) { + log.error("Table " + tableName + " does not exist!"); + return; + } + + // choose a random action + int num = rand.nextInt(10); + if (num > 6) { + log.debug("Retrieving info for " + tableName); + tableOps.getLocalityGroups(tableName); + tableOps.getProperties(tableName); + tableOps.listSplits(tableName); + tableOps.list(); + } else { + log.debug("Clearing locator cache for " + tableName); + tableOps.clearLocatorCache(tableName); + } + + if (rand.nextInt(10) < 3) { + Map> groups = tableOps.getLocalityGroups(state.getString("imageTableName")); + + if (groups.size() == 0) { + log.debug("Adding locality groups to " + state.getString("imageTableName")); + groups = ImageFixture.getLocalityGroups(); + } else { + log.debug("Removing locality groups from " + state.getString("imageTableName")); + groups = new HashMap<>(); + } + + tableOps.setLocalityGroups(state.getString("imageTableName"), groups); + } + } +}