accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlmar...@apache.org
Subject [3/3] accumulo git commit: ACCUMULO-3948: Add ability to set contextClassLoader on Scanners and BatchScanners
Date Tue, 17 Nov 2015 18:33:58 GMT
ACCUMULO-3948: Add ability to set contextClassLoader on Scanners and BatchScanners


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0e1da5a5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0e1da5a5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0e1da5a5

Branch: refs/heads/master
Commit: 0e1da5a58b57a491fc3c468edacdc4b53fc64010
Parents: 3521b35
Author: Dave Marion <dlmarion@apache.org>
Authored: Tue Nov 17 13:32:19 2015 -0500
Committer: Dave Marion <dlmarion@apache.org>
Committed: Tue Nov 17 13:32:19 2015 -0500

----------------------------------------------------------------------
 .../core/client/ConditionalWriterConfig.java    |  39 ++
 .../accumulo/core/client/ScannerBase.java       |  27 ++
 .../core/client/impl/ConditionalWriterImpl.java |   4 +-
 .../core/client/impl/ScannerIterator.java       |   2 +-
 .../core/client/impl/ScannerOptions.java        |  20 +
 .../impl/TabletServerBatchReaderIterator.java   |   2 +-
 .../core/client/impl/ThriftScanner.java         |  15 +-
 .../core/client/mapred/AbstractInputFormat.java |  30 +-
 .../client/mapreduce/AbstractInputFormat.java   |  30 +-
 .../mapreduce/lib/impl/InputConfigurator.java   |  32 +-
 .../core/client/mock/MockScannerBase.java       |   5 +
 .../accumulo/core/iterators/IteratorUtil.java   |  24 +-
 .../core/metadata/MetadataLocationObtainer.java |   4 +-
 .../core/tabletserver/thrift/ActiveScan.java    | 108 ++++-
 .../thrift/TabletClientService.java             | 396 +++++++++++++++++--
 core/src/main/thrift/tabletserver.thrift        |   9 +-
 .../main/asciidoc/chapters/administration.txt   |  43 ++
 .../server/util/VerifyTabletAssignments.java    |   2 +-
 .../monitor/servlets/trace/NullScanner.java     |  11 +
 .../apache/accumulo/tserver/TabletServer.java   |  19 +-
 .../accumulo/tserver/scan/LookupTask.java       |   2 +-
 .../tserver/session/ConditionalSession.java     |   4 +-
 .../tserver/session/MultiScanSession.java       |   4 +-
 .../accumulo/tserver/session/ScanSession.java   |   4 +-
 .../tserver/session/SessionManager.java         |   4 +-
 .../accumulo/tserver/tablet/ScanDataSource.java |  13 +-
 .../accumulo/tserver/tablet/ScanOptions.java    |  12 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  11 +-
 .../accumulo/shell/commands/ScanCommand.java    |  12 +-
 test/pom.xml                                    |  19 +
 .../org/apache/accumulo/test/ShellServerIT.java |  91 +++++
 .../test/functional/ScannerContextIT.java       | 341 ++++++++++++++++
 .../test/performance/thrift/NullTserver.java    |   7 +-
 .../main/resources/ShellServerIT-iterators.jar  | Bin 0 -> 3148 bytes
 .../test/functional/ValueReversingIterator.java |  68 ++++
 35 files changed, 1316 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
index b5cb474..cb9f6fb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java
@@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.security.Authorizations;
 
