accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [29/50] [abbrv] accumulo git commit: ACCUMULO-3167 Get mapreduce working against real cluster. Port other tests to AccumuloClusterIT
Date Wed, 26 Nov 2014 04:23:45 GMT
ACCUMULO-3167 Get mapreduce working against real cluster. Port other tests to AccumuloClusterIT

Had to change ExamplesIT to account for ACCUMULO-3364.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ee1694ca
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ee1694ca
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ee1694ca

Branch: refs/heads/master
Commit: ee1694caa4cd329a01a4bf591f20e30fb91e0658
Parents: b130b1d
Author: Josh Elser <elserj@apache.org>
Authored: Mon Nov 24 13:23:44 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Nov 24 18:09:24 2014 -0500

----------------------------------------------------------------------
 .../standalone/StandaloneClusterControl.java    | 36 +++++++--
 .../accumulo/harness/AccumuloClusterIT.java     |  6 ++
 .../accumulo/test/BulkImportVolumeIT.java       | 37 ++++++---
 .../test/functional/BinaryStressIT.java         | 65 ++++++++++++---
 .../accumulo/test/functional/CleanTmpIT.java    | 55 ++++++++++---
 .../accumulo/test/functional/CompactionIT.java  | 85 +++++++++++++++-----
 .../accumulo/test/functional/ExamplesIT.java    | 33 ++++++--
 7 files changed, 255 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
