incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Blur platform commands can now be reloaded externally.
Date Mon, 22 Sep 2014 20:13:09 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 6ed7dc2a0 -> 21ff08341


Blur platform commands can now be reloaded externally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/21ff0834
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/21ff0834
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/21ff0834

Branch: refs/heads/master
Commit: 21ff083415a41f7519dd105162245a785c531833
Parents: 6ed7dc2
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Sep 22 16:12:56 2014 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Sep 22 16:12:56 2014 -0400

----------------------------------------------------------------------
 .../apache/blur/command/BaseCommandManager.java | 158 +++++++++++++++++--
 .../blur/command/ControllerCommandManager.java  |   6 +-
 .../blur/command/ShardCommandManager.java       |  10 +-
 .../blur/thrift/ThriftBlurControllerServer.java |  18 ++-
 .../blur/thrift/ThriftBlurShardServer.java      |  12 +-
 .../org/apache/blur/thrift/ThriftServer.java    |  39 +++++
 .../blur/command/ShardCommandManagerTest.java   |  77 ++++++++-
 .../services/org.apache.blur.command.Commands   |  16 ++
 .../apache/blur/command/test1/TestCommand.java  |  37 +++++
 .../org/apache/blur/command/test1/howto.txt     |  19 +++
 .../org/apache/blur/command/test1/test1.jar     | Bin 0 -> 1630 bytes
 .../services/org.apache.blur.command.Commands   |  16 ++
 .../apache/blur/command/test2/TestCommand.java  |  37 +++++
 .../org/apache/blur/command/test2/test2.jar     | Bin 0 -> 1630 bytes
 .../org/apache/blur/utils/BlurConstants.java    |  11 +-
 .../src/main/resources/blur-default.properties  |  18 +++
 16 files changed, 451 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
index 5e40f9a..b36c63a 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -1,15 +1,25 @@
 package org.apache.blur.command;
 
 import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
 import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -22,6 +32,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.blur.concurrent.Executors;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.MapMaker;
 
