accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [19/21] accumulo git commit: Merge branch '1.7'
Date Wed, 03 Feb 2016 03:09:17 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
index f58db38,0000000..1fb56ef
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
@@@ -1,153 -1,0 +1,152 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.Map;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.cluster.ClusterControl;
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import com.google.common.base.Charsets;
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +public class RestartStressIT extends AccumuloClusterHarness {
 +  private static final Logger log = LoggerFactory.getLogger(RestartStressIT.class);
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
 +    Map<String,String> opts = cfg.getSiteConfig();
 +    opts.put(Property.TSERV_MAXMEM.getKey(), "100K");
 +    opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms");
 +    opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M");
 +    opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
 +    opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s");
 +    cfg.setSiteConfig(opts);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 10 * 60;
 +  }
 +
 +  private ExecutorService svc;
 +
 +  @Before
 +  public void setup() throws Exception {
 +    svc = Executors.newFixedThreadPool(1);
 +  }
 +
 +  @After
 +  public void teardown() throws Exception {
 +    if (null == svc) {
 +      return;
 +    }
 +
 +    if (!svc.isShutdown()) {
 +      svc.shutdown();
 +    }
 +
 +    while (!svc.awaitTermination(10, TimeUnit.SECONDS)) {
 +      log.info("Waiting for threadpool to terminate");
 +    }
 +  }
 +
 +  private static final VerifyIngest.Opts VOPTS;
 +  static {
 +    VOPTS = new VerifyIngest.Opts();
 +    VOPTS.rows = 10 * 1000;
 +  }
 +  private static final ScannerOpts SOPTS = new ScannerOpts();
 +
 +  @Test
 +  public void test() throws Exception {
 +    final Connector c = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    final AuthenticationToken token = getAdminToken();
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
"500K");
 +    final ClusterControl control = getCluster().getClusterControl();
 +    final String[] args;
 +    if (token instanceof PasswordToken) {
 +      byte[] password = ((PasswordToken) token).getPassword();
-       args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8),
"-i", cluster.getInstanceName(), "-z",
-           cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName};
++      args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, UTF_8),
"-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(),
++          "--rows", "" + VOPTS.rows, "--table", tableName};
 +    } else if (token instanceof KerberosToken) {
 +      ClusterUser rootUser = getAdminUser();
 +      args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(),
"-i", cluster.getInstanceName(), "-z",
 +          cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName};
 +    } else {
 +      throw new RuntimeException("Unrecognized token");
 +    }
 +
 +    Future<Integer> retCode = svc.submit(new Callable<Integer>() {
 +      @Override
 +      public Integer call() {
 +        try {
 +          return control.exec(TestIngest.class, args);
 +        } catch (Exception e) {
 +          log.error("Error running TestIngest", e);
 +          return -1;
 +        }
 +      }
 +    });
 +
 +    for (int i = 0; i < 2; i++) {
 +      sleepUninterruptibly(10, TimeUnit.SECONDS);
 +      control.stopAllServers(ServerType.TABLET_SERVER);
 +      control.startAllServers(ServerType.TABLET_SERVER);
 +    }
 +    assertEquals(0, retCode.get().intValue());
 +    VOPTS.setTableName(tableName);
 +
 +    if (token instanceof PasswordToken) {
 +      VOPTS.setPrincipal(getAdminPrincipal());
 +    } else if (token instanceof KerberosToken) {
 +      VOPTS.updateKerberosCredentials(cluster.getClientConfig());
 +    } else {
 +      throw new RuntimeException("Unrecognized token");
 +    }
 +
 +    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index 4f78b77,0000000..71956f4
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@@ -1,390 -1,0 +1,390 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
- import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +import java.util.EnumSet;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.ActiveScan;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +/**
 + * ACCUMULO-2641 Integration test. ACCUMULO-2641 Adds scan id to thrift protocol so that
{@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
 + * returns a unique scan id.
 + *
 + * <p>
 + * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator}
to create multiple scan sessions. The test exercises multiple
 + * tablet servers with splits and multiple ranges to force the scans to occur across multiple
tablet servers for completeness.
 + *
 + * <p>
 + * This patch modified thrift, the TraceRepoDeserializationTest test seems to fail unless
the following be added:
 + *
 + * <p>
 + * private static final long serialVersionUID = -4659975753252858243l;
 + *
 + * <p>
 + * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated.
 + */
 +public class ScanIdIT extends AccumuloClusterHarness {
 +
 +  private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
 +
 +  private static final int NUM_SCANNERS = 8;
 +
 +  private static final int NUM_DATA_ROWS = 100;
 +
 +  private static final Random random = new Random();
 +
 +  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
 +
 +  private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
 +
 +  private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  /**
 +   * @throws Exception
 +   *           any exception is a test failure.
 +   */
 +  @Test
 +  public void testScanId() throws Exception {
 +
 +    final String tableName = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +    conn.tableOperations().create(tableName);
 +
 +    addSplits(conn, tableName);
 +
 +    log.info("Splits added");
 +
 +    generateSampleData(conn, tableName);
 +
 +    log.info("Generated data for {}", tableName);
 +
 +    attachSlowIterator(conn, tableName);
 +
 +    CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
 +
 +    for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
 +      ScannerThread st = new ScannerThread(conn, scannerIndex, tableName, latch);
 +      pool.submit(st);
 +    }
 +
 +    // wait for scanners to report a result.
 +    while (testInProgress.get()) {
 +
 +      if (resultsByWorker.size() < NUM_SCANNERS) {
 +        log.trace("Results reported {}", resultsByWorker.size());
 +        sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
 +      } else {
 +        // each worker has reported at least one result.
 +        testInProgress.set(false);
 +
 +        log.debug("Final result count {}", resultsByWorker.size());
 +
 +        // delay to allow scanners to react to end of test and cleanly close.
 +        sleepUninterruptibly(1, TimeUnit.SECONDS);
 +      }
 +
 +    }
 +
 +    // all scanner have reported at least 1 result, so check for unique scan ids.
 +    Set<Long> scanIds = new HashSet<Long>();
 +
 +    List<String> tservers = conn.instanceOperations().getTabletServers();
 +
 +    log.debug("tablet servers {}", tservers.toString());
 +
 +    for (String tserver : tservers) {
 +
 +      List<ActiveScan> activeScans = null;
 +      for (int i = 0; i < 10; i++) {
 +        try {
 +          activeScans = conn.instanceOperations().getActiveScans(tserver);
 +          break;
 +        } catch (AccumuloException e) {
 +          if (e.getCause() instanceof TableNotFoundException) {
 +            log.debug("Got TableNotFoundException, will retry");
 +            Thread.sleep(200);
 +            continue;
 +          }
 +          throw e;
 +        }
 +      }
 +
 +      assertNotNull("Repeatedly got exception trying to active scans", activeScans);
 +
 +      log.debug("TServer {} has {} active scans", tserver, activeScans.size());
 +
 +      for (ActiveScan scan : activeScans) {
 +        log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
 +        scanIds.add(scan.getScanid());
 +      }
 +    }
 +
 +    assertTrue("Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size(),
NUM_SCANNERS <= scanIds.size());
 +
 +  }
 +
 +  /**
 +   * Runs scanner in separate thread to allow multiple scanners to execute in parallel.
 +   * <p/>
 +   * The thread run method is terminated when the testInProgress flag is set to false.
 +   */
 +  private static class ScannerThread implements Runnable {
 +
 +    private final Connector connector;
 +    private Scanner scanner = null;
 +    private final int workerIndex;
 +    private final String tablename;
 +    private final CountDownLatch latch;
 +
 +    public ScannerThread(final Connector connector, final int workerIndex, final String
tablename, final CountDownLatch latch) {
 +      this.connector = connector;
 +      this.workerIndex = workerIndex;
 +      this.tablename = tablename;
 +      this.latch = latch;
 +    }
 +
 +    /**
 +     * execute the scan across the sample data and put scan result into result map until
testInProgress flag is set to false.
 +     */
 +    @Override
 +    public void run() {
 +
 +      latch.countDown();
 +      try {
 +        latch.await();
 +      } catch (InterruptedException e) {
 +        log.error("Thread interrupted with id {}", workerIndex);
 +        Thread.currentThread().interrupt();
 +        return;
 +      }
 +
 +      log.debug("Creating scanner in worker thread {}", workerIndex);
 +
 +      try {
 +
 +        scanner = connector.createScanner(tablename, new Authorizations());
 +
 +        // Never start readahead
 +        scanner.setReadaheadThreshold(Long.MAX_VALUE);
 +        scanner.setBatchSize(1);
 +
 +        // create different ranges to try to hit more than one tablet.
 +        scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
 +
 +      } catch (TableNotFoundException e) {
 +        throw new IllegalStateException("Initialization failure. Could not create scanner",
e);
 +      }
 +
 +      scanner.fetchColumnFamily(new Text("fam1"));
 +
 +      for (Map.Entry<Key,Value> entry : scanner) {
 +
 +        // exit when success condition is met.
 +        if (!testInProgress.get()) {
 +          scanner.clearScanIterators();
 +          scanner.close();
 +
 +          return;
 +        }
 +
 +        Text row = entry.getKey().getRow();
 +
 +        log.debug("worker {}, row {}", workerIndex, row.toString());
 +
 +        if (entry.getValue() != null) {
 +
 +          Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
 +
 +          // value should always being increasing
 +          if (prevValue != null) {
 +
 +            log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s",
prevValue, entry.getValue()));
 +
 +            assertTrue(prevValue.compareTo(entry.getValue()) > 0);
 +          }
 +        } else {
 +          log.info("Scanner returned null");
 +          fail("Scanner returned unexpected null value");
 +        }
 +
 +      }
 +
 +      log.debug("Scanner ran out of data. (info only, not an error) ");
 +
 +    }
 +  }
 +
 +  /**
 +   * Create splits on table and force migration by taking table offline and then bring back
online for test.
 +   *
 +   * @param conn
 +   *          Accumulo connector Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void addSplits(final Connector conn, final String tableName) {
 +
 +    SortedSet<Text> splits = createSplits();
 +
 +    try {
 +
 +      conn.tableOperations().addSplits(tableName, splits);
 +
 +      conn.tableOperations().offline(tableName, true);
 +
 +      sleepUninterruptibly(2, TimeUnit.SECONDS);
 +      conn.tableOperations().online(tableName, true);
 +
 +      for (Text split : conn.tableOperations().listSplits(tableName)) {
 +        log.trace("Split {}", split);
 +      }
 +
 +    } catch (AccumuloSecurityException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
 +    } catch (TableNotFoundException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
 +    } catch (AccumuloException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
 +    }
 +
 +  }
 +
 +  /**
 +   * Create splits to distribute data across multiple tservers.
 +   *
 +   * @return splits in sorted set for addSplits.
 +   */
 +  private SortedSet<Text> createSplits() {
 +
 +    SortedSet<Text> splits = new TreeSet<Text>();
 +
 +    for (int split = 0; split < 10; split++) {
 +      splits.add(new Text(Integer.toString(split)));
 +    }
 +
 +    return splits;
 +  }
 +
 +  /**
 +   * Generate some sample data using random row id to distribute across splits.
 +   * <p/>
 +   * The primary goal is to determine that each scanner is assigned a unique scan id. This
test does check that the count value for fam1 increases if a scanner
 +   * reads multiple value, but this is secondary consideration for this test, that is included
for completeness.
 +   *
 +   * @param connector
 +   *          Accumulo connector Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void generateSampleData(Connector connector, final String tablename) {
 +
 +    try {
 +
 +      BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
 +
 +      ColumnVisibility vis = new ColumnVisibility("public");
 +
 +      for (int i = 0; i < NUM_DATA_ROWS; i++) {
 +
 +        Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
 +
 +        Mutation m = new Mutation(rowId);
 +        m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
 +        m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS
- i).getBytes(UTF_8)));
 +        m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i
- NUM_DATA_ROWS).getBytes(UTF_8)));
 +
 +        log.trace("Added row {}", rowId);
 +
 +        bw.addMutation(m);
 +      }
 +
 +      bw.close();
 +    } catch (TableNotFoundException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not create test data",
ex);
 +    } catch (MutationsRejectedException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not create test data",
ex);
 +    }
 +  }
 +
 +  /**
 +   * Attach the test slow iterator so that we have time to read the scan id without creating
a large dataset. Uses a fairly large sleep and delay times because
 +   * we are not concerned with how much data is read and we do not read all of the data
- the test stops once each scanner reports a scan id.
 +   *
 +   * @param connector
 +   *          Accumulo connector Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void attachSlowIterator(Connector connector, final String tablename) {
 +    try {
 +
 +      IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
 +      slowIter.addOption("sleepTime", "200");
 +      slowIter.addOption("seekSleepTime", "200");
 +
 +      connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
 +
 +    } catch (AccumuloException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
 +    } catch (TableNotFoundException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
 +    } catch (AccumuloSecurityException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 11694fd,0000000..25896ba
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@@ -1,224 -1,0 +1,223 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.InstanceOperations;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.util.CheckForMetadataProblems;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.After;
 +import org.junit.Assume;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import com.google.common.base.Charsets;
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +public class SplitIT extends AccumuloClusterHarness {
 +  private static final Logger log = LoggerFactory.getLogger(SplitIT.class);
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite)
{
 +    cfg.setProperty(Property.TSERV_MAXMEM, "5K");
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "100ms");
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 4 * 60;
 +  }
 +
 +  private String tservMaxMem, tservMajcDelay;
 +
 +  @Before
 +  public void alterConfig() throws Exception {
 +    Assume.assumeTrue(ClusterType.MINI == getClusterType());
 +
 +    InstanceOperations iops = getConnector().instanceOperations();
 +    Map<String,String> config = iops.getSystemConfiguration();
 +    tservMaxMem = config.get(Property.TSERV_MAXMEM.getKey());
 +    tservMajcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey());
 +
 +    if (!tservMajcDelay.equals("100ms")) {
 +      iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "100ms");
 +    }
 +
 +    // Property.TSERV_MAXMEM can't be altered on a running server
 +    boolean restarted = false;
 +    if (!tservMaxMem.equals("5K")) {
 +      iops.setProperty(Property.TSERV_MAXMEM.getKey(), "5K");
 +      getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 +      getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
 +      restarted = true;
 +    }
 +
 +    // If we restarted the tservers, we don't need to re-wait for the majc delay
 +    if (!restarted) {
 +      long millis = AccumuloConfiguration.getTimeInMillis(tservMajcDelay);
 +      log.info("Waiting for majc delay period: {}ms", millis);
 +      Thread.sleep(millis);
 +      log.info("Finished waiting for majc delay period");
 +    }
 +  }
 +
 +  @After
 +  public void resetConfig() throws Exception {
 +    if (null != tservMaxMem) {
 +      log.info("Resetting {}={}", Property.TSERV_MAXMEM.getKey(), tservMaxMem);
 +      getConnector().instanceOperations().setProperty(Property.TSERV_MAXMEM.getKey(), tservMaxMem);
 +      tservMaxMem = null;
 +      getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 +      getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
 +    }
 +    if (null != tservMajcDelay) {
 +      log.info("Resetting {}={}", Property.TSERV_MAJC_DELAY.getKey(), tservMajcDelay);
 +      getConnector().instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(),
tservMajcDelay);
 +      tservMajcDelay = null;
 +    }
 +  }
 +
 +  @Test
 +  public void tabletShouldSplit() throws Exception {
 +    Connector c = getConnector();
 +    String table = getUniqueNames(1)[0];
 +    c.tableOperations().create(table);
 +    c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "256K");
 +    c.tableOperations().setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
"1K");
 +    TestIngest.Opts opts = new TestIngest.Opts();
 +    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
 +    opts.rows = 100000;
 +    opts.setTableName(table);
 +
 +    ClientConfiguration clientConfig = cluster.getClientConfig();
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false))
{
 +      opts.updateKerberosCredentials(clientConfig);
 +      vopts.updateKerberosCredentials(clientConfig);
 +    } else {
 +      opts.setPrincipal(getAdminPrincipal());
 +      vopts.setPrincipal(getAdminPrincipal());
 +    }
 +
 +    TestIngest.ingest(c, opts, new BatchWriterOpts());
 +    vopts.rows = opts.rows;
 +    vopts.setTableName(table);
 +    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
 +    while (c.tableOperations().listSplits(table).size() < 10) {
 +      sleepUninterruptibly(15, TimeUnit.SECONDS);
 +    }
 +    String id = c.tableOperations().tableIdMap().get(table);
 +    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    KeyExtent extent = new KeyExtent(new Text(id), null, null);
 +    s.setRange(extent.toMetadataRange());
 +    MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(s);
 +    int count = 0;
 +    int shortened = 0;
 +    for (Entry<Key,Value> entry : s) {
 +      extent = new KeyExtent(entry.getKey().getRow(), entry.getValue());
 +      if (extent.getEndRow() != null && extent.getEndRow().toString().length() <
14)
 +        shortened++;
 +      count++;
 +    }
 +
 +    assertTrue("Shortened should be greater than zero: " + shortened, shortened > 0);
 +    assertTrue("Count should be cgreater than 10: " + count, count > 10);
 +
 +    String[] args;
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false))
{
 +      ClusterUser rootUser = getAdminUser();
 +      args = new String[] {"-i", cluster.getInstanceName(), "-u", rootUser.getPrincipal(),
"--keytab", rootUser.getKeytab().getAbsolutePath(), "-z",
 +          cluster.getZooKeepers()};
 +    } else {
 +      PasswordToken token = (PasswordToken) getAdminToken();
-       args = new String[] {"-i", cluster.getInstanceName(), "-u", "root", "-p", new String(token.getPassword(),
Charsets.UTF_8), "-z", cluster.getZooKeepers()};
++      args = new String[] {"-i", cluster.getInstanceName(), "-u", "root", "-p", new String(token.getPassword(),
UTF_8), "-z", cluster.getZooKeepers()};
 +    }
 +
 +    assertEquals(0, getCluster().getClusterControl().exec(CheckForMetadataProblems.class,
args));
 +  }
 +
 +  @Test
 +  public void interleaveSplit() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
"10K");
 +    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(),
"none");
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    ReadWriteIT.interleaveTest(c, tableName);
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    int numSplits = c.tableOperations().listSplits(tableName).size();
 +    while (numSplits <= 20) {
 +      log.info("Waiting for splits to happen");
 +      Thread.sleep(2000);
 +      numSplits = c.tableOperations().listSplits(tableName).size();
 +    }
 +    assertTrue("Expected at least 20 splits, saw " + numSplits, numSplits > 20);
 +  }
 +
 +  @Test
 +  public void deleteSplit() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(),
"10K");
 +    ClientConfiguration clientConfig = getCluster().getClientConfig();
 +    String password = null, keytab = null;
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false))
{
 +      keytab = getAdminUser().getKeytab().getAbsolutePath();
 +    } else {
-       password = new String(((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8);
++      password = new String(((PasswordToken) getAdminToken()).getPassword(), UTF_8);
 +    }
 +    DeleteIT.deleteTest(c, getCluster(), getAdminPrincipal(), password, tableName, keytab);
 +    c.tableOperations().flush(tableName, null, null, true);
 +    for (int i = 0; i < 5; i++) {
 +      sleepUninterruptibly(10, TimeUnit.SECONDS);
 +      if (c.tableOperations().listSplits(tableName).size() > 20)
 +        break;
 +    }
 +    assertTrue(c.tableOperations().listSplits(tableName).size() > 20);
 +  }
 +
 +}


Mime
View raw message