accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [19/19] accumulo git commit: Merge branch 'javadoc-jdk8-1.7'
Date Sat, 09 Jan 2016 03:38:22 GMT
Merge branch 'javadoc-jdk8-1.7'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8ff2ca81
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8ff2ca81
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8ff2ca81

Branch: refs/heads/master
Commit: 8ff2ca81cd6b2e7ddc76197bd60cfea64eac465f
Parents: c252d1a 0ccba14
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Fri Jan 8 22:35:43 2016 -0500
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Fri Jan 8 22:35:43 2016 -0500

----------------------------------------------------------------------
 .../core/bloomfilter/DynamicBloomFilter.java    |  4 +--
 .../accumulo/core/client/BatchWriterConfig.java | 10 +++---
 .../core/client/ConditionalWriterConfig.java    |  4 +--
 .../accumulo/core/client/ScannerBase.java       |  2 --
 .../client/mapred/AccumuloFileOutputFormat.java |  4 +--
 .../mapreduce/AccumuloFileOutputFormat.java     |  4 +--
 .../lib/impl/FileOutputConfigurator.java        |  4 +--
 .../lib/util/FileOutputConfigurator.java        |  4 +--
 .../security/tokens/AuthenticationToken.java    |  2 +-
 .../core/constraints/VisibilityConstraint.java  |  1 -
 .../java/org/apache/accumulo/core/data/Key.java |  2 +-
 .../org/apache/accumulo/core/data/Range.java    |  6 ++--
 .../file/blockfile/cache/CachedBlockQueue.java  |  2 +-
 .../core/file/blockfile/cache/ClassSize.java    |  4 +--
 .../accumulo/core/file/rfile/bcfile/Utils.java  | 35 +++++++++++---------
 .../core/iterators/IteratorEnvironment.java     |  2 --
 .../user/WholeColumnFamilyIterator.java         |  4 +--
 .../core/metadata/ServicerForMetadataTable.java |  2 +-
 .../core/metadata/ServicerForRootTable.java     |  2 +-
 .../core/metadata/ServicerForUserTables.java    |  2 +-
 .../core/metadata/schema/MetadataSchema.java    |  2 +-
 .../core/replication/ReplicationSchema.java     |  6 ++--
 .../accumulo/core/sample/RowColumnSampler.java  |  4 +--
 .../core/security/ColumnVisibility.java         |  8 ++---
 .../security/crypto/CryptoModuleParameters.java |  7 +---
 .../org/apache/accumulo/core/util/OpTimer.java  |  7 ++--
 .../accumulo/core/conf/config-header.html       | 12 +++----
 .../examples/simple/filedata/ChunkCombiner.java | 18 +++++-----
 pom.xml                                         | 23 +++++++++++++
 .../apache/accumulo/server/ServerConstants.java |  2 +-
 .../server/master/balancer/GroupBalancer.java   |  4 +--
 .../master/balancer/RegexGroupBalancer.java     |  6 ++--
 .../server/security/SecurityOperation.java      |  6 ++--
 .../server/security/UserImpersonation.java      |  2 +-
 .../server/security/SystemCredentialsTest.java  |  2 +-
 .../replication/SequentialWorkAssigner.java     |  2 +-
 .../monitor/servlets/DefaultServlet.java        |  2 +-
 .../monitor/servlets/ReplicationServlet.java    |  2 +-
 .../monitor/servlets/TablesServlet.java         |  4 +--
 .../tserver/compaction/CompactionStrategy.java  |  6 ++--
 .../accumulo/test/functional/ScanIdIT.java      | 11 +++---
 .../test/replication/merkle/package-info.java   |  9 ++---
 .../replication/merkle/skvi/DigestIterator.java |  2 +-
 43 files changed, 135 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index aed67bc,b5692d2..51f6fae
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@@ -175,98 -174,4 +175,96 @@@ public interface ScannerBase extends It
     * @return The authorizations set on the scanner instance
     */
    Authorizations getAuthorizations();
 +
 +  /**
 +   * Setting this will cause the scanner to read sample data, as long as that sample data
was generated with the given configuration. By default this is not set
 +   * and all data is read.
 +   *
 +   * <p>
 +   * One way to use this method is as follows, where the sampler configuration is obtained
from the table configuration. Sample data can be generated in many
 +   * different ways, so it's important to verify the sample data configuration meets expectations.
 +   *
-    * <p>
-    *
 +   * <pre>
 +   * <code>
 +   *   // could cache this if creating many scanners to avoid RPCs.
 +   *   SamplerConfiguration samplerConfig = connector.tableOperations().getSamplerConfiguration(table);
 +   *   // verify table's sample data is generated in an expected way before using
 +   *   userCode.verifySamplerConfig(samplerConfig);
 +   *   scanner.setSamplerConfiguration(samplerConfig);
 +   * </code>
 +   * </pre>
 +   *
 +   * <p>
 +   * Of course this is not the only way to obtain a {@link SamplerConfiguration}, it could
be a constant, configuration, etc.
 +   *
 +   * <p>
 +   * If sample data is not present or sample data was generated with a different configuration,
then the scanner iterator will throw a
 +   * {@link SampleNotPresentException}. Also if a table's sampler configuration is changed
while a scanner is iterating over a table, a
 +   * {@link SampleNotPresentException} may be thrown.
 +   *
 +   * @since 1.8.0
 +   */
 +  void setSamplerConfiguration(SamplerConfiguration samplerConfig);
 +
 +  /**
 +   * @return currently set sampler configuration. Returns null if no sampler configuration
is set.
 +   * @since 1.8.0
 +   */
 +  SamplerConfiguration getSamplerConfiguration();
 +
 +  /**
 +   * Clears sampler configuration making a scanner read all data. After calling this, {@link
#getSamplerConfiguration()} should return null.
 +   *
 +   * @since 1.8.0
 +   */
 +  void clearSamplerConfiguration();
 +
 +  /**
 +   * This setting determines how long a scanner will wait to fill the returned batch. By
default, a scanner waits until the batch is full.
 +   *
 +   * <p>
 +   * Setting the timeout to zero (with any time unit) or {@link Long#MAX_VALUE} (with {@link
TimeUnit#MILLISECONDS}) means no timeout.
 +   *
 +   * @param timeOut
 +   *          the length of the timeout
 +   * @param timeUnit
 +   *          the units of the timeout
 +   * @since 1.8.0
 +   */
 +  void setBatchTimeout(long timeOut, TimeUnit timeUnit);
 +
 +  /**
 +   * Returns the timeout to fill a batch in the given TimeUnit.
 +   *
 +   * @return the batch timeout configured for this scanner
 +   * @since 1.8.0
 +   */
 +  long getBatchTimeout(TimeUnit timeUnit);
 +
 +  /**
 +   * Sets the name of the classloader context on this scanner. See the administration chapter
of the user manual for details on how to configure and use
 +   * classloader contexts.
 +   *
 +   * @param classLoaderContext
 +   *          name of the classloader context
 +   * @throws NullPointerException
 +   *           if context is null
 +   * @since 1.8.0
 +   */
 +  void setClassLoaderContext(String classLoaderContext);
 +
 +  /**
 +   * Clears the current classloader context set on this scanner
 +   *
 +   * @since 1.8.0
 +   */
 +  void clearClassLoaderContext();
 +
 +  /**
 +   * Returns the name of the current classloader context set on this scanner
 +   *
 +   * @return name of the current context
 +   * @since 1.8.0
 +   */
 +  String getClassLoaderContext();
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/FileOutputConfigurator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
index 5dbafa6,5a53e93..5c265e2
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorEnvironment.java
@@@ -39,52 -37,4 +39,50 @@@ public interface IteratorEnvironment 
    void registerSideChannel(SortedKeyValueIterator<Key,Value> iter);
  
    Authorizations getAuthorizations();
 +
 +  /**
 +   * Returns a new iterator environment object that can be used to create deep copies over
sample data. The new object created will use the current sampling
 +   * configuration for the table. The existing iterator environment object will not be modified.
 +   *
 +   * <p>
 +   * Since sample data could be created in many different ways, a good practice for an iterator
is to verify the sampling configuration is as expected.
 +   *
-    * <p>
-    *
 +   * <pre>
 +   * <code>
 +   *   class MyIter implements SortedKeyValueIterator&lt;Key,Value&gt; {
 +   *     SortedKeyValueIterator&lt;Key,Value&gt; source;
 +   *     SortedKeyValueIterator&lt;Key,Value&gt; sampleIter;
 +   *     &#64;Override
 +   *     void init(SortedKeyValueIterator&lt;Key,Value&gt; source, Map&lt;String,String&gt;
options, IteratorEnvironment env) {
 +   *       IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
 +   *       //do some sanity checks on sampling config
 +   *       validateSamplingConfiguration(sampleEnv.getSamplerConfiguration());
 +   *       sampleIter = source.deepCopy(sampleEnv);
 +   *       this.source = source;
 +   *     }
 +   *   }
 +   * </code>
 +   * </pre>
 +   *
 +   * @throws SampleNotPresentException
 +   *           when sampling is not configured for table.
 +   * @since 1.8.0
 +   */
 +  IteratorEnvironment cloneWithSamplingEnabled();
 +
 +  /**
 +   * There are at least two conditions under which sampling will be enabled for an environment.
One condition is when sampling is enabled for the scan that
 +   * starts everything. Another possibility is for a deep copy created with an environment
created by calling {@link #cloneWithSamplingEnabled()}
 +   *
 +   * @return true if sampling is enabled for this environment.
 +   * @since 1.8.0
 +   */
 +  boolean isSamplingEnabled();
 +
 +  /**
 +   *
 +   * @return sampling configuration if sampling is enabled for environment, otherwise returns
null.
 +   * @since 1.8.0
 +   */
 +  SamplerConfiguration getSamplerConfiguration();
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
index ad68cf6,0000000..c3464ab
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
+++ b/core/src/main/java/org/apache/accumulo/core/sample/RowColumnSampler.java
@@@ -1,124 -1,0 +1,124 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.accumulo.core.sample;
 +
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.hash.HashCode;
 +import com.google.common.hash.HashFunction;
 +import com.google.common.hash.Hasher;
 +
 +/**
 + * This sampler can hash any subset of a Key's fields. The fields that are hashed for the sample
are determined by the configuration options passed in
 + * {@link #init(SamplerConfiguration)}. The following key values are valid options.
 + *
-  * <UL>
++ * <ul>
 + * <li>row=true|false
 + * <li>family=true|false
 + * <li>qualifier=true|false
 + * <li>visibility=true|false
-  * </UL>
++ * </ul>
 + *
 + * <p>
 + * If not specified in the options, fields default to false.
 + *
 + * <p>
 + * To determine what options are valid for hashing see {@link AbstractHashSampler}
 + *
 + * <p>
 + * To configure Accumulo to generate sample data on one thousandth of the column qualifiers,
the following SamplerConfiguration could be created and used to
 + * configure a table.
 + *
 + * <p>
 + * {@code new SamplerConfiguration(RowColumnSampler.class.getName()).setOptions(ImmutableMap.of("hasher","murmur3_32","modulus","1009","qualifier","true"))}
 + *
 + * <p>
 + * With this configuration, if a column qualifier is selected then all key values containing
that column qualifier will end up in the sample data.
 + *
 + * @since 1.8.0
 + */
 +
 +public class RowColumnSampler extends AbstractHashSampler {
 +
 +  private boolean row = true;
 +  private boolean family = true;
 +  private boolean qualifier = true;
 +  private boolean visibility = true;
 +
 +  private static final Set<String> VALID_OPTIONS = ImmutableSet.of("row", "family",
"qualifier", "visibility");
 +
 +  private boolean hashField(SamplerConfiguration config, String field) {
 +    String optValue = config.getOptions().get(field);
 +    if (optValue != null) {
 +      return Boolean.parseBoolean(optValue);
 +    }
 +
 +    return false;
 +  }
 +
 +  @Override
 +  protected boolean isValidOption(String option) {
 +    return super.isValidOption(option) || VALID_OPTIONS.contains(option);
 +  }
 +
 +  @Override
 +  public void init(SamplerConfiguration config) {
 +    super.init(config);
 +
 +    row = hashField(config, "row");
 +    family = hashField(config, "family");
 +    qualifier = hashField(config, "qualifier");
 +    visibility = hashField(config, "visibility");
 +
 +    if (!row && !family && !qualifier && !visibility) {
 +      throw new IllegalStateException("Must hash at least one key field");
 +    }
 +  }
 +
 +  private void putByteSquence(ByteSequence data, Hasher hasher) {
 +    hasher.putBytes(data.getBackingArray(), data.offset(), data.length());
 +  }
 +
 +  @Override
 +  protected HashCode hash(HashFunction hashFunction, Key k) {
 +    Hasher hasher = hashFunction.newHasher();
 +
 +    if (row) {
 +      putByteSquence(k.getRowData(), hasher);
 +    }
 +
 +    if (family) {
 +      putByteSquence(k.getColumnFamilyData(), hasher);
 +    }
 +
 +    if (qualifier) {
 +      putByteSquence(k.getColumnQualifierData(), hasher);
 +    }
 +
 +    if (visibility) {
 +      putByteSquence(k.getColumnVisibilityData(), hasher);
 +    }
 +
 +    return hasher.hash();
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
index 0fb8cc0,564a824..33ece1a
--- a/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java
@@@ -57,75 -41,12 +57,78 @@@ public class OpTimer 
      return this;
    }
  
 -  public void stop(String msg) {
 -    if (log.isEnabledFor(level)) {
 -      long t2 = System.currentTimeMillis();
 -      String duration = String.format("%.3f secs", (t2 - t1) / 1000.0);
 -      msg = msg.replace("%DURATION%", duration);
 -      log.log(level, "tid=" + Thread.currentThread().getId() + " oid=" + opid + "  " + msg);
 +  /**
 +   * Stop the timer instance.
 +   *
 +   * @return this instance for fluent chaining.
 +   * @throws IllegalStateException
 +   *           if stop is called on instance that is not running.
 +   */
 +  public OpTimer stop() throws IllegalStateException {
 +    if (!isStarted) {
 +      throw new IllegalStateException("OpTimer is already stopped");
      }
 +    long now = System.nanoTime();
 +    isStarted = false;
 +    currentElapsedNanos += now - startNanos;
 +    return this;
    }
 +
 +  /**
 +   * Stops timer instance and resets current elapsed time to 0.
 +   *
 +   * @return this instance for fluent chaining
 +   */
 +  public OpTimer reset() {
 +    currentElapsedNanos = 0;
 +    isStarted = false;
 +    return this;
 +  }
 +
 +  /**
 +   * Converts current timer value to specific unit. The conversion to coarser granularities
truncates with loss of precision.
 +   *
 +   * @param timeUnit
 +   *          the time unit that will be converted to.
 +   * @return truncated time in unit of specified time unit.
 +   */
 +  public long now(TimeUnit timeUnit) {
 +    return timeUnit.convert(now(), TimeUnit.NANOSECONDS);
 +  }
 +
 +  /**
 +   * Returns the current elapsed time scaled to the provided time unit. This method does
not truncate like {@link #now(TimeUnit)} but returns the value as a
-    * double. </p> Note: this method is not included in the hadoop 2.7 org.apache.hadoop.util.StopWatch
class. If that class is adopted, then provisions will be
-    * required to replace this method.
++   * double.
++   *
++   * <p>
++   * Note: this method is not included in the hadoop 2.7 org.apache.hadoop.util.StopWatch
class. If that class is adopted, then provisions will be required to
++   * replace this method.
 +   *
 +   * @param timeUnit
 +   *          the time unit to scale the elapsed time to.
 +   * @return the elapsed time of this instance scaled to the provided time unit.
 +   */
 +  public double scale(TimeUnit timeUnit) {
 +    return (double) now() / TimeUnit.NANOSECONDS.convert(1L, timeUnit);
 +  }
 +
 +  /**
 +   * Returns current timer elapsed time as nanoseconds.
 +   *
 +   * @return elapsed time in nanoseconds.
 +   */
 +  public long now() {
 +    return isStarted ? System.nanoTime() - startNanos + currentElapsedNanos : currentElapsedNanos;
 +  }
 +
 +  /**
 +   * Return the current elapsed time in nanoseconds as a string.
 +   *
 +   * @return timer elapsed time as nanoseconds.
 +   */
 +  @Override
 +  public String toString() {
 +    return String.valueOf(now());
 +  }
 +
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 0f8ec29,644f506..4149d7a
--- a/pom.xml
+++ b/pom.xml
@@@ -1399,31 -1405,27 +1399,54 @@@
        </properties>
      </profile>
      <profile>
+       <id>jdk8</id>
+       <activation>
+         <jdk>[1.8,1.9)</jdk>
+       </activation>
+       <build>
+         <pluginManagement>
+           <plugins>
+             <plugin>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-javadoc-plugin</artifactId>
+               <configuration>
+                 <encoding>${project.reporting.outputEncoding}</encoding>
+                 <quiet>true</quiet>
+                 <javadocVersion>1.8</javadocVersion>
+                 <additionalJOption>-J-Xmx512m</additionalJOption>
+                 <additionalparam>-Xdoclint:all,-Xdoclint:-missing</additionalparam>
+               </configuration>
+             </plugin>
+           </plugins>
+         </pluginManagement>
+       </build>
+     </profile>
++    <profile>
 +      <id>performanceTests</id>
 +      <build>
 +        <pluginManagement>
 +          <plugins>
 +            <!-- Add an additional execution for performance tests -->
 +            <plugin>
 +              <groupId>org.apache.maven.plugins</groupId>
 +              <artifactId>maven-failsafe-plugin</artifactId>
 +              <executions>
 +                <execution>
 +                  <!-- Run only the performance tests -->
 +                  <id>run-performance-tests</id>
 +                  <goals>
 +                    <goal>integration-test</goal>
 +                    <goal>verify</goal>
 +                  </goals>
 +                  <configuration>
 +                    <groups>${accumulo.performanceTests}</groups>
 +                  </configuration>
 +                </execution>
 +              </executions>
 +            </plugin>
 +          </plugins>
 +        </pluginManagement>
 +      </build>
 +    </profile>
    </profiles>
  </project>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
----------------------------------------------------------------------
diff --cc server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
index 57c68c4,274ec76..a29e3dc
--- a/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/security/SystemCredentialsTest.java
@@@ -58,16 -55,9 +58,16 @@@ public class SystemCredentialsTest 
      }
    }
  
 +  @Before
 +  public void setupInstance() {
 +    inst = EasyMock.createMock(Instance.class);
 +    EasyMock.expect(inst.getInstanceID()).andReturn(UUID.nameUUIDFromBytes(new byte[] {1,
2, 3, 4, 5, 6, 7, 8, 9, 0}).toString()).anyTimes();
 +    EasyMock.replay(inst);
 +  }
 +
    /**
     * This is a test to ensure the string literal in {@link ConnectorImpl#ConnectorImpl(org.apache.accumulo.core.client.impl.ClientContext)}
is kept up-to-date
-    * if we move the {@link SystemToken}<br/>
+    * if we move the {@link SystemToken}<br>
     * This check will not be needed after ACCUMULO-1578
     */
    @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8ff2ca81/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index 7830939,0000000..4f78b77
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@@ -1,387 -1,0 +1,390 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static com.google.common.base.Charsets.UTF_8;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +import java.util.EnumSet;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.ActiveScan;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +/**
 + * ACCUMULO-2641 Integration test. ACCUMULO-2641 Adds scan id to thrift protocol so that
{@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
 + * returns a unique scan id.
++ *
 + * <p>
-  * <p/>
 + * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator}
to create multiple scan sessions. The test exercises multiple
 + * tablet servers with splits and multiple ranges to force the scans to occur across multiple
tablet servers for completeness.
-  * <p/>
++ *
++ * <p>
 + * This patch modified thrift, the TraceRepoDeserializationTest test seems to fail unless
the following be added:
-  * <p/>
++ *
++ * <p>
 + * private static final long serialVersionUID = -4659975753252858243l;
-  * <p/>
++ *
++ * <p>
 + * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated.
 + */
 +public class ScanIdIT extends AccumuloClusterHarness {
 +
 +  private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
 +
 +  private static final int NUM_SCANNERS = 8;
 +
 +  private static final int NUM_DATA_ROWS = 100;
 +
 +  private static final Random random = new Random();
 +
 +  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
 +
 +  private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
 +
 +  private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  /**
 +   * @throws Exception
 +   *           any exception is a test failure.
 +   */
 +  @Test
 +  public void testScanId() throws Exception {
 +
 +    final String tableName = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +    conn.tableOperations().create(tableName);
 +
 +    addSplits(conn, tableName);
 +
 +    log.info("Splits added");
 +
 +    generateSampleData(conn, tableName);
 +
 +    log.info("Generated data for {}", tableName);
 +
 +    attachSlowIterator(conn, tableName);
 +
 +    CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
 +
 +    for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
 +      ScannerThread st = new ScannerThread(conn, scannerIndex, tableName, latch);
 +      pool.submit(st);
 +    }
 +
 +    // wait for scanners to report a result.
 +    while (testInProgress.get()) {
 +
 +      if (resultsByWorker.size() < NUM_SCANNERS) {
 +        log.trace("Results reported {}", resultsByWorker.size());
 +        sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
 +      } else {
 +        // each worker has reported at least one result.
 +        testInProgress.set(false);
 +
 +        log.debug("Final result count {}", resultsByWorker.size());
 +
 +        // delay to allow scanners to react to end of test and cleanly close.
 +        sleepUninterruptibly(1, TimeUnit.SECONDS);
 +      }
 +
 +    }
 +
 +    // all scanners have reported at least 1 result, so check for unique scan ids.
 +    Set<Long> scanIds = new HashSet<Long>();
 +
 +    List<String> tservers = conn.instanceOperations().getTabletServers();
 +
 +    log.debug("tablet servers {}", tservers.toString());
 +
 +    for (String tserver : tservers) {
 +
 +      List<ActiveScan> activeScans = null;
 +      for (int i = 0; i < 10; i++) {
 +        try {
 +          activeScans = conn.instanceOperations().getActiveScans(tserver);
 +          break;
 +        } catch (AccumuloException e) {
 +          if (e.getCause() instanceof TableNotFoundException) {
 +            log.debug("Got TableNotFoundException, will retry");
 +            Thread.sleep(200);
 +            continue;
 +          }
 +          throw e;
 +        }
 +      }
 +
 +      assertNotNull("Repeatedly got exception trying to active scans", activeScans);
 +
 +      log.debug("TServer {} has {} active scans", tserver, activeScans.size());
 +
 +      for (ActiveScan scan : activeScans) {
 +        log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
 +        scanIds.add(scan.getScanid());
 +      }
 +    }
 +
 +    assertTrue("Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size(),
NUM_SCANNERS <= scanIds.size());
 +
 +  }
 +
 +  /**
 +   * Runs one scanner on its own thread so that several scans can be in progress at the same time.
 +   * <p>
 +   * {@link #run()} returns when the testInProgress flag is found to be false or when the scanner has no more entries.
 +   */
 +  private static class ScannerThread implements Runnable {
 +
 +    private final Connector connector;
 +    private Scanner scanner = null;
 +    private final int workerIndex;
 +    private final String tablename;
 +    private final CountDownLatch latch;
 +
 +    /**
 +     * @param connector
 +     *          connector used to create this worker's scanner.
 +     * @param workerIndex
 +     *          identifies this worker in log messages and in the results map; its decimal string is also the start row of the scan range.
 +     * @param tablename
 +     *          table to scan.
 +     * @param latch
 +     *          latch that is counted down and then awaited before the scanner is created.
 +     */
 +    public ScannerThread(final Connector connector, final int workerIndex, final String tablename, final CountDownLatch latch) {
 +      this.connector = connector;
 +      this.workerIndex = workerIndex;
 +      this.tablename = tablename;
 +      this.latch = latch;
 +    }
 +
 +    /**
 +     * Scans the fam1 column family and stores the latest value seen in the results map, keyed by worker index, until the testInProgress flag is set to false
 +     * or the scanner has no more entries. The scanner is closed on every exit path once it has been created.
 +     *
 +     * @throws IllegalStateException
 +     *           if the scanner cannot be created because the table does not exist.
 +     */
 +    @Override
 +    public void run() {
 +
 +      // announce this worker, then wait until the latch count reaches zero
 +      latch.countDown();
 +      try {
 +        latch.await();
 +      } catch (InterruptedException e) {
 +        log.error("Thread interrupted with id {}", workerIndex);
 +        Thread.currentThread().interrupt();
 +        return;
 +      }
 +
 +      log.debug("Creating scanner in worker thread {}", workerIndex);
 +
 +      try {
 +        scanner = connector.createScanner(tablename, new Authorizations());
 +      } catch (TableNotFoundException e) {
 +        throw new IllegalStateException("Initialization failure. Could not create scanner", e);
 +      }
 +
 +      // From here on the scanner exists, so make sure it is closed however this method exits.
 +      try {
 +
 +        // Never start readahead
 +        scanner.setReadaheadThreshold(Long.MAX_VALUE);
 +        scanner.setBatchSize(1);
 +
 +        // create different ranges to try to hit more than one tablet.
 +        scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
 +
 +        scanner.fetchColumnFamily(new Text("fam1"));
 +
 +        for (Map.Entry<Key,Value> entry : scanner) {
 +
 +          // exit when success condition is met.
 +          if (!testInProgress.get()) {
 +            scanner.clearScanIterators();
 +            return;
 +          }
 +
 +          Text row = entry.getKey().getRow();
 +
 +          log.debug("worker {}, row {}", workerIndex, row.toString());
 +
 +          if (entry.getValue() != null) {
 +
 +            Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
 +
 +            // value should always be increasing
 +            // NOTE(review): the trace message below prints "prev < current" while the assertion requires prev > current - confirm which ordering is intended.
 +            if (prevValue != null) {
 +
 +              log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue()));
 +
 +              assertTrue(prevValue.compareTo(entry.getValue()) > 0);
 +            }
 +          } else {
 +            log.info("Scanner returned null");
 +            fail("Scanner returned unexpected null value");
 +          }
 +
 +        }
 +
 +        log.debug("Scanner ran out of data. (info only, not an error) ");
 +
 +      } finally {
 +        scanner.close();
 +      }
 +
 +    }
 +  }
 +
 +  /**
 +   * Create splits on table and force migration by taking table offline and then bring back
online for test.
 +   *
 +   * @param conn
 +   *          Accumulo connector Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void addSplits(final Connector conn, final String tableName) {
 +
 +    SortedSet<Text> splits = createSplits();
 +
 +    try {
 +
 +      conn.tableOperations().addSplits(tableName, splits);
 +
 +      conn.tableOperations().offline(tableName, true);
 +
 +      sleepUninterruptibly(2, TimeUnit.SECONDS);
 +      conn.tableOperations().online(tableName, true);
 +
 +      for (Text split : conn.tableOperations().listSplits(tableName)) {
 +        log.trace("Split {}", split);
 +      }
 +
 +    } catch (AccumuloSecurityException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
 +    } catch (TableNotFoundException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
 +    } catch (AccumuloException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to "
+ tableName, e);
 +    }
 +
 +  }
 +
 +  /**
 +   * Create splits to distribute data across multiple tservers.
 +   *
 +   * @return splits in sorted set for addSplits.
 +   */
 +  private SortedSet<Text> createSplits() {
 +
 +    SortedSet<Text> splits = new TreeSet<Text>();
 +
 +    for (int split = 0; split < 10; split++) {
 +      splits.add(new Text(Integer.toString(split)));
 +    }
 +
 +    return splits;
 +  }
 +
 +  /**
 +   * Generate some sample data using random row ids to distribute it across splits.
 +   * <p>
 +   * The primary goal is to determine that each scanner is assigned a unique scan id. This test does check that the count value for fam1 increases if a scanner
 +   * reads multiple values, but this is a secondary consideration for this test that is included for completeness.
 +   * <p>
 +   * Each row gets three fam1 columns: count (the loop index), positive (NUM_DATA_ROWS minus the index) and negative (the index minus NUM_DATA_ROWS). The
 +   * latter two are written with the "public" visibility.
 +   *
 +   * @param connector
 +   *          Accumulo connector to test cluster or MAC instance.
 +   * @param tablename
 +   *          name of the table the rows are written to.
 +   * @throws IllegalStateException
 +   *           if the table does not exist or mutations are rejected; the original exception is kept as the cause.
 +   */
 +  private void generateSampleData(Connector connector, final String tablename) {
 +
 +    try {
 +
 +      BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
 +
 +      // close the writer even when a mutation is rejected part way through, so it is not left open
 +      try {
 +
 +        ColumnVisibility vis = new ColumnVisibility("public");
 +
 +        for (int i = 0; i < NUM_DATA_ROWS; i++) {
 +
 +          // Integer.toString always emits ASCII digits; String.format("%d") follows the default locale and could produce row ids that do not line up with
 +          // the "0".."9" split points.
 +          Text rowId = new Text(Integer.toString((random.nextInt(10) * 100) + i));
 +
 +          Mutation m = new Mutation(rowId);
 +          m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
 +          m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8)));
 +          m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8)));
 +
 +          log.trace("Added row {}", rowId);
 +
 +          bw.addMutation(m);
 +        }
 +
 +      } finally {
 +        bw.close();
 +      }
 +    } catch (TableNotFoundException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
 +    } catch (MutationsRejectedException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
 +    }
 +  }
 +
 +  /**
 +   * Attach the test slow iterator so that we have time to read the scan id without creating
a large dataset. Uses a fairly large sleep and delay times because
 +   * we are not concerned with how much data is read and we do not read all of the data
- the test stops once each scanner reports a scan id.
 +   *
 +   * @param connector
 +   *          Accumulo connector Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void attachSlowIterator(Connector connector, final String tablename) {
 +    try {
 +
 +      IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
 +      slowIter.addOption("sleepTime", "200");
 +      slowIter.addOption("seekSleepTime", "200");
 +
 +      connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
 +
 +    } catch (AccumuloException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
 +    } catch (TableNotFoundException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
 +    } catch (AccumuloSecurityException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator",
ex);
 +    }
 +  }
 +
 +}


Mime
View raw message