+import com.google.common.base.Preconditions;
+
 /**
  *
  * @since 1.6.0
@@ -38,6 +40,8 @@ public class ConditionalWriterConfig {
 
   private Durability durability = Durability.DEFAULT;
 
+  private String classLoaderContext = null;
+
   /**
    * A set of authorization labels that will be checked against the column visibility of
each key in order to filter data. The authorizations passed in must be
    * a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations
(A1, A2) and authorizations (A2, A3) are passed, then an
@@ -133,4 +137,39 @@ public class ConditionalWriterConfig {
   public Durability getDurability() {
     return durability;
   }
+
+  /**
+   * Sets the name of the classloader context on this scanner. See the administration chapter
of the user manual for details on how to configure and use
+   * classloader contexts.
+   *
+   * @param classLoaderContext
+   *          name of the classloader context
+   * @throws NullPointerException
+   *           if context is null
+   * @since 1.8.0
+   */
+  public void setClassLoaderContext(String classLoaderContext) {
+    Preconditions.checkNotNull(classLoaderContext, "context name cannot be null");
+    this.classLoaderContext = classLoaderContext;
+  }
+
+  /**
+   * Clears the current classloader context set on this scanner
+   *
+   * @since 1.8.0
+   */
+  public void clearClassLoaderContext() {
+    this.classLoaderContext = null;
+  }
+
+  /**
+   * Returns the name of the current classloader context set on this scanner
+   *
+   * @return name of the current context
+   * @since 1.8.0
+   */
+  public String getClassLoaderContext() {
+    return this.classLoaderContext;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 5642785..aed67bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -242,4 +242,31 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>>
{
    * @since 1.8.0
    */
   long getBatchTimeout(TimeUnit timeUnit);
+
+  /**
+   * Sets the name of the classloader context on this scanner. See the administration chapter
of the user manual for details on how to configure and use
+   * classloader contexts.
+   *
+   * @param classLoaderContext
+   *          name of the classloader context
+   * @throws NullPointerException
+   *           if context is null
+   * @since 1.8.0
+   */
+  void setClassLoaderContext(String classLoaderContext);
+
+  /**
+   * Clears the current classloader context set on this scanner
+   *
+   * @since 1.8.0
+   */
+  void clearClassLoaderContext();
+
+  /**
+   * Returns the name of the current classloader context set on this scanner
+   *
+   * @return name of the current context
+   * @since 1.8.0
+   */
+  String getClassLoaderContext();
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 6c6a551..546de9c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -113,6 +113,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private String tableId;
   private long timeout;
   private final Durability durability;
+  private final String classLoaderContext;
 
   private static class ServerQueue {
     BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>();
@@ -389,6 +390,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     this.tableId = tableId;
     this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
     this.durability = config.getDurability();
+    this.classLoaderContext = config.getClassLoaderContext();
 
     Runnable failureHandler = new Runnable() {
 
@@ -506,7 +508,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
 
     TConditionalSession tcs = client.startConditionalUpdate(tinfo, context.rpcCreds(), ByteBufferUtil.toByteBuffers(auths.getAuthorizations()),
tableId,
-        DurabilityImpl.toThrift(durability));
+        DurabilityImpl.toThrift(durability), this.classLoaderContext);
 
     synchronized (cachedSessionIDs) {
       SessionID sid = new SessionID();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
index 55b0a85..d6512f0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
@@ -121,7 +121,7 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
     }
 
     scanState = new ScanState(context, tableId, authorizations, new Range(range), options.fetchedColumns,
size, options.serverSideIteratorList,
-        options.serverSideIteratorOptions, isolated, readaheadThreshold, options.getSamplerConfiguration(),
options.batchTimeOut);
+        options.serverSideIteratorOptions, isolated, readaheadThreshold, options.getSamplerConfiguration(),
options.batchTimeOut, options.classLoaderContext);
 
     // If we want to start readahead immediately, don't wait for hasNext to be called
     if (0l == readaheadThreshold) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
index 8d96464..0782a7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
@@ -58,6 +58,8 @@ public class ScannerOptions implements ScannerBase {
 
   private SamplerConfiguration samplerConfig = null;
 
+  protected String classLoaderContext = null;
+
   protected ScannerOptions() {}
 
   public ScannerOptions(ScannerOptions so) {
@@ -168,6 +170,7 @@ public class ScannerOptions implements ScannerBase {
         dst.regexIterName = src.regexIterName;
         dst.fetchedColumns = new TreeSet<Column>(src.fetchedColumns);
         dst.serverSideIteratorList = new ArrayList<IterInfo>(src.serverSideIteratorList);
+        dst.classLoaderContext = src.classLoaderContext;
 
         dst.serverSideIteratorOptions = new HashMap<String,Map<String,String>>();
         Set<Entry<String,Map<String,String>>> es = src.serverSideIteratorOptions.entrySet();
@@ -244,4 +247,21 @@ public class ScannerOptions implements ScannerBase {
   public long getBatchTimeout(TimeUnit timeUnit) {
     return timeUnit.convert(batchTimeOut, TimeUnit.MILLISECONDS);
   }
+
+  @Override
+  public void setClassLoaderContext(String classLoaderContext) {
+    Preconditions.checkNotNull(classLoaderContext, "classloader context name cannot be null");
+    this.classLoaderContext = classLoaderContext;
+  }
+
+  @Override
+  public void clearClassLoaderContext() {
+    this.classLoaderContext = null;
+  }
+
+  @Override
+  public String getClassLoaderContext() {
+    return this.classLoaderContext;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index 1ff56b9..814e71b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -649,7 +649,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
         InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), context.rpcCreds(),
thriftTabletRanges,
             Translator.translate(columns, Translators.CT), options.serverSideIteratorList,
options.serverSideIteratorOptions,
             ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites,
-            SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()), options.batchTimeOut);
+            SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()), options.batchTimeOut,
options.classLoaderContext);
         if (waitForWrites)
           ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 52f3330..57f5102 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -85,7 +85,8 @@ public class ThriftScanner {
 
   public static boolean getBatchFromServer(ClientContext context, Range range, KeyExtent
extent, String server, SortedMap<Key,Value> results,
       SortedSet<Column> fetchedColumns, List<IterInfo> serverSideIteratorList,
Map<String,Map<String,String>> serverSideIteratorOptions, int size,
-      Authorizations authorizations, boolean retry, long batchTimeOut) throws AccumuloException,
AccumuloSecurityException, NotServingTabletException {
+      Authorizations authorizations, boolean retry, long batchTimeOut, String classLoaderContext)
throws AccumuloException, AccumuloSecurityException,
+      NotServingTabletException {
     if (server == null)
       throw new AccumuloException(new IOException());
 
@@ -96,13 +97,14 @@ public class ThriftScanner {
       try {
         // not reading whole rows (or stopping on row boundries) so there is no need to enable
isolation below
         ScanState scanState = new ScanState(context, extent.getTableId(), authorizations,
range, fetchedColumns, size, serverSideIteratorList,
-            serverSideIteratorOptions, false, Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD,
null, batchTimeOut);
+            serverSideIteratorOptions, false, Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD,
null, batchTimeOut, classLoaderContext);
 
         TabletType ttype = TabletType.type(extent);
         boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server);
         InitialScan isr = client.startScan(tinfo, scanState.context.rpcCreds(), extent.toThrift(),
scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList,
scanState.serverSideIteratorOptions,
-            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated,
scanState.readaheadThreshold, null, scanState.batchTimeOut);
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated,
scanState.readaheadThreshold, null, scanState.batchTimeOut,
+            classLoaderContext);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(server);
 
@@ -151,6 +153,8 @@ public class ThriftScanner {
     TabletLocation prevLoc;
     Long scanID;
 
+    String classLoaderContext;
+
     boolean finished = false;
 
     List<IterInfo> serverSideIteratorList;
@@ -161,10 +165,11 @@ public class ThriftScanner {
 
     public ScanState(ClientContext context, Text tableId, Authorizations authorizations,
Range range, SortedSet<Column> fetchedColumns, int size,
         List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>>
serverSideIteratorOptions, boolean isolated, long readaheadThreshold,
-        SamplerConfiguration samplerConfig, long batchTimeOut) {
+        SamplerConfiguration samplerConfig, long batchTimeOut, String classLoaderContext)
{
       this.context = context;
 
       this.authorizations = authorizations;
+      this.classLoaderContext = classLoaderContext;
 
       columns = new ArrayList<Column>(fetchedColumns.size());
       for (Column column : fetchedColumns) {
@@ -425,7 +430,7 @@ public class ThriftScanner {
         InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(), loc.tablet_extent.toThrift(),
scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList,
scanState.serverSideIteratorOptions,
             scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated,
scanState.readaheadThreshold,
-            SamplerConfigurationImpl.toThrift(scanState.samplerConfig), scanState.batchTimeOut);
+            SamplerConfigurationImpl.toThrift(scanState.samplerConfig), scanState.batchTimeOut,
scanState.classLoaderContext);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(loc.tablet_location);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
index b581deb..794500e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java
@@ -90,6 +90,31 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
   protected static final Logger log = Logger.getLogger(CLASS);
 
   /**
+   * Sets the name of the classloader context on this scanner
+   *
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param context
+   *          name of the classloader context
+   * @since 1.8.0
+   */
+  public static void setClassLoaderContext(JobConf job, String context) {
+    InputConfigurator.setClassLoaderContext(CLASS, job, context);
+  }
+
+  /**
+   * Returns the name of the current classloader context set on this scanner
+   *
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @return name of the current context
+   * @since 1.8.0
+   */
+  public static String getClassLoaderContext(JobConf job) {
+    return InputConfigurator.getClassLoaderContext(CLASS, job);
+  }
+
+  /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    *
    * <p>
@@ -484,7 +509,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
       if (null == authorizations) {
         authorizations = getScanAuthorizations(job);
       }
-
+      String classLoaderContext = getClassLoaderContext(job);
       String table = baseSplit.getTableName();
 
       // in case the table name changed, we can still use the previous name for terms of
configuration,
@@ -504,6 +529,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V>
{
           int scanThreads = 1;
           scanner = instance.getConnector(principal, token).createBatchScanner(baseSplit.getTableName(),
authorizations, scanThreads);
           setupIterators(job, scanner, baseSplit.getTableName(), baseSplit);
+          if (null != classLoaderContext) {
+            scanner.setClassLoaderContext(classLoaderContext);
+          }
         } catch (Exception e) {
           throw new IOException(e);
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
index 0e51f03..cf168cd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java
@@ -93,6 +93,31 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
   protected static final Logger log = Logger.getLogger(CLASS);
 
   /**
+   * Sets the name of the classloader context on this scanner
+   *
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param context
+   *          name of the classloader context
+   * @since 1.8.0
+   */
+  public static void setClassLoaderContext(Job job, String context) {
+    InputConfigurator.setClassLoaderContext(CLASS, job.getConfiguration(), context);
+  }
+
+  /**
+   * Returns the name of the current classloader context set on this scanner
+   *
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @return name of the current context
+   * @since 1.8.0
+   */
+  public static String getClassLoaderContext(JobContext job) {
+    return InputConfigurator.getClassLoaderContext(CLASS, job.getConfiguration());
+  }
+
+  /**
    * Sets the connector information needed to communicate with Accumulo in this job.
    *
    * <p>
@@ -515,7 +540,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
       if (null == authorizations) {
         authorizations = getScanAuthorizations(attempt);
       }
-
+      String classLoaderContext = getClassLoaderContext(attempt);
       String table = split.getTableName();
 
       // in case the table name changed, we can still use the previous name for terms of
configuration,
@@ -535,6 +560,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V>
{
           int scanThreads = 1;
           scanner = instance.getConnector(principal, token).createBatchScanner(split.getTableName(),
authorizations, scanThreads);
           setupIterators(attempt, scanner, split.getTableName(), split);
+          if (null != classLoaderContext) {
+            scanner.setClassLoaderContext(classLoaderContext);
+          }
         } catch (Exception e) {
           e.printStackTrace();
           throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
index 6ba34af..43a776a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/InputConfigurator.java
@@ -90,7 +90,7 @@ public class InputConfigurator extends ConfiguratorBase {
    * @since 1.6.0
    */
   public static enum ScanOpts {
-    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS, TABLE_CONFIGS, SAMPLER_CONFIG
+    TABLE_NAME, AUTHORIZATIONS, RANGES, COLUMNS, ITERATORS, TABLE_CONFIGS, SAMPLER_CONFIG,
CLASSLOADER_CONTEXT
   }
 
   /**
@@ -103,6 +103,36 @@ public class InputConfigurator extends ConfiguratorBase {
   }
 
   /**
+   * Sets the name of the context classloader to use for scans
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration
key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @param context
+   *          the name of the context classloader
+   * @since 1.8.0
+   */
+  public static void setClassLoaderContext(Class<?> implementingClass, Configuration
conf, String context) {
+    checkArgument(context != null, "context is null");
+    conf.set(enumToConfKey(implementingClass, ScanOpts.CLASSLOADER_CONTEXT), context);
+  }
+
+  /**
+   * Gets the name of the context classloader to use for scans
+   *
+   * @param implementingClass
+   *          the class whose name will be used as a prefix for the property configuration
key
+   * @param conf
+   *          the Hadoop configuration object to configure
+   * @return the classloader context name
+   * @since 1.8.0
+   */
+  public static String getClassLoaderContext(Class<?> implementingClass, Configuration
conf) {
+    return conf.get(enumToConfKey(implementingClass, ScanOpts.CLASSLOADER_CONTEXT), null);
+  }
+
+  /**
    * Sets the name of the input table, over which this job will scan.
    *
    * @param implementingClass

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
b/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
index 45b65e9..9302fc9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockScannerBase.java
@@ -151,4 +151,9 @@ public class MockScannerBase extends ScannerOptions implements ScannerBase
{
   public Authorizations getAuthorizations() {
     return auths;
   }
+
+  @Override
+  public void setClassLoaderContext(String context) {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
index 81f3c08..6523367 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
@@ -217,12 +217,8 @@ public class IteratorUtil {
     return loadIterators(scope, source, extent, conf, ssiList, ssio, env, true);
   }
 
-  public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V>
loadIterators(IteratorScope scope,
-      SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
-      IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
-    List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
-    Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
-
+  private static void parseIteratorConfiguration(IteratorScope scope, List<IterInfo>
iters, Map<String,Map<String,String>> ssio,
+      Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf)
{
     parseIterConf(scope, iters, allOptions, conf);
 
     for (Entry<String,Map<String,String>> entry : ssio.entrySet()) {
@@ -235,10 +231,26 @@ public class IteratorUtil {
         options.putAll(entry.getValue());
       }
     }
+  }
 
+  public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V>
loadIterators(IteratorScope scope,
+      SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+      IteratorEnvironment env, boolean useAccumuloClassLoader) throws IOException {
+    List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
+    Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
+    parseIteratorConfiguration(scope, iters, ssio, allOptions, conf);
     return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader, conf.get(Property.TABLE_CLASSPATH));
   }
 
+  public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V>
loadIterators(IteratorScope scope,
+      SortedKeyValueIterator<K,V> source, KeyExtent extent, AccumuloConfiguration conf,
List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+      IteratorEnvironment env, boolean useAccumuloClassLoader, String classLoaderContext)
throws IOException {
+    List<IterInfo> iters = new ArrayList<IterInfo>(ssiList);
+    Map<String,Map<String,String>> allOptions = new HashMap<String,Map<String,String>>();
+    parseIteratorConfiguration(scope, iters, ssio, allOptions, conf);
+    return loadIterators(source, iters, allOptions, env, useAccumuloClassLoader, classLoaderContext);
+  }
+
   @SuppressWarnings("unchecked")
   public static <K extends WritableComparable<?>,V extends Writable> SortedKeyValueIterator<K,V>
loadIterators(SortedKeyValueIterator<K,V> source,
       Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts,
IteratorEnvironment env, boolean useAccumuloClassLoader, String context)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
index 42dd881..c59fab2 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
@@ -99,7 +99,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer
{
       serverSideIteratorList.add(new IterInfo(10000, WholeRowIterator.class.getName(), "WRI"));
       Map<String,Map<String,String>> serverSideIteratorOptions = Collections.emptyMap();
       boolean more = ThriftScanner.getBatchFromServer(context, range, src.tablet_extent,
src.tablet_location, encodedResults, locCols, serverSideIteratorList,
-          serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false,
0L);
+          serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false,
0L, null);
 
       decodeRows(encodedResults, results);
 
@@ -107,7 +107,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer
{
         range = new Range(results.lastKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME),
true, new Key(stopRow).followingKey(PartialKey.ROW), false);
         encodedResults.clear();
         more = ThriftScanner.getBatchFromServer(context, range, src.tablet_extent, src.tablet_location,
encodedResults, locCols, serverSideIteratorList,
-            serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false,
0L);
+            serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false,
0L, null);
 
         decodeRows(encodedResults, results);
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0e1da5a5/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ActiveScan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ActiveScan.java
b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ActiveScan.java
index fe389e4..e46c397 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ActiveScan.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ActiveScan.java
@@ -64,6 +64,7 @@ import org.slf4j.LoggerFactory;
   private static final org.apache.thrift.protocol.TField SSIO_FIELD_DESC = new org.apache.thrift.protocol.TField("ssio",
org.apache.thrift.protocol.TType.MAP, (short)12);
   private static final org.apache.thrift.protocol.TField AUTHORIZATIONS_FIELD_DESC = new
org.apache.thrift.protocol.TField("authorizations", org.apache.thrift.protocol.TType.LIST,
(short)13);
   private static final org.apache.thrift.protocol.TField SCAN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("scanId",
org.apache.thrift.protocol.TType.I64, (short)14);
+  private static final org.apache.thrift.protocol.TField CLASS_LOADER_CONTEXT_FIELD_DESC
= new org.apache.thrift.protocol.TField("classLoaderContext", org.apache.thrift.protocol.TType.STRING,
(short)15);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes =
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -92,6 +93,7 @@ import org.slf4j.LoggerFactory;
   public Map<String,Map<String,String>> ssio; // required
   public List<ByteBuffer> authorizations; // required
   public long scanId; // optional
+  public String classLoaderContext; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -115,7 +117,8 @@ import org.slf4j.LoggerFactory;
     SSI_LIST((short)11, "ssiList"),
     SSIO((short)12, "ssio"),
     AUTHORIZATIONS((short)13, "authorizations"),
-    SCAN_ID((short)14, "scanId");
+    SCAN_ID((short)14, "scanId"),
+    CLASS_LOADER_CONTEXT((short)15, "classLoaderContext");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -156,6 +159,8 @@ import org.slf4j.LoggerFactory;
           return AUTHORIZATIONS;
         case 14: // SCAN_ID
           return SCAN_ID;
+        case 15: // CLASS_LOADER_CONTEXT
+          return CLASS_LOADER_CONTEXT;
         default:
           return null;
       }
@@ -237,6 +242,8 @@ import org.slf4j.LoggerFactory;
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING
           , true))));
     tmpMap.put(_Fields.SCAN_ID, new org.apache.thrift.meta_data.FieldMetaData("scanId", org.apache.thrift.TFieldRequirementType.OPTIONAL,

         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.CLASS_LOADER_CONTEXT, new org.apache.thrift.meta_data.FieldMetaData("classLoaderContext",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ActiveScan.class, metaDataMap);
   }
@@ -256,7 +263,8 @@ import org.slf4j.LoggerFactory;
     List<org.apache.accumulo.core.data.thrift.TColumn> columns,
     List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList,
     Map<String,Map<String,String>> ssio,
-    List<ByteBuffer> authorizations)
+    List<ByteBuffer> authorizations,
+    String classLoaderContext)
   {
     this();
     this.client = client;
@@ -273,6 +281,7 @@ import org.slf4j.LoggerFactory;
     this.ssiList = ssiList;
     this.ssio = ssio;
     this.authorizations = authorizations;
+    this.classLoaderContext = classLoaderContext;
   }
 
   /**
@@ -334,6 +343,9 @@ import org.slf4j.LoggerFactory;
       this.authorizations = __this__authorizations;
     }
     this.scanId = other.scanId;
+    if (other.isSetClassLoaderContext()) {
+      this.classLoaderContext = other.classLoaderContext;
+    }
   }
 
   public ActiveScan deepCopy() {
@@ -358,6 +370,7 @@ import org.slf4j.LoggerFactory;
     this.authorizations = null;
     setScanIdIsSet(false);
     this.scanId = 0;
+    this.classLoaderContext = null;
   }
 
   public String getClient() {
@@ -741,6 +754,30 @@ import org.slf4j.LoggerFactory;
     __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SCANID_ISSET_ID, value);
   }
 
+  public String getClassLoaderContext() {
+    return this.classLoaderContext;
+  }
+
+  public ActiveScan setClassLoaderContext(String classLoaderContext) {
+    this.classLoaderContext = classLoaderContext;
+    return this;
+  }
+
+  public void unsetClassLoaderContext() {
+    this.classLoaderContext = null;
+  }
+
+  /** Returns true if field classLoaderContext is set (has been assigned a value) and false
otherwise */
+  public boolean isSetClassLoaderContext() {
+    return this.classLoaderContext != null;
+  }
+
+  public void setClassLoaderContextIsSet(boolean value) {
+    if (!value) {
+      this.classLoaderContext = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case CLIENT:
@@ -847,6 +884,14 @@ import org.slf4j.LoggerFactory;
       }
       break;
 
+    case CLASS_LOADER_CONTEXT:
+      if (value == null) {
+        unsetClassLoaderContext();
+      } else {
+        setClassLoaderContext((String)value);
+      }
+      break;
+
     }
   }
 
@@ -891,6 +936,9 @@ import org.slf4j.LoggerFactory;
     case SCAN_ID:
       return Long.valueOf(getScanId());
 
+    case CLASS_LOADER_CONTEXT:
+      return getClassLoaderContext();
+
     }
     throw new IllegalStateException();
   }
@@ -928,6 +976,8 @@ import org.slf4j.LoggerFactory;
       return isSetAuthorizations();
     case SCAN_ID:
       return isSetScanId();
+    case CLASS_LOADER_CONTEXT:
+      return isSetClassLoaderContext();
     }
     throw new IllegalStateException();
   }
@@ -1062,6 +1112,15 @@ import org.slf4j.LoggerFactory;
         return false;
     }
 
+    boolean this_present_classLoaderContext = true && this.isSetClassLoaderContext();
+    boolean that_present_classLoaderContext = true && that.isSetClassLoaderContext();
+    if (this_present_classLoaderContext || that_present_classLoaderContext) {
+      if (!(this_present_classLoaderContext && that_present_classLoaderContext))
+        return false;
+      if (!this.classLoaderContext.equals(that.classLoaderContext))
+        return false;
+    }
+
     return true;
   }
 