index 29cc1e8..378cb6b 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
@@ -20,6 +20,8 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.net.URL;
+import java.security.CodeSource;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -43,14 +45,14 @@ import com.google.common.collect.Maps;
 public class StandaloneClusterControl implements ClusterControl {
   private static final Logger log = LoggerFactory.getLogger(StandaloneClusterControl.class);
 
-  private static final String START_SERVER_SCRIPT = "start-server.sh", ACCUMULO_SCRIPT =
"accumulo";
+  private static final String START_SERVER_SCRIPT = "start-server.sh", ACCUMULO_SCRIPT =
"accumulo", TOOL_SCRIPT = "tool.sh";
   private static final String MASTER_HOSTS_FILE = "masters", GC_HOSTS_FILE = "gc", TSERVER_HOSTS_FILE
= "slaves", TRACER_HOSTS_FILE = "tracers",
       MONITOR_HOSTS_FILE = "monitor";
 
   protected String accumuloHome, accumuloConfDir;
   protected RemoteShellOptions options;
 
-  protected String startServerPath, accumuloPath;
+  protected String startServerPath, accumuloPath, toolPath;
 
   public StandaloneClusterControl() {
     this(System.getenv("ACCUMULO_HOME"), System.getenv("ACCUMULO_CONF_DIR"));
@@ -62,10 +64,9 @@ public class StandaloneClusterControl implements ClusterControl {
     this.accumuloConfDir = accumuloConfDir;
 
     File bin = new File(accumuloHome, "bin");
-    File startServer = new File(bin, "start-server.sh");
-    this.startServerPath = startServer.getAbsolutePath();
-    File accumulo = new File(bin, "accumulo");
-    this.accumuloPath = accumulo.getAbsolutePath();
+    this.startServerPath = new File(bin, START_SERVER_SCRIPT).getAbsolutePath();
+    this.accumuloPath = new File(bin, ACCUMULO_SCRIPT).getAbsolutePath();
+    this.toolPath = new File(bin, TOOL_SCRIPT).getAbsolutePath();
   }
 
   protected Entry<Integer,String> exec(String hostname, String[] command) throws IOException
{
@@ -101,6 +102,29 @@ public class StandaloneClusterControl implements ClusterControl {
     return exec(master, cmd);
   }
 
+  public Entry<Integer,String> execMapreduceWithStdout(Class<?> clz, String[]
args) throws IOException {
+    File confDir = getConfDir();
+    String master = getHosts(new File(confDir, "masters")).get(0);
+    String[] cmd = new String[3 + args.length];
+    cmd[0] = toolPath;
+    CodeSource source = clz.getProtectionDomain().getCodeSource();
+    if (null == source) {
+      throw new RuntimeException("Could not get CodeSource for class");
+    }
+    URL jarUrl = source.getLocation();
+    String jar = jarUrl.getPath();
+    if (!jar.endsWith(".jar")) {
+      throw new RuntimeException("Need to have a jar to run mapreduce: " + jar);
+    }
+    cmd[1] = jar;
+    cmd[2] = clz.getName();
+    for (int i = 0, j = 3; i < args.length; i++, j++) {
+      cmd[j] = "'" + args[i] + "'";
+    }
+    log.info("Running: '{}' on {}", StringUtils.join(cmd, " "), master);
+    return exec(master, cmd);
+  }
+
   @Override
   public void adminStopAll() throws IOException {
     File confDir = getConfDir();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
index 38b615f..6c496e9 100644
--- a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
+++ b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.harness.conf.AccumuloClusterPropertyConfiguration;
 import org.apache.accumulo.harness.conf.StandaloneAccumuloClusterConfiguration;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.After;
 import org.junit.Assume;
@@ -166,6 +167,11 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements
MiniCluste
     return clusterConf.getToken();
   }
 
+  public static FileSystem getFileSystem() throws IOException {
+    Preconditions.checkState(initialized);
+    return cluster.getFileSystem();
+  }
+
   public Connector getConnector() {
     try {
       return cluster.getConnector(getPrincipal(), getToken());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
index 79a2513..78cdfe6 100644
--- a/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BulkImportVolumeIT.java
@@ -23,15 +23,19 @@ import java.io.File;
 
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterIT;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // ACCUMULO-118/ACCUMULO-2504
-public class BulkImportVolumeIT extends ConfigurableMacIT {
+public class BulkImportVolumeIT extends AccumuloClusterIT {
+  private static final Logger log = LoggerFactory.getLogger(BulkImportVolumeIT.class);
 
   File volDirBase = null;
   Path v1, v2;
@@ -42,7 +46,7 @@ public class BulkImportVolumeIT extends ConfigurableMacIT {
   }
 
   @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
     File baseDir = cfg.getDir();
     volDirBase = new File(baseDir, "volumes");
     File v1f = new File(volDirBase, "v1");
@@ -64,15 +68,26 @@ public class BulkImportVolumeIT extends ConfigurableMacIT {
     String tableName = getUniqueNames(1)[0];
     TableOperations to = getConnector().tableOperations();
     to.create(tableName);
-    File bulk = new File(rootPath() + "/bulk");
-    System.out.println("bulk: " + bulk);
-    assertTrue(bulk.mkdirs());
-    File err = new File(rootPath() + "/err");
-    assertTrue(err.mkdirs());
-    File bogus = new File(bulk + "/bogus.rf");
-    assertTrue(bogus.createNewFile());
+    FileSystem fs = getFileSystem();
+    String rootPath = getUsableDir();
+    Path bulk = new Path(rootPath, "bulk");
+    log.info("bulk: {}", bulk);
+    if (fs.exists(bulk)) {
+      fs.delete(bulk, true);
+    }
+    assertTrue(fs.mkdirs(bulk));
+    Path err = new Path(rootPath, "err");
+    log.info("err: {}", err);
+    if (fs.exists(err)) {
+      fs.delete(err, true);
+    }
+    assertTrue(fs.mkdirs(err));
+    Path bogus = new Path(bulk, "bogus.rf");
+    fs.create(bogus).close();
+    log.info("bogus: {}", bogus);
+    assertTrue(fs.exists(bogus));
     to.importDirectory(tableName, bulk.toString(), err.toString(), false);
-    assertEquals(1, err.list().length);
+    assertEquals(1, fs.listStatus(err).length);
   }
 
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
index 2a086a4..29f1f57 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
@@ -19,19 +19,30 @@ package org.apache.accumulo.test.functional;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
-public class BinaryStressIT extends ConfigurableMacIT {
+public class BinaryStressIT extends AccumuloClusterIT {
 
   @Override
   protected int defaultTimeoutSeconds() {
@@ -39,13 +50,45 @@ public class BinaryStressIT extends ConfigurableMacIT {
   }
 
   @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
     Map<String,String> siteConfig = new HashMap<String,String>();
     siteConfig.put(Property.TSERV_MAXMEM.getKey(), "50K");
     siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
     cfg.setSiteConfig(siteConfig );
   }
 
+  private String majcDelay, maxMem;
+
+  @Before
+  public void alterConfig() throws Exception {
+    if (ClusterType.MINI == getClusterType()) {
+      return;
+    }
+
+    InstanceOperations iops = getConnector().instanceOperations();
+    Map<String,String> conf = iops.getSystemConfiguration();
+    majcDelay = conf.get(Property.TSERV_MAJC_DELAY.getKey());
+    maxMem = conf.get(Property.TSERV_MAXMEM.getKey());
+
+    iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "0");
+    iops.setProperty(Property.TSERV_MAXMEM.getKey(), "50K");
+
+    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+    getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+  }
+
+  @After
+  public void resetConfig() throws Exception {
+    if (null != majcDelay) {
+      InstanceOperations iops = getConnector().instanceOperations();
+      iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+      iops.setProperty(Property.TSERV_MAXMEM.getKey(), maxMem);
+      
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
   @Test
   public void binaryStressTest() throws Exception {
     Connector c = getConnector();
@@ -54,9 +97,13 @@ public class BinaryStressIT extends ConfigurableMacIT {
     c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
     BinaryIT.runTest(c, tableName);
     String id = c.tableOperations().tableIdMap().get(tableName);
-    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-    FileStatus[] dir = fs.listStatus(new Path(cluster.getConfig().getDir() + "/accumulo/tables/"
+ id));
-    assertTrue(dir.length  > 7);
+    Set<Text> tablets = new HashSet<Text>();
+    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(Range.prefix(id));
+    for (Entry<Key,Value> entry : s) {
+      tablets.add(entry.getKey().getRow());
+    }
+    assertTrue("Expected at least 8 tablets, saw " + tablets.size(), tablets.size() >
7);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
index 676f6d7..1387983 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
@@ -16,30 +16,44 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class CleanTmpIT extends ConfigurableMacIT {
+import com.google.common.collect.Iterables;
+
+public class CleanTmpIT extends AccumuloClusterIT {
+  private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class);
 
   @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
     Map<String,String> props = new HashMap<String,String>();
     props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
     cfg.setSiteConfig(props);
@@ -63,20 +77,39 @@ public class CleanTmpIT extends ConfigurableMacIT {
     Mutation m = new Mutation("row");
     m.put("cf", "cq", "value");
     bw.addMutation(m);
+    bw.flush();
+
+    // Compact memory to make a file
+    c.tableOperations().compact(tableName, null, null, true, true);
+
+    // Make sure that we'll have a WAL
+    m = new Mutation("row2");
+    m.put("cf", "cq", "value");
+    bw.addMutation(m);
     bw.close();
 
     // create a fake _tmp file in its directory
     String id = c.tableOperations().tableIdMap().get(tableName);
-    FileSystem fs = getCluster().getFileSystem();
-    Path tmp = new Path("/accumulo/tables/" + id + "/default_tablet/junk.rf_tmp");
+    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(Range.prefix(id));
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+    Path file = new Path(entry.getKey().getColumnQualifier().toString());
+
+    FileSystem fs = getFileSystem();
+    assertTrue("Could not find file: " + file, fs.exists(file));
+    Path tabletDir = file.getParent();
+    assertNotNull("Tablet dir should not be null", tabletDir);
+    Path tmp = new Path(tabletDir, "junk.rf_tmp");
+    // Make the file
     fs.create(tmp).close();
-    for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER))
{
-      getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
-    }
-    getCluster().start();
+    log.info("Created tmp file {}", tmp.toString());
+    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+    getClusterControl().startAllServers(ServerType.TABLET_SERVER);
 
     Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
-    FunctionalTestUtils.count(scanner);
-    assertFalse(fs.exists(tmp));
+    assertEquals(2, FunctionalTestUtils.count(scanner));
+    // If we performed log recovery, we should have cleaned up any stray files
+    assertFalse("File still exists: " + tmp, fs.exists(tmp));
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
index b659913..dc877dc 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -16,11 +16,9 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,27 +28,29 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.accumulo.core.cli.ScannerOpts;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.util.Admin;
 import org.apache.accumulo.test.VerifyIngest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.junit.Rule;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class CompactionIT extends ConfigurableMacIT {
-
-  @Rule
-  public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir")
+ "/target"));
+public class CompactionIT extends AccumuloClusterIT {
+  private static final Logger log = LoggerFactory.getLogger(CompactionIT.class);
 
   @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
     Map<String,String> map = new HashMap<String,String>();
     map.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
     map.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
@@ -63,14 +63,52 @@ public class CompactionIT extends ConfigurableMacIT {
     return 4 * 60;
   }
 
+  private String majcThreadMaxOpen, majcDelay, majcMaxConcurrent;
+
+  @Before
+  public void alterConfig() throws Exception {
+    if (ClusterType.STANDALONE == getClusterType()) {
+      InstanceOperations iops = getConnector().instanceOperations();
+      Map<String,String> config = iops.getSystemConfiguration();
+      majcThreadMaxOpen = config.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey());
+      majcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey());
+      majcMaxConcurrent = config.get(Property.TSERV_MAJC_MAXCONCURRENT.getKey());
+
+      iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
+      iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1");
+      iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1");
+
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
+  @After
+  public void resetConfig() throws Exception {
+    // We set the values..
+    if (null != majcThreadMaxOpen) {
+      InstanceOperations iops = getConnector().instanceOperations();
+
+      iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), majcThreadMaxOpen);
+      iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+      iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), majcMaxConcurrent);
+
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
   @Test
   public void test() throws Exception {
     final Connector c = getConnector();
-    c.tableOperations().create("test_ingest");
-    c.tableOperations().setProperty("test_ingest", Property.TABLE_MAJC_RATIO.getKey(), "1.0");
-    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-    FunctionalTestUtils.createRFiles(c, fs, folder.getRoot() + "/testrf", 500000, 59, 4);
-    FunctionalTestUtils.bulkImport(c, fs, "test_ingest", folder.getRoot() + "/testrf");
+    final String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0");
+    FileSystem fs = getFileSystem();
+    String root = getUsableDir();
+    Path testrf = new Path(root, "testrf");
+    FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4);
+    FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString());
     int beforeCount = countFiles(c);
 
     final AtomicBoolean fail = new AtomicBoolean(false);
@@ -89,8 +127,10 @@ public class CompactionIT extends ConfigurableMacIT {
               opts.random = 56;
               opts.dataSize = 50;
               opts.cols = 1;
+              opts.tableName = tableName;
               VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
             } catch (Exception ex) {
+              log.warn("Got exception verifying data", ex);
               fail.set(true);
             }
           }
@@ -100,12 +140,21 @@ public class CompactionIT extends ConfigurableMacIT {
       }
       for (Thread t : threads)
         t.join();
-      assertFalse(fail.get());
+      assertFalse("Failed to successfully run all threads, Check the test output for error",
fail.get());
     }
 
     int finalCount = countFiles(c);
     assertTrue(finalCount < beforeCount);
-    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    try {
+      getClusterControl().adminStopAll();
+    } finally {
+      // Make sure the internal state in the cluster is reset (e.g. processes in MAC)
+      getCluster().stop();
+      if (ClusterType.STANDALONE == getClusterType()) {
+        // Then restart things for the next test if it's a standalone
+        getCluster().start();
+      }
+    }
   }
 
   private int countFiles(Connector c) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee1694ca/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
index 047e69d..fbaf243 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.accumulo.cluster.standalone.StandaloneClusterControl;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -86,6 +87,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
@@ -243,8 +245,11 @@ public class ExamplesIT extends AccumuloClusterIT {
 
     c.tableOperations().attachIterator(table, is);
     bw = c.createBatchWriter(table, bwc);
+    // Write two mutations otherwise the NativeMap would dedupe them into a single update
     Mutation m = new Mutation("foo");
     m.put("a", "b", "1");
+    bw.addMutation(m);
+    m = new Mutation("foo");
     m.put("a", "b", "3");
     bw.addMutation(m);
     bw.flush();
@@ -349,12 +354,21 @@ public class ExamplesIT extends AccumuloClusterIT {
   @Test
   public void testTeraSortAndRead() throws Exception {
     String tableName = getUniqueNames(1)[0];
-    goodExec(TeraSortIngest.class, "--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10",
"-nv", "10", "-xv", "10", "-t", tableName, "-i", instance, "-z",
+    goodExec(TeraSortIngest.class, "--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10",
"-nv", "10", "-xv", "10", "-t", tableName, "-i", instance,
+        "-z",
         keepers, "-u", user, "-p", passwd, "--splits", "4");
-    goodExec(RegexExample.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd,
"-t", tableName, "--rowRegex", ".*999.*", "--output", dir
-        + "/tmp/nines");
+    Path output = new Path(dir, "tmp/nines");
+    if (fs.exists(output)) {
+      fs.delete(output, true);
+    }
+    goodExec(RegexExample.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd,
"-t", tableName, "--rowRegex", ".*999.*", "--output",
+        output.toString());
     goodExec(RowHash.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t",
tableName, "--column", "c:");
-    goodExec(TableToFile.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd,
"-t", tableName, "--output", dir + "/tmp/tableFile");
+    output = new Path(dir, "tmp/tableFile");
+    if (fs.exists(output)) {
+      fs.delete(output, true);
+    }
+    goodExec(TableToFile.class, "-i", instance, "-z", keepers, "-u", user, "-p", passwd,
"-t", tableName, "--output", output.toString());
   }
 
   @Test
@@ -422,9 +436,14 @@ public class ExamplesIT extends AccumuloClusterIT {
   }
 
   private void goodExec(Class<?> theClass, String... args) throws InterruptedException,
IOException {
-    // We're already slurping stdout into memory (not redirecting to file). Might as well
add it to error message.
-    Entry<Integer,String> pair = getClusterControl().execWithStdout(theClass, args);
+    Entry<Integer,String> pair;
+    if (Tool.class.isAssignableFrom(theClass) && ClusterType.STANDALONE == getClusterType())
{
+      StandaloneClusterControl control = (StandaloneClusterControl) getClusterControl();
+      pair = control.execMapreduceWithStdout(theClass, args);
+    } else {
+      // We're already slurping stdout into memory (not redirecting to file). Might as well
add it to error message.
+      pair = getClusterControl().execWithStdout(theClass, args);
+    }
     Assert.assertEquals("stdout=" + pair.getValue(), 0, pair.getKey().intValue());
   }
-
 }


Mime
View raw message