@@ -54,22 +70,141 @@ public class BaseCommandManager implements Closeable {
   protected final Map<Class<? extends Command>, String> _commandNameLookup =
new ConcurrentHashMap<Class<? extends Command>, String>();
   protected final ConcurrentMap<ExecutionId, Future<Response>> _runningMap;
   protected final long _connectionTimeout;
+  protected final String _tmpPath;
+  protected final String _commandPath;
+  protected final Timer _timer;
+  protected final long _pollingPeriod = TimeUnit.SECONDS.toMillis(15);
+  protected final Map<Path, BigInteger> _commandLastChange = new ConcurrentHashMap<Path,
BigInteger>();
+  protected final Configuration _configuration;
 
-  public BaseCommandManager(int threadCount, long connectionTimeout) throws IOException {
-    lookForCommandsToRegister();
-    _executorService = Executors.newThreadPool("command-", threadCount);
-    _executorServiceDriver = Executors.newThreadPool("command-driver-", threadCount);
+  public BaseCommandManager(String tmpPath, String commandPath, int workerThreadCount, int
driverThreadCount,
+      long connectionTimeout, Configuration configuration) throws IOException {
+    _configuration = configuration;
+    lookForCommandsToRegisterInClassPath();
+    _tmpPath = tmpPath;
+    _commandPath = commandPath;
+    _executorService = Executors.newThreadPool("command-worker-", workerThreadCount);
+    _executorServiceDriver = Executors.newThreadPool("command-driver-", driverThreadCount);
     _connectionTimeout = connectionTimeout / 2;
     _runningMap = new MapMaker().weakKeys().makeMap();
+    if (_tmpPath == null || _commandPath == null) {
+      _timer = null;
+      LOG.info("Tmp Path [{0}] or Command Path [{1}] is null so the automatic command reload will be disabled.",
+          _tmpPath, _commandPath);
+    } else {
+      loadNewCommandsFromCommandPath();
+      _timer = new Timer("Command-Loader", true);
+      _timer.schedule(getNewCommandTimerTask(), _pollingPeriod, _pollingPeriod);
+    }
   }
 
-  @SuppressWarnings("unchecked")
-  private void lookForCommandsToRegister() throws IOException {
+  protected TimerTask getNewCommandTimerTask() {
+    return new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          loadNewCommandsFromCommandPath();
+        } catch (Throwable t) {
+          LOG.error("Unknown error while trying to load new commands.", t);
+        }
+      }
+    };
+  }
+
+  public void commandRefresh() throws IOException {
+    loadNewCommandsFromCommandPath();
+  }
+
+  protected synchronized void loadNewCommandsFromCommandPath() throws IOException {
+    Path path = new Path(_commandPath);
+    FileSystem fileSystem = path.getFileSystem(_configuration);
+    FileStatus[] listStatus = fileSystem.listStatus(path);
+    for (FileStatus fileStatus : listStatus) {
+      BigInteger contentsCheck = checkContents(fileStatus, fileSystem);
+      Path entryPath = fileStatus.getPath();
+      BigInteger currentValue = _commandLastChange.get(entryPath);
+      if (!contentsCheck.equals(currentValue)) {
+        loadNewCommand(fileSystem, fileStatus, contentsCheck);
+        _commandLastChange.put(entryPath, contentsCheck);
+      }
+    }
+  }
+
+  protected void loadNewCommand(FileSystem fileSystem, FileStatus fileStatus, BigInteger
hashOfContents)
+      throws IOException {
+    File file = new File(_tmpPath, UUID.randomUUID().toString());
+    if (!file.mkdirs()) {
+      LOG.error("Error while trying to create a tmp directory for loading a new command set from [{0}].",
+          fileStatus.getPath());
+      return;
+    }
+    LOG.info("Copying new command with hash [{2}] set from [{0}] into [{1}].", fileStatus.getPath(),
+        file.getAbsolutePath(), hashOfContents.toString(Character.MAX_RADIX));
+    copyLocal(fileSystem, fileStatus.getPath(), file);
+    URLClassLoader loader = new URLClassLoader(getUrls(file).toArray(new URL[] {}));
+    Enumeration<URL> resources = loader.getResources(META_INF_SERVICES_ORG_APACHE_BLUR_COMMAND_COMMANDS);
+    loadCommandClasses(resources, loader);
+  }
+
+  protected List<URL> getUrls(File file) throws MalformedURLException {
+    List<URL> urls = new ArrayList<URL>();
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        urls.addAll(getUrls(f));
+      }
+    } else {
+      URL url = file.toURI().toURL();
+      LOG.info("Adding url [{0}] to be loaded.", url);
+      urls.add(url);
+    }
+    return urls;
+  }
+
+  protected void copyLocal(FileSystem fileSystem, Path path, File destDir) throws IOException
{
+    File file = new File(destDir, path.getName());
+    if (fileSystem.isDirectory(path)) {
+      if (!file.mkdirs()) {
+        LOG.error("Error while trying to create a sub directory [{0}].", file.getAbsolutePath());
+        throw new IOException("Error while trying to create a sub directory [" + file.getAbsolutePath() + "].");
+      }
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fileStatus : listStatus) {
+        copyLocal(fileSystem, fileStatus.getPath(), file);
+      }
+    } else {
+      FileOutputStream output = new FileOutputStream(file);
+      FSDataInputStream inputStream = fileSystem.open(path);
+      IOUtils.copy(inputStream, output);
+      inputStream.close();
+      output.close();
+    }
+  }
+
+  protected BigInteger checkContents(FileStatus fileStatus, FileSystem fileSystem) throws
IOException {
+    if (fileStatus.isDirectory()) {
+      BigInteger count = BigInteger.ZERO;
+      Path path = fileStatus.getPath();
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fs : listStatus) {
+        count = count.add(checkContents(fs, fileSystem));
+      }
+      return count;
+    } else {
+      return BigInteger.valueOf(fileStatus.getModificationTime());
+    }
+  }
+
+  protected void lookForCommandsToRegisterInClassPath() throws IOException {
     Enumeration<URL> systemResources = ClassLoader
         .getSystemResources(META_INF_SERVICES_ORG_APACHE_BLUR_COMMAND_COMMANDS);
+    loadCommandClasses(systemResources, getClass().getClassLoader());
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void loadCommandClasses(Enumeration<URL> enumeration, ClassLoader loader)
throws IOException {
     Properties properties = new Properties();
-    while (systemResources.hasMoreElements()) {
-      URL url = systemResources.nextElement();
+    while (enumeration.hasMoreElements()) {
+      URL url = enumeration.nextElement();
       InputStream inputStream = url.openStream();
       properties.load(inputStream);
       inputStream.close();
@@ -77,8 +212,9 @@ public class BaseCommandManager implements Closeable {
     Set<Object> keySet = properties.keySet();
     for (Object o : keySet) {
       String classNameToRegister = o.toString();
+      LOG.info("Loading class [{0}]", classNameToRegister);
       try {
-        register((Class<? extends Command>) Class.forName(classNameToRegister));
+        register((Class<? extends Command>) loader.loadClass(classNameToRegister));
       } catch (ClassNotFoundException e) {
         throw new IOException(e);
       }
@@ -135,6 +271,10 @@ public class BaseCommandManager implements Closeable {
   public void close() throws IOException {
     _executorService.shutdownNow();
     _executorServiceDriver.shutdownNow();
+    if (_timer != null) {
+      _timer.cancel();
+      _timer.purge();
+    }
   }
 
   public void register(Class<? extends Command> commandClass) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
index 4b13e51..cf99473 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -6,6 +6,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.blur.server.LayoutFactory;
 import org.apache.blur.server.TableContextFactory;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -26,8 +27,9 @@ import org.apache.blur.server.TableContextFactory;
 @SuppressWarnings("unchecked")
 public class ControllerCommandManager extends BaseCommandManager {
 
-  public ControllerCommandManager(int threadCount, long connectionTimeout) throws IOException
{
-    super(threadCount, connectionTimeout);
+  public ControllerCommandManager(String tmpPath, String commandPath, int workerThreadCount,
int driverThreadCount,
+      long connectionTimeout, Configuration configuration) throws IOException {
+    super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout,
configuration);
   }
 
   public Response execute(TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
String commandName,

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
index 25a4f10..4298cdf 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ShardCommandManager.java
@@ -40,8 +40,9 @@ public class ShardCommandManager extends BaseCommandManager {
 
   private final IndexServer _indexServer;
 
-  public ShardCommandManager(IndexServer indexServer, int threadCount, long connectionTimeout)
throws IOException {
-    super(threadCount, connectionTimeout);
+  public ShardCommandManager(IndexServer indexServer, String tmpPath, String commandPath,
int workerThreadCount,
+      int driverThreadCount, long connectionTimeout, Configuration configuration) throws
IOException {
+    super(tmpPath, commandPath, workerThreadCount, driverThreadCount, connectionTimeout,
configuration);
     _indexServer = indexServer;
   }
 
@@ -94,7 +95,7 @@ public class ShardCommandManager extends BaseCommandManager {
     for (String table : tables) {
       Set<Shard> shardSet = shardMap.get(table);
       boolean checkShards = !shardSet.isEmpty();
-      
+
       TableContext tableContext = tableContextFactory.getTableContext(table);
       Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
       for (Entry<String, BlurIndex> e : indexes.entrySet()) {
@@ -138,8 +139,6 @@ public class ShardCommandManager extends BaseCommandManager {
     return resultMap;
   }
 
-
-
   private Callable<Object> getCallable(final ShardServerContext shardServerContext,
final TableContext tableContext,
       final Args args, final Shard shard, final BlurIndex blurIndex,
       final IndexReadCombiningCommand<?, ?> readCombiningCommand) {
@@ -231,4 +230,5 @@ public class ShardCommandManager extends BaseCommandManager {
     System.out.println("IMPLEMENT ME!!!!");
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
index 5be19cd..be391c3 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurControllerServer.java
@@ -16,8 +16,11 @@ package org.apache.blur.thrift;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+import static org.apache.blur.utils.BlurConstants.BLUR_COMMAND_LIB_PATH;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_ADDRESS;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_BIND_PORT;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_COMMAND_DRIVER_THREADS;
+import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_COMMAND_WORKER_THREADS;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_HOSTNAME;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_REMOTE_FETCH_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_RETRY_DEFAULT_DELAY;
@@ -38,8 +41,9 @@ import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_THRIFT_SELECTO
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_CONTROLLER_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_HTTP_STATUS_RUNNING_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST;
-import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_MAX_FRAME_SIZE;
 import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_DEFAULT_MAX_FRAME_SIZE;
+import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_MAX_FRAME_SIZE;
+import static org.apache.blur.utils.BlurConstants.BLUR_TMP_PATH;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT_DEFAULT;
@@ -69,6 +73,7 @@ import org.apache.blur.trace.TraceStorage;
 import org.apache.blur.utils.BlurUtil;
 import org.apache.blur.utils.MemoryReporter;
 import org.apache.blur.zookeeper.ZkUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.zookeeper.ZooKeeper;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.jetty.webapp.WebAppContext;
@@ -128,8 +133,15 @@ public class ThriftBlurControllerServer extends ThriftServer {
     int timeout = configuration.getInt(BLUR_CONTROLLER_SHARD_CONNECTION_TIMEOUT, 60000);
     BlurControllerServer.BlurClient client = new BlurControllerServer.BlurClientRemote(timeout);
 
-    final ControllerCommandManager controllerCommandManager = new ControllerCommandManager(16,
-        Connection.DEFAULT_TIMEOUT);
+    String tmpPath = configuration.get(BLUR_TMP_PATH, getDefaultTmpPath(BLUR_TMP_PATH));
+    int numberOfControllerWorkerCommandThreads = configuration.getInt(BLUR_CONTROLLER_COMMAND_WORKER_THREADS,
16);
+    int numberOfControllerDriverCommandThreads = configuration.getInt(BLUR_CONTROLLER_COMMAND_DRIVER_THREADS,
16);
+    String commandPath = configuration.get(BLUR_COMMAND_LIB_PATH, getCommandLibPath());
+
+    Configuration config = new Configuration();
+    final ControllerCommandManager controllerCommandManager = new ControllerCommandManager(tmpPath,
commandPath,
+        numberOfControllerWorkerCommandThreads, numberOfControllerDriverCommandThreads, Connection.DEFAULT_TIMEOUT,
+        config);
 
     final BlurControllerServer controllerServer = new BlurControllerServer();
     controllerServer.setClient(client);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index ad2207a..bf556d7 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -18,6 +18,7 @@ package org.apache.blur.thrift;
  */
 import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER;
 import static org.apache.blur.utils.BlurConstants.BLUR_CLUSTER_NAME;
+import static org.apache.blur.utils.BlurConstants.BLUR_COMMAND_LIB_PATH;
 import static org.apache.blur.utils.BlurConstants.BLUR_CONTROLLER_REMOTE_FETCH_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_GUI_SHARD_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_HTTP_STATUS_RUNNING_PORT;
@@ -31,6 +32,8 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCK_CACHE_TOTAL_SIZE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCK_CACHE_VERSION;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_COMMAND_DRIVER_THREADS;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_COMMAND_WORKER_THREADS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_DEEP_PAGING_CACHE_SIZE;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FETCHCOUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_FILTER_CACHE_CLASS;
@@ -47,6 +50,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_WARMUP_DISABLED;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_WARMUP_THREAD_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_DEFAULT_MAX_FRAME_SIZE;
 import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_MAX_FRAME_SIZE;
+import static org.apache.blur.utils.BlurConstants.BLUR_TMP_PATH;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TIMEOUT_DEFAULT;
@@ -229,7 +233,13 @@ public class ThriftBlurShardServer extends ThriftServer {
         fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount,
         deepPagingCache);
 
-    final ShardCommandManager commandManager = new ShardCommandManager(indexServer, 16, Connection.DEFAULT_TIMEOUT);
+    String tmpPath = configuration.get(BLUR_TMP_PATH, getDefaultTmpPath(BLUR_TMP_PATH));
+    int numberOfShardWorkerCommandThreads = configuration.getInt(BLUR_SHARD_COMMAND_WORKER_THREADS,
16);
+    int numberOfShardDriverCommandThreads = configuration.getInt(BLUR_SHARD_COMMAND_DRIVER_THREADS,
16);
+    String commandPath = configuration.get(BLUR_COMMAND_LIB_PATH, getCommandLibPath());
+
+    final ShardCommandManager commandManager = new ShardCommandManager(indexServer, tmpPath,
commandPath,
+        numberOfShardWorkerCommandThreads, numberOfShardDriverCommandThreads, Connection.DEFAULT_TIMEOUT,
config);
 
     final BlurShardServer shardServer = new BlurShardServer();
     shardServer.setCommandManager(commandManager);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
index 6957e9b..5a5b32e 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftServer.java
@@ -23,9 +23,11 @@ import static org.apache.blur.metrics.MetricsConstants.LOAD_AVERAGE;
 import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 import static org.apache.blur.metrics.MetricsConstants.SYSTEM;
 import static org.apache.blur.utils.BlurConstants.BLUR_HDFS_TRACE_PATH;
+import static org.apache.blur.utils.BlurConstants.BLUR_HOME;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_TRACE_PATH;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -99,6 +101,43 @@ public class ThriftServer {
     _serverTransport = serverTransport;
   }
 
+  public static String getCommandLibPath() {
+    String blurHomeDir = getBlurHomeDir();
+    if (blurHomeDir == null) {
+      return null;
+    }
+    return new File(blurHomeDir, "commands").toURI().toString();
+  }
+
+  public static String getBlurHomeDir() {
+    return System.getenv(BLUR_HOME);
+  }
+
+  public static String getDefaultTmpPath(String propName) throws IOException {
+    String blurHomeDir = getBlurHomeDir();
+    File tmp;
+    if (blurHomeDir == null) {
+      tmp = getTmpDir();
+      LOG.info("Attempting to use default tmp directory [{0}]", tmp);
+    } else {
+      tmp = new File(blurHomeDir, "tmp");
+      LOG.info("Attempting to use configured tmp directory [{0}]", tmp);
+      if (!tmp.mkdirs()) {
+        tmp = getTmpDir();
+        LOG.info("Attempting to use default tmp directory [{0}]", tmp);
+      }
+    }
+    if (!tmp.mkdirs()) {
+      throw new IOException("Cannot create tmp directory [" + tmp.toURI()
+          + "], please create directory or configure property [" + propName + "].");
+    }
+    return tmp.toURI().toString();
+  }
+
+  private static File getTmpDir() {
+    return new File(System.getProperty("java.io.tmpdir"));
+  }
+
   public static TraceStorage setupTraceStorage(BlurConfiguration configuration) throws IOException
{
     String zKpath = configuration.get(BLUR_ZOOKEEPER_TRACE_PATH);
     String hdfsPath = configuration.get(BLUR_HDFS_TRACE_PATH);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
index 943b994..ef28841 100644
--- a/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/command/ShardCommandManagerTest.java
@@ -16,7 +16,12 @@
  */
 package org.apache.blur.command;
 
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +39,8 @@ import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.ShardState;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.utils.BlurUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.lucene.analysis.core.KeywordAnalyzer;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -59,12 +66,15 @@ public class ShardCommandManagerTest {
       throw new RuntimeException(e);
     }
   }
-
+  private String _tmpPath = "./target/tmp/ShardCommandManagerTest/tmp";
+  private String _commandPath = "./target/tmp/ShardCommandManagerTest/command";
   private ShardCommandManager _manager;
+  private Configuration _config;
 
   @Before
   public void setup() throws IOException {
-    _manager = new ShardCommandManager(getIndexServer(), 10, 1000);
+    _config = new Configuration();
+    _manager = new ShardCommandManager(getIndexServer(), null, null, 10, 10, 1000, _config);
   }
 
   @After
@@ -73,6 +83,69 @@ public class ShardCommandManagerTest {
   }
 
   @Test
+  public void testNewCommandLoading() throws IOException, TimeoutException, InterruptedException
{
+    _manager.close();
+    new File(_tmpPath).mkdirs();
+    File commandPath = new File(_commandPath);
+    rmr(commandPath);
+    commandPath.mkdirs();
+    {
+      InputStream inputStream = getClass().getResourceAsStream("/org/apache/blur/command/test1/test1.jar");
+      File dest = new File(commandPath, "test.jar");
+      FileOutputStream output = new FileOutputStream(dest);
+      IOUtils.copy(inputStream, output);
+      inputStream.close();
+      output.close();
+    }
+    ShardCommandManager manager = new ShardCommandManager(getIndexServer(), _tmpPath, _commandPath,
10, 10, 1000,
+        _config);
+    {
+      Args args = new Args();
+      args.set("table", "test");
+      Response response = manager.execute(getTableContextFactory(), "test", args);
+      Map<Shard, Object> shardResults = response.getShardResults();
+      for (Object o : shardResults.values()) {
+        assertEquals("test1", o);
+      }
+    }
+
+    {
+      InputStream inputStream = getClass().getResourceAsStream("/org/apache/blur/command/test2/test2.jar");
+      File dest = new File(commandPath, "test.jar");
+      FileOutputStream output = new FileOutputStream(dest);
+      IOUtils.copy(inputStream, output);
+      inputStream.close();
+      output.close();
+    }
+    manager.commandRefresh();
+
+    {
+      Args args = new Args();
+      args.set("table", "test");
+      Response response = manager.execute(getTableContextFactory(), "test", args);
+      Map<Shard, Object> shardResults = response.getShardResults();
+      for (Object o : shardResults.values()) {
+        assertEquals("test2", o);
+      }
+    }
+
+    // For closing.
+    _manager = manager;
+  }
+
+  private void rmr(File file) {
+    if (!file.exists()) {
+      return;
+    }
+    if (file.isDirectory()) {
+      for (File f : file.listFiles()) {
+        rmr(f);
+      }
+    }
+    file.delete();
+  }
+
+  @Test
   public void testShardCommandManagerNormalWait() throws IOException, TimeoutException {
     Response response;
     ExecutionId executionId = null;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/resources/org/apache/blur/command/test1/META-INF/services/org.apache.blur.command.Commands
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/org/apache/blur/command/test1/META-INF/services/org.apache.blur.command.Commands
b/blur-core/src/test/resources/org/apache/blur/command/test1/META-INF/services/org.apache.blur.command.Commands
new file mode 100644
index 0000000..584f55b
--- /dev/null
+++ b/blur-core/src/test/resources/org/apache/blur/command/test1/META-INF/services/org.apache.blur.command.Commands
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+TestCommand
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/resources/org/apache/blur/command/test1/TestCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/org/apache/blur/command/test1/TestCommand.java b/blur-core/src/test/resources/org/apache/blur/command/test1/TestCommand.java
new file mode 100644
index 0000000..c639eb5
--- /dev/null
+++ b/blur-core/src/test/resources/org/apache/blur/command/test1/TestCommand.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexReadCommand;
+
+@SuppressWarnings("serial")
+public class TestCommand extends Command implements IndexReadCommand<String> {
+
+  @Override
+  public String getName() {
+    return "test";
+  }
+
+  @Override
+  public String execute(IndexContext context) throws IOException {
+    return "test1";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/resources/org/apache/blur/command/test1/howto.txt
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/org/apache/blur/command/test1/howto.txt b/blur-core/src/test/resources/org/apache/blur/command/test1/howto.txt
new file mode 100644
index 0000000..35aff76
--- /dev/null
+++ b/blur-core/src/test/resources/org/apache/blur/command/test1/howto.txt
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# To rebuild test1.jar, run the following commands from this directory.
+javac -cp ../../../../../../../../target/blur-core-*.jar TestCommand.java 
+jar -cf test1.jar META-INF/ TestCommand.class

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/resources/org/apache/blur/command/test1/test1.jar
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/org/apache/blur/command/test1/test1.jar b/blur-core/src/test/resources/org/apache/blur/command/test1/test1.jar
new file mode 100644
index 0000000..a06757b
Binary files /dev/null and b/blur-core/src/test/resources/org/apache/blur/command/test1/test1.jar
differ

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/resources/org/apache/blur/command/test2/META-INF/services/org.apache.blur.command.Commands
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/org/apache/blur/command/test2/META-INF/services/org.apache.blur.command.Commands
b/blur-core/src/test/resources/org/apache/blur/command/test2/META-INF/services/org.apache.blur.command.Commands
new file mode 100644
index 0000000..584f55b
--- /dev/null
+++ b/blur-core/src/test/resources/org/apache/blur/command/test2/META-INF/services/org.apache.blur.command.Commands
@@ -0,0 +1,16 @@
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+TestCommand
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/resources/org/apache/blur/command/test2/TestCommand.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/org/apache/blur/command/test2/TestCommand.java b/blur-core/src/test/resources/org/apache/blur/command/test2/TestCommand.java
new file mode 100644
index 0000000..6baaf96
--- /dev/null
+++ b/blur-core/src/test/resources/org/apache/blur/command/test2/TestCommand.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.blur.command.Command;
+import org.apache.blur.command.IndexContext;
+import org.apache.blur.command.IndexReadCommand;
+
+@SuppressWarnings("serial")
+public class TestCommand extends Command implements IndexReadCommand<String> {
+
+  @Override
+  public String getName() {
+    return "test";
+  }
+
+  @Override
+  public String execute(IndexContext context) throws IOException {
+    return "test2";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-core/src/test/resources/org/apache/blur/command/test2/test2.jar
----------------------------------------------------------------------
diff --git a/blur-core/src/test/resources/org/apache/blur/command/test2/test2.jar b/blur-core/src/test/resources/org/apache/blur/command/test2/test2.jar
new file mode 100644
index 0000000..9a8e3f4
Binary files /dev/null and b/blur-core/src/test/resources/org/apache/blur/command/test2/test2.jar
differ

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 66fbe8d..97c5cda 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -146,7 +146,7 @@ public class BlurConstants {
 
   public static final String BLUR_THRIFT_MAX_FRAME_SIZE = "blur.thrift.max.frame.size";
   public static final int BLUR_THRIFT_DEFAULT_MAX_FRAME_SIZE = 16384000;
-  
+
   public static final String BLUR_SHARD_FILTERED_SERVER_CLASS = "blur.shard.filtered.server.class";
   public static final String BLUR_CONTROLLER_FILTERED_SERVER_CLASS = "blur.controller.filtered.server.class";
 
@@ -158,6 +158,15 @@ public class BlurConstants {
   public static final String BLUR_CLUSTER;
   public static final String BLUR_HTTP_STATUS_RUNNING_PORT = "blur.http.status.running.port";
 
+  public static final String BLUR_SHARD_COMMAND_DRIVER_THREADS = "blur.shard.command.driver.threads";
+  public static final String BLUR_SHARD_COMMAND_WORKER_THREADS = "blur.shard.command.worker.threads";
+  public static final String BLUR_CONTROLLER_COMMAND_DRIVER_THREADS = "blur.controller.command.driver.threads";
+  public static final String BLUR_CONTROLLER_COMMAND_WORKER_THREADS = "blur.controller.command.worker.threads";
+  public static final String BLUR_COMMAND_LIB_PATH = "blur.command.lib.path";
+  public static final String BLUR_TMP_PATH = "blur.tmp.path";
+
+  public static final String BLUR_HOME = "BLUR_HOME";
+
   public static final long ZK_WAIT_TIME = TimeUnit.SECONDS.toMillis(5);
   public static final String DELETE_MARKER_VALUE = "delete";
   public static final String DELETE_MARKER = "_deletemarker_";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/21ff0834/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index afaabf5..008649e 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -34,6 +34,12 @@ blur.metrics.reporters=
 # Thrift max frame size
 blur.thrift.max.frame.size=16384000
 
+# The command lib path where the controller and shard server processes will poll for new commands to enable.
+# blur.command.lib.path=
+
+# The temporary path used by the controller and shard server processes (presumably for staging command libraries locally; confirm against BaseCommandManager).
+# blur.tmp.path=
+
 ### Shard Server Configuration
 
 # The hostname for the shard, if blank the hostname is automatically detected
@@ -45,6 +51,12 @@ blur.shard.bind.address=0.0.0.0
 # The default binding port of the shard server, 0 for random
 blur.shard.bind.port=40020
 
+# The number of command driver threads.
+blur.shard.command.driver.threads=16
+
+# The number of command worker threads.
+blur.shard.command.worker.threads=16
+
 # The number of fetcher threads
 blur.shard.data.fetch.thread.count=8
 
@@ -222,6 +234,12 @@ blur.controller.bind.port=40010
 # The connection timeout, NOTE: this will be the maximum amount of time you can wait for a query.
 blur.controller.shard.connection.timeout=60000
 
+# The number of command driver threads.
+blur.controller.command.driver.threads=16
+
+# The number of command worker threads.
+blur.controller.command.worker.threads=16
+
 # The number of threads used for thrift requests
 blur.controller.server.thrift.thread.count=32
 


Mime
View raw message