accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [3/4] accumulo git commit: Merge branch '1.6'
Date Thu, 11 Dec 2014 01:14:14 GMT
Merge branch '1.6'

Conflicts:
	test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java
	test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
	test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e16d55f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e16d55f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e16d55f

Branch: refs/heads/master
Commit: 6e16d55f1e13177a2c59ef73a076e8f41d8a695b
Parents: 524a813 0532b62
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Wed Dec 10 18:52:24 2014 -0500
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Wed Dec 10 18:52:24 2014 -0500

----------------------------------------------------------------------
 .../accumulo/harness/AccumuloClusterIT.java     |   2 +-
 .../accumulo/harness/SharedMiniClusterIT.java   |   4 +-
 .../test/ArbitraryTablePropertiesIT.java        |   5 +-
 .../apache/accumulo/test/ImportExportIT.java    |   5 +-
 .../accumulo/test/SplitCancelsMajCIT.java       |   5 +-
 .../accumulo/test/functional/AbstractMacIT.java |  93 ------------
 .../test/functional/ConfigurableMacIT.java      |  79 ++++++++---
 .../functional/DeletedTablesDontFlushIT.java    |   4 +-
 .../test/functional/HalfDeadTServerIT.java      |   3 +-
 .../accumulo/test/functional/SimpleMacIT.java   | 142 +------------------
 .../test/replication/CyclicReplicationIT.java   |   4 +-
 ...bageCollectorCommunicatesWithTServersIT.java |   5 +-
 .../test/replication/StatusCombinerMacIT.java   |   5 +-
 13 files changed, 84 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
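
The unifying change across the hunks below is the retirement of the deprecated AbstractMacIT/SimpleMacIT pair: tests that share one MiniAccumuloCluster now extend org.apache.accumulo.harness.SharedMiniClusterIT, while tests that need a dedicated, specially configured cluster extend ConfigurableMacIT. A minimal sketch of a migrated shared-cluster test, assuming only the harness methods visible in the hunks (getConnector, getUniqueNames, defaultTimeoutSeconds); the class name is hypothetical:

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.harness.SharedMiniClusterIT;
import org.junit.Assert;
import org.junit.Test;

// Hypothetical example, not part of this commit.
public class ExampleSharedClusterIT extends SharedMiniClusterIT {

  @Override
  protected int defaultTimeoutSeconds() {
    return 60; // per-test timeout enforced by the harness
  }

  @Test
  public void createsATable() throws Exception {
    // The shared MiniAccumuloCluster is managed by the harness; a test only
    // asks for a connector and unique table names.
    Connector conn = getConnector();
    String tableName = getUniqueNames(1)[0];
    conn.tableOperations().create(tableName);
    Assert.assertTrue(conn.tableOperations().exists(tableName));
  }
}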


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
index 7cc136f,0000000..aa5c164
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java
@@@ -1,198 -1,0 +1,197 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import java.util.Map.Entry;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.security.TablePermission;
- import org.apache.accumulo.test.functional.SimpleMacIT;
++import org.apache.accumulo.harness.SharedMiniClusterIT;
 +import org.apache.log4j.Logger;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
- @SuppressWarnings("deprecation")
- public class ArbitraryTablePropertiesIT extends SimpleMacIT {
++public class ArbitraryTablePropertiesIT extends SharedMiniClusterIT {
 +  private static final Logger log = Logger.getLogger(ArbitraryTablePropertiesIT.class);
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 30;
 +  }
 +
 +  // Test set, get, and remove arbitrary table properties on the root account
 +  @Test
 +  public void setGetRemoveTablePropertyRoot() throws Exception {
 +    log.debug("Starting setGetRemoveTablePropertyRoot test ------------------------");
 +
 +    // make a table
 +    final String tableName = getUniqueNames(1)[0];
 +    final Connector conn = getConnector();
 +    conn.tableOperations().create(tableName);
 +
 +    // Set variables for the property name to use and the initial value
 +    String propertyName = "table.custom.description";
 +    String description1 = "Description";
 +
 +    // Make sure the property name is valid
 +    Assert.assertTrue(Property.isValidPropertyKey(propertyName));
 +    // Set the property to the desired value
 +    conn.tableOperations().setProperty(tableName, propertyName, description1);
 +
 +    // Loop through properties to make sure the new property is added to the list
 +    int count = 0;
 +    for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
 +      if (property.getKey().equals(propertyName) && property.getValue().equals(description1))
 +        count++;
 +    }
 +    Assert.assertEquals(1, count);
 +
 +    // Set the property as something different
 +    String description2 = "set second";
 +    conn.tableOperations().setProperty(tableName, propertyName, description2);
 +
 +    // Loop through properties to make sure the updated property value is in the list
 +    count = 0;
 +    for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
 +      if (property.getKey().equals(propertyName) && property.getValue().equals(description2))
 +        count++;
 +    }
 +    Assert.assertEquals(1, count);
 +
 +    // Remove the property and make sure there is no longer a value associated with it
 +    conn.tableOperations().removeProperty(tableName, propertyName);
 +
 +    // Loop through properties to make sure the property was removed
 +    count = 0;
 +    for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
 +      if (property.getKey().equals(propertyName))
 +        count++;
 +    }
 +    Assert.assertEquals(0, count);
 +  }
 +
 +  // Tests set, get, and remove of user-added arbitrary properties using a non-root account with permission to alter tables
 +  @Test
 +  public void userSetGetRemoveTablePropertyWithPermission() throws Exception {
 +    log.debug("Starting userSetGetRemoveTablePropertyWithPermission test ------------------------");
 +
 +    // Make a test username and password
 +    String testUser = makeUserName();
 +    PasswordToken testPasswd = new PasswordToken("test_password");
 +
 +    // As the root user, create a test user and the table,
 +    // then grant the test user permission to alter the table
 +    final String tableName = getUniqueNames(1)[0];
 +    final Connector c = getConnector();
 +    c.securityOperations().createLocalUser(testUser, testPasswd);
 +    Connector conn = c.getInstance().getConnector(testUser, testPasswd);
 +    c.tableOperations().create(tableName);
 +    c.securityOperations().grantTablePermission(testUser, tableName, TablePermission.ALTER_TABLE);
 +
 +    // Set variables for the property name to use and the initial value
 +    String propertyName = "table.custom.description";
 +    String description1 = "Description";
 +
 +    // Make sure the property name is valid
 +    Assert.assertTrue(Property.isValidPropertyKey(propertyName));
 +    // Set the property to the desired value
 +    conn.tableOperations().setProperty(tableName, propertyName, description1);
 +
 +    // Loop through properties to make sure the new property is added to the list
 +    int count = 0;
 +    for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
 +      if (property.getKey().equals(propertyName) && property.getValue().equals(description1))
 +        count++;
 +    }
 +    Assert.assertEquals(1, count);
 +
 +    // Set the property as something different
 +    String description2 = "set second";
 +    conn.tableOperations().setProperty(tableName, propertyName, description2);
 +
 +    // Loop through properties to make sure the updated property value is in the list
 +    count = 0;
 +    for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
 +      if (property.getKey().equals(propertyName) && property.getValue().equals(description2))
 +        count++;
 +    }
 +    Assert.assertEquals(1, count);
 +
 +    // Remove the property and make sure there is no longer a value associated with it
 +    conn.tableOperations().removeProperty(tableName, propertyName);
 +
 +    // Loop through properties to make sure the property was removed
 +    count = 0;
 +    for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
 +      if (property.getKey().equals(propertyName))
 +        count++;
 +    }
 +    Assert.assertEquals(0, count);
 +
 +  }
 +
 +  // Tests set and get of user-added arbitrary properties using a non-root account without permission to alter tables
 +  @Test
 +  public void userSetGetTablePropertyWithoutPermission() throws Exception {
 +    log.debug("Starting userSetGetTablePropertyWithoutPermission test ------------------------");
 +
 +    // Make a test username and password
 +    String testUser = makeUserName();
 +    PasswordToken testPasswd = new PasswordToken("test_password");
 +
 +    // As the root user, create a test user and the table,
 +    // but do not grant the test user permission to alter the table
 +    final String tableName = getUniqueNames(1)[0];
 +    final Connector c = getConnector();
 +    c.securityOperations().createLocalUser(testUser, testPasswd);
 +    Connector conn = c.getInstance().getConnector(testUser, testPasswd);
 +    c.tableOperations().create(tableName);
 +
 +    // Set variables for the property name to use and the initial value
 +    String propertyName = "table.custom.description";
 +    String description1 = "Description";
 +
 +    // Make sure the property name is valid
 +    Assert.assertTrue(Property.isValidPropertyKey(propertyName));
 +
 +    // Try to set the property to the desired value.
 +    // If able to set it, the test fails, since permission was never granted
 +    try {
 +      conn.tableOperations().setProperty(tableName, propertyName, description1);
 +      Assert.fail("Was able to set property without permissions");
 +    } catch (AccumuloSecurityException e) {} // expected: ALTER_TABLE was never granted
 +
 +    // Loop through properties to make sure the new property is not added to the list
 +    int count = 0;
 +    for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
 +      if (property.getKey().equals(propertyName))
 +        count++;
 +    }
 +    Assert.assertEquals(0, count);
 +  }
 +
 +  static AtomicInteger userId = new AtomicInteger(0);
 +
 +  static String makeUserName() {
 +    return "user_" + userId.getAndIncrement();
 +  }
 +
 +}
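
An aside on the file above: each test re-implements the same counting loop over tableOperations().getProperties(). A hedged sketch of a helper that would remove the duplication (hypothetical, not part of the patch):

// Hypothetical helper: counts table properties whose key matches propertyName
// and, when expectedValue is non-null, whose value matches as well.
private static int countMatchingProperties(Connector conn, String tableName,
    String propertyName, String expectedValue) throws Exception {
  int count = 0;
  for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) {
    if (property.getKey().equals(propertyName)
        && (expectedValue == null || property.getValue().equals(expectedValue)))
      count++;
  }
  return count;
}

Each assertion would then read, e.g., Assert.assertEquals(1, countMatchingProperties(conn, tableName, propertyName, description1)).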

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java
index 4d8855d,0000000..53354b1
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java
@@@ -1,85 -1,0 +1,84 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.EnumSet;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.util.UtilWaitThread;
- import org.apache.accumulo.test.functional.SimpleMacIT;
++import org.apache.accumulo.harness.SharedMiniClusterIT;
 +import org.apache.accumulo.test.functional.SlowIterator;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +// ACCUMULO-2862
- @SuppressWarnings("deprecation")
- public class SplitCancelsMajCIT extends SimpleMacIT{
++public class SplitCancelsMajCIT extends SharedMiniClusterIT {
 +
 +  @Test(timeout = 2 * 60 * 1000)
 +  public void test() throws Exception {
 +    final String tableName = getUniqueNames(1)[0];
 +    final Connector c = getConnector();
 +    c.tableOperations().create(tableName);
 +    // majc should take 100 * .5 secs = ~50 secs per full pass
 +    IteratorSetting it = new IteratorSetting(100, SlowIterator.class);
 +    SlowIterator.setSleepTime(it, 500);
 +    c.tableOperations().attachIterator(tableName, it, EnumSet.of(IteratorScope.majc));
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +    for (int i = 0; i < 100; i++) {
 +      Mutation m = new Mutation("" + i);
 +      m.put("", "", new Value());
 +      bw.addMutation(m);
 +    }
 +    bw.flush();
 +    // start majc
 +    final AtomicReference<Exception> ex = new AtomicReference<Exception>();
 +    Thread thread = new Thread() {
 +      @Override
 +      public void run() {
 +        try {
 +          c.tableOperations().compact(tableName, null, null, true, true);
 +        } catch (Exception e) {
 +          ex.set(e);
 +        }
 +      }
 +    };
 +    thread.start();
 +
 +    long now = System.currentTimeMillis();
 +    UtilWaitThread.sleep(10 * 1000);
 +    // split the table, which interrupts the running compaction
 +    SortedSet<Text> partitionKeys = new TreeSet<Text>();
 +    partitionKeys.add(new Text("10"));
 +    c.tableOperations().addSplits(tableName, partitionKeys);
 +    thread.join();
 +    // the compaction restarted after the split, so the total time should exceed one full pass
 +    assertTrue(System.currentTimeMillis() - now > 59 * 1000);
 +    if (ex.get() != null)
 +      throw ex.get();
 +  }
 +}
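
A note on the AtomicReference in the test above: JUnit only sees exceptions thrown on the test's own thread, so the compaction thread records its failure and the main thread rethrows it after join(). The pattern in isolation (a self-contained sketch; the simulated work stands in for the blocking compact() call):

import java.util.concurrent.atomic.AtomicReference;

public class BackgroundFailureExample {
  public static void main(String[] args) throws Exception {
    final AtomicReference<Exception> ex = new AtomicReference<Exception>();
    Thread worker = new Thread() {
      @Override
      public void run() {
        try {
          // stand-in for the blocking compact() call in the test above
          Thread.sleep(100);
          throw new IllegalStateException("simulated failure");
        } catch (Exception e) {
          ex.set(e); // remember the failure for the main thread
        }
      }
    };
    worker.start();
    // ... the main thread would trigger the split that cancels the majc here ...
    worker.join();
    if (ex.get() != null)
      throw ex.get(); // surface the background failure so the caller sees it
  }
}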

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
index e2f08cd,1c8b1f0..a921538
--- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java
@@@ -34,21 -37,65 +38,66 @@@ import org.apache.accumulo.minicluster.
  import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
  import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
  import org.apache.accumulo.minicluster.impl.ZooKeeperBindException;
+ import org.apache.accumulo.test.util.CertUtils;
 +import org.apache.commons.configuration.PropertiesConfiguration;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.log4j.Logger;
  import org.apache.zookeeper.KeeperException;
  import org.junit.After;
  import org.junit.Before;
  
- public class ConfigurableMacIT extends AbstractMacIT {
-   protected static final Logger log = Logger.getLogger(ConfigurableMacIT.class);
+ /**
+  * General Integration-Test base class that provides access to a {@link MiniAccumuloCluster} for testing. Tests extending this class typically do very disruptive things
+  * to the instance and require specific configuration. Most tests don't need this level of control and should extend {@link AccumuloClusterIT} instead.
+  */
+ public class ConfigurableMacIT extends AccumuloIT {
+   public static final Logger log = Logger.getLogger(ConfigurableMacIT.class);
+ 
+   protected MiniAccumuloClusterImpl cluster;
+ 
+   protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {}
+ 
+   protected void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {}
  
-   public MiniAccumuloClusterImpl cluster;
+   protected static final String ROOT_PASSWORD = "testRootPassword1";
  
-   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {}
 -  private static void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder) {
++  public static void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder) {
+     if ("true".equals(System.getProperty("org.apache.accumulo.test.functional.useSslForIT"))) {
+       configureForSsl(cfg, folder);
+     }
+     if ("true".equals(System.getProperty("org.apache.accumulo.test.functional.useCredProviderForIT"))) {
+       cfg.setUseCredentialProvider(true);
+     }
+   }
  
-   public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {}
+   protected static void configureForSsl(MiniAccumuloConfigImpl cfg, File folder) {
+     Map<String,String> siteConfig = cfg.getSiteConfig();
+     if ("true".equals(siteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
+       // already enabled; don't mess with it
+       return;
+     }
+ 
+     File sslDir = new File(folder, "ssl");
+     sslDir.mkdirs();
+     File rootKeystoreFile = new File(sslDir, "root-" + cfg.getInstanceName() + ".jks");
+     File localKeystoreFile = new File(sslDir, "local-" + cfg.getInstanceName() + ".jks");
+     File publicTruststoreFile = new File(sslDir, "public-" + cfg.getInstanceName() + ".jks");
+     final String rootKeystorePassword = "root_keystore_password", truststorePassword = "truststore_password";
+     try {
+       new CertUtils(Property.RPC_SSL_KEYSTORE_TYPE.getDefaultValue(), "o=Apache Accumulo,cn=MiniAccumuloCluster", "RSA", 2048, "sha1WithRSAEncryption")
+           .createAll(rootKeystoreFile, localKeystoreFile, publicTruststoreFile, cfg.getInstanceName(), rootKeystorePassword, cfg.getRootPassword(),
+               truststorePassword);
+     } catch (Exception e) {
+       throw new RuntimeException("error creating MAC keystore", e);
+     }
+ 
+     siteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
+     siteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), localKeystoreFile.getAbsolutePath());
+     siteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), cfg.getRootPassword());
+     siteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), publicTruststoreFile.getAbsolutePath());
+     siteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
+     cfg.setSiteConfig(siteConfig);
+   }
  
    @Before
    public void setUp() throws Exception {
@@@ -118,9 -162,4 +164,8 @@@
      return MonitorUtil.getLocation(instance);
    }
  
-   @Override
 +  protected ClientConfiguration getClientConfig() throws Exception {
 +    return new ClientConfiguration(new PropertiesConfiguration(getCluster().getConfig().getClientConfFile()));
 +  }
 +
  }
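
As the new Javadoc says, ConfigurableMacIT is for tests that need a dedicated, specifically configured cluster; configure() and beforeClusterStart() are the hook points. A minimal sketch of a subclass, assuming the API in the hunk above (the class name and settings are illustrative):

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.hadoop.conf.Configuration;

// Hypothetical subclass, not part of this commit.
public class SingleTserverExampleIT extends ConfigurableMacIT {
  @Override
  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
    // Each test class gets its own MiniAccumuloClusterImpl, so disruptive
    // settings such as a lone tserver or a tiny WAL are safe here.
    cfg.setNumTservers(1);
    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
  }
}

The useSslForIT and useCredProviderForIT system properties checked in configureForEnvironment() above switch any such cluster over to SSL or credential-provider configuration.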

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java
index d342bc2,0000000..97c696e
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java
@@@ -1,57 -1,0 +1,57 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.EnumSet;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.fate.util.UtilWaitThread;
++import org.apache.accumulo.harness.SharedMiniClusterIT;
 +import org.junit.Test;
 +
 +// ACCUMULO-2880
- @SuppressWarnings("deprecation")
- public class DeletedTablesDontFlushIT extends SimpleMacIT {
++public class DeletedTablesDontFlushIT extends SharedMiniClusterIT {
 +
 +  @Test(timeout = 60 * 1000)
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    IteratorSetting setting = new IteratorSetting(100, SlowIterator.class);
 +    SlowIterator.setSleepTime(setting, 1000);
 +    c.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.minc));
 +    // let the configuration change propagate through zookeeper
 +    UtilWaitThread.sleep(1000);
 +
 +    Mutation m = new Mutation("xyzzy");
 +    for (int i = 0; i < 100; i++) {
 +      m.put("cf", "" + i, new Value(new byte[]{}));
 +    }
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +    bw.addMutation(m);
 +    bw.close();
 +    // should go fast
 +    c.tableOperations().delete(tableName);
 +  }
 +
 +}
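
The trick here and in SplitCancelsMajCIT is the same: attach SlowIterator at one specific scope so only that phase of the tablet server is slowed. A sketch of the scope knob, using only calls from the tests above (the helper name is hypothetical):

// Hypothetical helper. minc slows minor compactions only (this test); majc
// would slow major compactions only (SplitCancelsMajCIT); the two-argument
// attachIterator used in CyclicReplicationIT applies an iterator at all scopes.
static void slowMinorCompactions(Connector c, String tableName) throws Exception {
  IteratorSetting setting = new IteratorSetting(100, SlowIterator.class);
  SlowIterator.setSleepTime(setting, 1000); // milliseconds per key
  c.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.minc));
}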

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index a85f9fb,0000000..8520f66
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@@ -1,331 -1,0 +1,331 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.replication;
 +
 +import java.io.BufferedOutputStream;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.OutputStream;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.LongCombiner.Type;
 +import org.apache.accumulo.core.iterators.user.SummingCombiner;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.minicluster.impl.ProcessReference;
 +import org.apache.accumulo.minicluster.impl.ZooKeeperBindException;
- import org.apache.accumulo.test.functional.AbstractMacIT;
++import org.apache.accumulo.test.functional.ConfigurableMacIT;
 +import org.apache.accumulo.tserver.TabletServer;
 +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.Assert;
 +import org.junit.Rule;
 +import org.junit.Test;
 +import org.junit.rules.TestName;
 +import org.junit.rules.Timeout;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +
 +/**
 + * Tests cyclic (two-way) replication between a pair of mini clusters, verifying that an entry is not over-replicated as it cycles between them (see dataIsNotOverReplicated).
 + */
 +public class CyclicReplicationIT {
 +  private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class);
 +
 +  @Rule
 +  public Timeout getTimeout() {
 +    int scalingFactor = 1;
 +    try {
 +      scalingFactor = Integer.parseInt(System.getProperty("timeout.factor"));
 +    } catch (NumberFormatException exception) {
 +      log.warn("Could not parse timeout.factor, not scaling timeout");
 +    }
 +
 +    return new Timeout(scalingFactor * 5 * 60 * 1000);
 +  }
 +
 +  @Rule
 +  public TestName testName = new TestName();
 +
 +  private File createTestDir(String name) {
 +    File baseDir = new File(System.getProperty("user.dir") + "/target/mini-tests");
 +    baseDir.mkdirs();
 +    File testDir = new File(baseDir, this.getClass().getName() + "_" + testName.getMethodName() + "_" + name);
 +    FileUtils.deleteQuietly(testDir);
 +    testDir.mkdir();
 +    return testDir;
 +  }
 +
 +  private void setCoreSite(MiniAccumuloClusterImpl cluster) throws Exception {
 +    File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml");
 +    if (csFile.exists())
 +      throw new RuntimeException(csFile + " already exists");
 +
 +    Configuration coreSite = new Configuration(false);
 +    coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +    OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "core-site.xml")));
 +    coreSite.writeXml(out);
 +    out.close();
 +  }
 +
 +  /**
 +   * Use the same SSL and credential provider configuration that is set up by ConfigurableMacIT for the other MAC used for replication
 +   */
 +  private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) {
 +    // Set the same SSL information from the primary when present
 +    Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig();
 +    if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
 +      Map<String,String> peerSiteConfig = new HashMap<String,String>();
 +      peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
 +      String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
 +      Assert.assertNotNull("Keystore Path was null", keystorePath);
 +      peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath);
 +      String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
 +      Assert.assertNotNull("Truststore Path was null", truststorePath);
 +      peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath);
 +
 +      // Passwords might be stored in CredentialProvider
 +      String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
 +      if (null != keystorePassword) {
 +        peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword);
 +      }
 +      String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
 +      if (null != truststorePassword) {
 +        peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
 +      }
 +
 +      log.info("Setting site configuration for peer {}", peerSiteConfig);
 +      peerCfg.setSiteConfig(peerSiteConfig);
 +    }
 +
 +    // Use the CredentialProvider if the primary also uses one
 +    String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
 +    if (null != credProvider) {
 +      Map<String,String> peerSiteConfig = peerCfg.getSiteConfig();
 +      peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider);
 +      peerCfg.setSiteConfig(peerSiteConfig);
 +    }
 +  }
 +
 +  @Test
 +  public void dataIsNotOverReplicated() throws Exception {
 +    File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2");
 +    String password = "password";
 +
 +    MiniAccumuloConfigImpl master1Cfg;
 +    MiniAccumuloClusterImpl master1Cluster;
 +    while (true) {
 +      master1Cfg = new MiniAccumuloConfigImpl(master1Dir, password);
 +      master1Cfg.setNumTservers(1);
 +      master1Cfg.setInstanceName("master1");
 +
 +      // Set up SSL if needed
-       AbstractMacIT.configureForEnvironment(master1Cfg, this.getClass(), AbstractMacIT.createSharedTestDir(this.getClass().getName() + "-ssl"));
++      ConfigurableMacIT.configureForEnvironment(master1Cfg, this.getClass(), ConfigurableMacIT.createSharedTestDir(this.getClass().getName() + "-ssl"));
 +
 +      master1Cfg.setProperty(Property.REPLICATION_NAME, master1Cfg.getInstanceName());
 +      master1Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
 +      master1Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m");
 +      master1Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
 +      master1Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
 +      master1Cluster = new MiniAccumuloClusterImpl(master1Cfg);
 +      setCoreSite(master1Cluster);
 +
 +      try {
 +        master1Cluster.start();
 +        break;
 +      } catch (ZooKeeperBindException e) {
 +        log.warn("Failed to start ZooKeeper on " + master1Cfg.getZooKeeperPort() + ", will retry");
 +      }
 +    }
 +
 +    MiniAccumuloConfigImpl master2Cfg;
 +    MiniAccumuloClusterImpl master2Cluster;
 +    while (true) {
 +      master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password);
 +      master2Cfg.setNumTservers(1);
 +      master2Cfg.setInstanceName("master2");
 +
 +      // Set up SSL if needed. Need to share the same SSL truststore as master1
 +      this.updatePeerConfigFromPrimary(master1Cfg, master2Cfg);
 +
 +      master2Cfg.setProperty(Property.REPLICATION_NAME, master2Cfg.getInstanceName());
 +      master2Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
 +      master2Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m");
 +      master2Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
 +      master2Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
 +      master2Cluster = new MiniAccumuloClusterImpl(master2Cfg);
 +      setCoreSite(master2Cluster);
 +
 +      try {
 +        master2Cluster.start();
 +        break;
 +      } catch (ZooKeeperBindException e) {
 +        log.warn("Failed to start ZooKeeper on " + master2Cfg.getZooKeeperPort() + ", will retry");
 +      }
 +    }
 +
 +    try {
 +      Connector connMaster1 = master1Cluster.getConnector("root", new PasswordToken(password)), connMaster2 = master2Cluster.getConnector("root",
 +          new PasswordToken(password));
 +
 +      String master1UserName = "master1", master1Password = "foo";
 +      String master2UserName = "master2", master2Password = "bar";
 +      String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName();
 +
 +      connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password));
 +      connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password));
 +
 +      // Configure the credentials we should use to authenticate ourselves to the peer for replication
 +      connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), master2UserName);
 +      connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), master2Password);
 +
 +      connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), master1UserName);
 +      connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), master1Password);
 +
 +      connMaster1.instanceOperations().setProperty(
 +          Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(),
 +          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
 +              AccumuloReplicaSystem.buildConfiguration(master2Cluster.getInstanceName(), master2Cluster.getZooKeepers())));
 +
 +      connMaster2.instanceOperations().setProperty(
 +          Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(),
 +          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
 +              AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers())));
 +
 +      connMaster1.tableOperations().create(master1Table, false);
 +      String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table);
 +      Assert.assertNotNull(master1TableId);
 +
 +      connMaster2.tableOperations().create(master2Table, false);
 +      String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table);
 +      Assert.assertNotNull(master2TableId);
 +
 +      // Replicate master1 in the master1 cluster to master2 in the master2 cluster
 +      connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true");
 +      connMaster1.tableOperations().setProperty(master1Table,
 +          Property.TABLE_REPLICATION_TARGET.getKey() + master2Cluster.getInstanceName(), master2TableId);
 +
 +      // Replicate master2 in the master2 cluster to master1 in the master1 cluster
 +      connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true");
 +      connMaster2.tableOperations().setProperty(master2Table,
 +          Property.TABLE_REPLICATION_TARGET.getKey() + master1Cluster.getInstanceName(), master1TableId);
 +
 +      // Give our replication user the ability to write to the respective table
 +      connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, TablePermission.WRITE);
 +      connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE);
 +
 +      IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class);
 +      SummingCombiner.setEncodingType(summingCombiner, Type.STRING);
 +      SummingCombiner.setCombineAllColumns(summingCombiner, true);
 +
 +      // Set a combiner on both instances that will sum multiple values
 +      // We can use this to verify that the mutation was not sent multiple times
 +      connMaster1.tableOperations().attachIterator(master1Table, summingCombiner);
 +      connMaster2.tableOperations().attachIterator(master2Table, summingCombiner);
 +
 +      // Write a single entry
 +      BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig());
 +      Mutation m = new Mutation("row");
 +      m.put("count", "", "1");
 +      bw.addMutation(m);
 +      bw.close();
 +
 +      Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table);
 +
 +      log.info("Found {} that need replication from master1", files);
 +
 +      // Kill and restart the tserver to close the WAL on master1
 +      for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +        master1Cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +      }
 +
 +      master1Cluster.exec(TabletServer.class);
 +
 +      log.info("Restarted tserver on master1");
 +
 +      // Try to avoid ACCUMULO-2964
 +      Thread.sleep(1000);
 +
 +      // Sanity check that the element is there on master1
 +      Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
 +      Entry<Key,Value> entry = Iterables.getOnlyElement(s);
 +      Assert.assertEquals("1", entry.getValue().toString());
 +
 +      // Wait for this table to replicate
 +      connMaster1.replicationOperations().drain(master1Table, files);
 +
 +      Thread.sleep(5000);
 +
 +      // Check that the element made it to master2 only once
 +      s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
 +      entry = Iterables.getOnlyElement(s);
 +      Assert.assertEquals("1", entry.getValue().toString());
 +
 +      // Wait for master2 to finish replicating it back
 +      files = connMaster2.replicationOperations().referencedFiles(master2Table);
 +
 +      // Kill and restart the tserver to close the WAL on master2
 +      for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
 +        master2Cluster.killProcess(ServerType.TABLET_SERVER, proc);
 +      }
 +
 +      master2Cluster.exec(TabletServer.class);
 +
 +      // Try to avoid ACCUMULO-2964
 +      Thread.sleep(1000);
 +
 +      // Check that the element made it to master2 only once
 +      s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
 +      entry = Iterables.getOnlyElement(s);
 +      Assert.assertEquals("1", entry.getValue().toString());
 +
 +      connMaster2.replicationOperations().drain(master2Table, files);
 +
 +      Thread.sleep(5000);
 +
 +      // Verify that the entry wasn't sent back to master1
 +      s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
 +      entry = Iterables.getOnlyElement(s);
 +      Assert.assertEquals("1", entry.getValue().toString());
 +    } finally {
 +      master1Cluster.stop();
 +      master2Cluster.stop();
 +    }
 +  }
 +
 +}
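
The peer wiring above is done twice, once per direction. A condensed sketch of one direction, using only the properties exercised by the test (the helper name is hypothetical):

// Hypothetical helper mirroring the wiring above: make `peerCluster` a
// replication target of `source` for one table.
static void wireReplication(Connector source, MiniAccumuloClusterImpl peerCluster,
    String peerUser, String peerPassword, String sourceTable, String peerTableId)
    throws Exception {
  String peerName = peerCluster.getInstanceName();
  // credentials the source cluster uses to authenticate to the peer
  source.instanceOperations().setProperty(
      Property.REPLICATION_PEER_USER.getKey() + peerName, peerUser);
  source.instanceOperations().setProperty(
      Property.REPLICATION_PEER_PASSWORD.getKey() + peerName, peerPassword);
  // how to reach the peer: its instance name and ZooKeeper quorum
  source.instanceOperations().setProperty(
      Property.REPLICATION_PEERS.getKey() + peerName,
      ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
          AccumuloReplicaSystem.buildConfiguration(peerName, peerCluster.getZooKeepers())));
  // enable replication on the table and point it at the peer's table id
  source.tableOperations().setProperty(sourceTable,
      Property.TABLE_REPLICATION.getKey(), "true");
  source.tableOperations().setProperty(sourceTable,
      Property.TABLE_REPLICATION_TARGET.getKey() + peerName, peerTableId);
}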

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 7f42aa0,0000000..544fb36
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@@ -1,443 -1,0 +1,442 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.replication;
 +
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
- import org.apache.accumulo.core.client.impl.ClientExecReturn;
 +import org.apache.accumulo.core.client.impl.ClientContext;
++import org.apache.accumulo.core.client.impl.ClientExecReturn;
 +import org.apache.accumulo.core.client.impl.MasterClient;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.thrift.MasterClientService;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.proto.Replication.Status;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.trace.Tracer;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
- import org.apache.accumulo.test.functional.AbstractMacIT;
 +import org.apache.accumulo.test.functional.ConfigurableMacIT;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.bouncycastle.util.Arrays;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * ACCUMULO-3302 series of tests which ensure that a WAL is not prematurely closed while a TServer may still continue to use it. Checking that no tablet references a
 + * WAL is insufficient to determine if a WAL will never be used in the future.
 + */
 +public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacIT {
 +  private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServersIT.class);
 +
 +  private static final int GC_PERIOD_SECONDS = 1;
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
 +    cfg.setNumTservers(1);
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
 +    // Wait longer to try to let the replication table come online before a cycle runs
 +    cfg.setProperty(Property.GC_CYCLE_START, "10s");
 +    cfg.setProperty(Property.REPLICATION_NAME, "master");
 +    // Set really long delays for the master to do stuff for replication. We don't need
 +    // it to be doing anything, so just let it sleep
 +    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s");
 +    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s");
 +    cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s");
 +    // Pull down the maximum size of the wal so we can test close()'ing it.
 +    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
 +    coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  /**
 +   * Fetch all of the WALs referenced by tablets in the metadata table for this table
 +   */
 +  private Set<String> getWalsForTable(String tableName) throws Exception {
 +    final Connector conn = getConnector();
 +    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
 +
 +    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 +
 +    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    Range r = MetadataSchema.TabletsSection.getRange(tableId);
 +    s.setRange(r);
 +    s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
 +
 +    Set<String> wals = new HashSet<String>();
 +    for (Entry<Key,Value> entry : s) {
 +      log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +      // hostname:port/uri://path/to/wal
 +      String cq = entry.getKey().getColumnQualifier().toString();
 +      int index = cq.indexOf('/');
 +      // Normalize the path
 +      String path = new Path(cq.substring(index + 1)).toString();
 +      log.debug("Extracted file: " + path);
 +      wals.add(path);
 +    }
 +
 +    return wals;
 +  }
 +
 +  /**
 +   * Fetch all of the rfiles referenced by tablets in the metadata table for this table
 +   */
 +  private Set<String> getFilesForTable(String tableName) throws Exception {
 +    final Connector conn = getConnector();
 +    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
 +
 +    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 +
 +    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    Range r = MetadataSchema.TabletsSection.getRange(tableId);
 +    s.setRange(r);
 +    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
 +
 +    Set<String> rfiles = new HashSet<String>();
 +    for (Entry<Key,Value> entry : s) {
 +      log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +      // uri://path/to/wal
 +      String cq = entry.getKey().getColumnQualifier().toString();
 +      String path = new Path(cq).toString();
 +      log.debug("Normalize path to rfile: {}", path);
 +      rfiles.add(path);
 +    }
 +
 +    return rfiles;
 +  }
 +
 +  /**
 +   * Get the replication status messages for the given table that exist in the metadata table (~repl entries)
 +   */
 +  private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception {
 +    final Connector conn = getConnector();
 +    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
 +
 +    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 +
 +    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    Range r = MetadataSchema.ReplicationSection.getRange();
 +    s.setRange(r);
 +    s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId));
 +
 +    Map<String,Status> fileToStatus = new HashMap<String,Status>();
 +    for (Entry<Key,Value> entry : s) {
 +      Text file = new Text();
 +      MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
 +      Status status = Status.parseFrom(entry.getValue().get());
 +      log.info("Got status for {}: {}", file, ProtobufUtil.toString(status));
 +      fileToStatus.put(file.toString(), status);
 +    }
 +
 +    return fileToStatus;
 +  }
 +
 +  @Test
 +  public void testActiveWalPrecludesClosing() throws Exception {
 +    final String table = getUniqueNames(1)[0];
 +    final Connector conn = getConnector();
 +
 +    // Bring the replication table online first and foremost
 +    ReplicationTable.setOnline(conn);
 +
 +    log.info("Creating {}", table);
 +    conn.tableOperations().create(table);
 +
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    log.info("Writing a few mutations to the table");
 +
 +    BatchWriter bw = conn.createBatchWriter(table, null);
 +
 +    byte[] empty = new byte[0];
 +    for (int i = 0; i < 5; i++) {
 +      Mutation m = new Mutation(Integer.toString(i));
 +      m.put(empty, empty, empty);
 +      bw.addMutation(m);
 +    }
 +
 +    log.info("Flushing mutations to the server");
 +    bw.flush();
 +
 +    log.info("Checking that metadata only has one WAL recorded for this table");
 +
 +    Set<String> wals = getWalsForTable(table);
 +    Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size());
 +
 +    log.info("Compacting the table which will remove all WALs from the tablets");
 +
 +    // Flush our test table to remove the WAL references in it
 +    conn.tableOperations().flush(table, null, null, true);
 +    // Flush the metadata table too because it will have a reference to the WAL
 +    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
 +
 +    log.info("Waiting for replication table to come online");
 +
 +    log.info("Fetching replication statuses from metadata table");
 +
 +    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
 +
 +    Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
 +
 +    String walName = fileToStatus.keySet().iterator().next();
 +    Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName);
 +
 +    Status status = fileToStatus.get(walName);
 +
 +    Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
 +
 +    log.info("Checking to see that log entries are removed from tablet section after MinC");
 +    // After compaction, the log column should be gone from the tablet
 +    Set<String> walsAfterMinc = getWalsForTable(table);
 +    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
 +
 +    Set<String> filesForTable = getFilesForTable(table);
 +    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
 +    log.info("Files for table before MajC: {}", filesForTable);
 +
 +    // Issue a MajC to roll a new file in HDFS
 +    conn.tableOperations().compact(table, null, null, false, true);
 +
 +    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
 +
 +    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
 +
 +    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
 +    Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable);
 +
 +    // Use the rfile which was just replaced by the MajC to determine when the GC has run
 +    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
 +    FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration());
 +
 +    boolean fileExists = fs.exists(fileToBeDeleted);
 +    while (fileExists) {
 +      log.info("File which should get deleted still exists: {}", fileToBeDeleted);
 +      Thread.sleep(2000);
 +      fileExists = fs.exists(fileToBeDeleted);
 +    }
 +
 +    // At this point in time, we *know* that the GarbageCollector has run, which means that the Status
 +    // for our WAL should not be altered.
 +
 +    log.info("Re-checking that WALs are still not referenced for our table");
 +
 +    Set<String> walsAfterMajc = getWalsForTable(table);
 +    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
 +
 +    Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
 +    Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
 +
 +    Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc);
 +  }
 +
 +  @Test
 +  public void testUnreferencedWalInTserverIsClosed() throws Exception {
 +    final String[] names = getUniqueNames(2);
 +    // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver
 +    final String table = names[0], otherTable = names[1];
 +    final Connector conn = getConnector();
 +
 +    // Bring the replication table online first and foremost
 +    ReplicationTable.setOnline(conn);
 +
 +    log.info("Creating {}", table);
 +    conn.tableOperations().create(table);
 +
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    log.info("Writing a few mutations to the table");
 +
 +    BatchWriter bw = conn.createBatchWriter(table, null);
 +
 +    byte[] empty = new byte[0];
 +    for (int i = 0; i < 5; i++) {
 +      Mutation m = new Mutation(Integer.toString(i));
 +      m.put(empty, empty, empty);
 +      bw.addMutation(m);
 +    }
 +
 +    log.info("Flushing mutations to the server");
 +    bw.close();
 +
 +    log.info("Checking that metadata only has one WAL recorded for this table");
 +
 +    Set<String> wals = getWalsForTable(table);
 +    Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size());
 +
 +    log.info("Compacting the table which will remove all WALs from the tablets");
 +
 +    // Flush our test table to remove the WAL references in it
 +    conn.tableOperations().flush(table, null, null, true);
 +    // Flush the metadata table too because it will have a reference to the WAL
 +    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
 +
 +    log.info("Fetching replication statuses from metadata table");
 +
 +    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
 +
 +    Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
 +
 +    String walName = fileToStatus.keySet().iterator().next();
 +    Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName);
 +
 +    Status status = fileToStatus.get(walName);
 +
 +    Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
 +
 +    log.info("Checking to see that log entries are removed from tablet section after MinC");
 +    // After compaction, the log column should be gone from the tablet
 +    Set<String> walsAfterMinc = getWalsForTable(table);
 +    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
 +
 +    Set<String> filesForTable = getFilesForTable(table);
 +    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
 +    log.info("Files for table before MajC: {}", filesForTable);
 +
 +    // Issue a MajC to roll a new file in HDFS
 +    conn.tableOperations().compact(table, null, null, false, true);
 +
 +    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
 +
 +    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
 +
 +    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
 +    Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable);
 +
 +    // Use the rfile which was just replaced by the MajC to determine when the GC has run
 +    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
 +    FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration());
 +
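 +    // Poll until the GC deletes the old rfile. This loop has no timeout of its
 +    // own; if the GC never runs, the harness-level test timeout fails the run.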
 +    boolean fileExists = fs.exists(fileToBeDeleted);
 +    while (fileExists) {
 +      log.info("File which should get deleted still exists: {}", fileToBeDeleted);
 +      Thread.sleep(2000);
 +      fileExists = fs.exists(fileToBeDeleted);
 +    }
 +
 +    // At this point we *know* that the GarbageCollector has run, which means that the Status
 +    // for our WAL should not have been altered.
 +
 +    log.info("Re-checking that WALs are still not referenced for our table");
 +
 +    Set<String> walsAfterMajc = getWalsForTable(table);
 +    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
 +
 +    Map<String,Status> fileToStatusAfterGC = getMetadataStatusForTable(table);
 +    Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterGC, 1, fileToStatusAfterGC.size());
 +
 +    Assert.assertEquals("Status should be unchanged after MajC and GC", fileToStatus, fileToStatusAfterGC);
 +
 +    /*
 +     * To verify that the WAL still gets closed, we have to force the tserver to close the existing WAL and open a new one. The easiest way to do this is to
 +     * write enough data to exceed the 1.33% full threshold that the logger tracks.
 +     */
 +
 +    conn.tableOperations().create(otherTable);
 +    bw = conn.createBatchWriter(otherTable, null);
 +    // 500k
 +    byte[] bigValue = new byte[1024 * 500];
 +    Arrays.fill(bigValue, (byte)1);
 +    // 50 mutations of ~500k each (~25 MB total)
 +    for (int i = 0; i < 50; i++) {
 +      Mutation m = new Mutation(Integer.toString(i));
 +      m.put(empty, empty, bigValue);
 +      bw.addMutation(m);
 +      if (i % 10 == 0) {
 +        bw.flush();
 +      }
 +    }
 +
 +    bw.close();
 +
 +    conn.tableOperations().flush(otherTable, null, null, true);
 +
 +    // Get the tservers that the master deems active
-     final ClientContext context = new ClientContext(conn.getInstance(), new Credentials("root", new PasswordToken(AbstractMacIT.ROOT_PASSWORD)),
++    final ClientContext context = new ClientContext(conn.getInstance(), new Credentials("root", new PasswordToken(ConfigurableMacIT.ROOT_PASSWORD)),
 +        getClientConfig());
 +    List<String> tservers = MasterClient.execute(context, new ClientExecReturn<List<String>,MasterClientService.Client>() {
 +      @Override
 +      public List<String> execute(MasterClientService.Client client) throws Exception {
 +        return client.getActiveTservers(Tracer.traceInfo(), context.rpcCreds());
 +      }
 +    });
 +
 +    Assert.assertEquals("Expected only one active tservers", 1, tservers.size());
 +
 +    String tserver = tservers.get(0);
 +
 +    // Get the active WALs from that server
 +    log.info("Fetching active WALs from {}", tserver);
 +
 +    Client client = ThriftUtil.getTServerClient(tserver, context);
 +    List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), context.rpcCreds());
 +
 +    log.info("Active wals: {}", activeWalsForTserver);
 +
 +    Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size());
 +
 +    String activeWal = new Path(activeWalsForTserver.get(0)).toString();
 +
 +    Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal);
 +
 +    log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver");
 +
 +    do {
 +      Map<String,Status> replicationStatuses = getMetadataStatusForTable(table);
 +
 +      log.info("Got replication status messages {}", replicationStatuses);
 +      Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size());
 +
 +      status = replicationStatuses.values().iterator().next();
 +      log.info("Current status: {}", ProtobufUtil.toString(status));
 +
 +      if (status.getClosed()) {
 +        return;
 +      }
 +
 +      log.info("Status is not yet closed, waiting for garbage collector to close it");
 +
 +      Thread.sleep(2000);
 +    } while (true);
 +  }
 +}
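
A note on the helpers used above: getWalsForTable and getMetadataStatusForTable
are defined earlier in this class. As a rough sketch of what the status helper
does (the name readReplicationStatuses and the lack of table-id filtering are
simplifications, not part of this commit), assuming the standard
ReplicationSection layout of the metadata table:

    // Sketch only: collect WAL -> Status mappings from the metadata table's
    // replication section; a real helper would also filter on the table id.
    private Map<String,Status> readReplicationStatuses(Connector conn) throws Exception {
      Map<String,Status> fileToStatus = new HashMap<String,Status>();
      Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
      s.setRange(MetadataSchema.ReplicationSection.getRange());
      for (Entry<Key,Value> entry : s) {
        Text file = new Text();
        MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
        fileToStatus.put(file.toString(), Status.parseFrom(entry.getValue().get()));
      }
      return fileToStatus;
    }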

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
index 68e57d1,0000000..6363ba5
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java
@@@ -1,115 -1,0 +1,114 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.replication;
 +
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.StatusUtil;
 +import org.apache.accumulo.core.replication.proto.Replication.Status;
 +import org.apache.accumulo.core.security.TablePermission;
++import org.apache.accumulo.harness.SharedMiniClusterIT;
 +import org.apache.accumulo.server.util.ReplicationTableUtil;
- import org.apache.accumulo.test.functional.SimpleMacIT;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterables;
 +
 +/**
 + *
 + */
- @SuppressWarnings("deprecation")
- public class StatusCombinerMacIT extends SimpleMacIT {
++public class StatusCombinerMacIT extends SharedMiniClusterIT {
 +
 +  @Test
 +  public void testCombinerSetOnMetadata() throws Exception {
 +    TableOperations tops = getConnector().tableOperations();
 +    Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(MetadataTable.NAME);
 +
 +    Assert.assertTrue(iterators.containsKey(ReplicationTableUtil.COMBINER_NAME));
 +    EnumSet<IteratorScope> scopes = iterators.get(ReplicationTableUtil.COMBINER_NAME);
 +    Assert.assertEquals(3, scopes.size());
 +    Assert.assertTrue(scopes.contains(IteratorScope.scan));
 +    Assert.assertTrue(scopes.contains(IteratorScope.minc));
 +    Assert.assertTrue(scopes.contains(IteratorScope.majc));
 +
 +    Iterable<Entry<String,String>> propIter = tops.getProperties(MetadataTable.NAME);
 +    HashMap<String,String> properties = new HashMap<String,String>();
 +    for (Entry<String,String> entry : propIter) {
 +      properties.put(entry.getKey(), entry.getValue());
 +    }
 +
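 +    // Each scope's option key has the form table.iterator.<scope>.<name>.opt.columns;
 +    // e.g., assuming COMBINER_NAME resolves to "replcombiner", the scan-scope key
 +    // would be table.iterator.scan.replcombiner.opt.columns.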
 +    for (IteratorScope scope : scopes) {
 +      String key = Property.TABLE_ITERATOR_PREFIX.getKey() + scope.name() + "." + ReplicationTableUtil.COMBINER_NAME + ".opt.columns";
 +      Assert.assertTrue("Properties did not contain key : " + key, properties.containsKey(key));
 +      Assert.assertEquals(MetadataSchema.ReplicationSection.COLF.toString(), properties.get(key));
 +    }
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Connector conn = getConnector();
 +
 +    ReplicationTable.setOnline(conn);
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 +    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
 +    long createTime = System.currentTimeMillis();
 +    try {
 +      Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3");
 +      StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(createTime));
 +      bw.addMutation(m);
 +    } finally {
 +      bw.close();
 +    }
 +
 +    Scanner s = ReplicationTable.getScanner(conn);
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
 +    Assert.assertEquals(StatusUtil.fileCreatedValue(createTime), entry.getValue());
 +
 +    bw = ReplicationTable.getBatchWriter(conn);
 +    try {
 +      Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3");
 +      StatusSection.add(m, new Text("1"), ProtobufUtil.toValue(StatusUtil.replicated(Long.MAX_VALUE)));
 +      bw.addMutation(m);
 +    } finally {
 +      bw.close();
 +    }
 +
 +    s = ReplicationTable.getScanner(conn);
 +    entry = Iterables.getOnlyElement(s);
 +    Status stat = Status.parseFrom(entry.getValue().get());
 +    Assert.assertEquals(Long.MAX_VALUE, stat.getBegin());
 +  }
 +
 +}
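
For new tests after this migration, the smallest useful shape on the shared
harness is sketched below, under the assumption that SharedMiniClusterIT
supplies getConnector() and getUniqueNames() as the tests in this commit do;
the class name is hypothetical:

    // Sketch only: a minimal IT against the shared MiniAccumuloCluster.
    public class ExampleSharedClusterIT extends SharedMiniClusterIT {
      @Test
      public void canUseSharedCluster() throws Exception {
        Connector conn = getConnector();      // connection to the shared cluster
        String table = getUniqueNames(1)[0];  // unique per-test table name
        conn.tableOperations().create(table);
        Assert.assertTrue(conn.tableOperations().exists(table));
      }
    }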