@@ -1208,6 +1267,16 @@ import org.slf4j.LoggerFactory;
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetClassLoaderContext()).compareTo(other.isSetClassLoaderContext());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetClassLoaderContext()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.classLoaderContext, other.classLoaderContext);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1321,6 +1390,14 @@ import org.slf4j.LoggerFactory;
       sb.append(this.scanId);
       first = false;
     }
+    if (!first) sb.append(", ");
+    sb.append("classLoaderContext:");
+    if (this.classLoaderContext == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.classLoaderContext);
+    }
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -1530,6 +1607,14 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 15: // CLASS_LOADER_CONTEXT
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.classLoaderContext = iprot.readString();
+              struct.setClassLoaderContextIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1643,6 +1728,11 @@ import org.slf4j.LoggerFactory;
         oprot.writeI64(struct.scanId);
         oprot.writeFieldEnd();
       }
+      if (struct.classLoaderContext != null) {
+        oprot.writeFieldBegin(CLASS_LOADER_CONTEXT_FIELD_DESC);
+        oprot.writeString(struct.classLoaderContext);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1700,7 +1790,10 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetScanId()) {
         optionals.set(12);
       }
-      oprot.writeBitSet(optionals, 13);
+      if (struct.isSetClassLoaderContext()) {
+        optionals.set(13);
+      }
+      oprot.writeBitSet(optionals, 14);
       if (struct.isSetClient()) {
         oprot.writeString(struct.client);
       }
@@ -1772,12 +1865,15 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetScanId()) {
         oprot.writeI64(struct.scanId);
       }
+      if (struct.isSetClassLoaderContext()) {
+        oprot.writeString(struct.classLoaderContext);
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, ActiveScan struct) throws
org.apache.thrift.TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
-      BitSet incoming = iprot.readBitSet(13);
+      BitSet incoming = iprot.readBitSet(14);
       if (incoming.get(0)) {
         struct.client = iprot.readString();
         struct.setClientIsSet(true);
@@ -1882,6 +1978,10 @@ import org.slf4j.LoggerFactory;
         struct.scanId = iprot.readI64();
         struct.setScanIdIsSet(true);
       }
+      if (incoming.get(13)) {
+        struct.classLoaderContext = iprot.readString();
+        struct.setClassLoaderContextIsSet(true);
+      }
     }
   }
 


Mime
View raw message