fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mwa...@apache.org
Subject incubator-fluo git commit: Closes #647 - Add Oracle and Worker to API
Date Tue, 21 Jun 2016 16:16:07 GMT
Repository: incubator-fluo
Updated Branches:
  refs/heads/master cb2091bfb -> 2b2d784fc


Closes #647 - Add Oracle and Worker to API


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/2b2d784f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/2b2d784f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/2b2d784f

Branch: refs/heads/master
Commit: 2b2d784fc4e5f96af8346d779274e2b1a3246e0c
Parents: cb2091b
Author: Mike Walch <mwalch@gmail.com>
Authored: Tue Jun 21 11:37:21 2016 -0400
Committer: Mike Walch <mwalch@gmail.com>
Committed: Tue Jun 21 11:37:21 2016 -0400

----------------------------------------------------------------------
 README.md                                       |   4 +-
 .../org/apache/fluo/api/client/FluoFactory.java |  18 +++
 .../fluo/api/config/FluoConfiguration.java      |  22 +++
 .../org/apache/fluo/api/service/FluoOracle.java |  23 +++
 .../apache/fluo/api/service/FluoService.java    |  38 +++++
 .../org/apache/fluo/api/service/FluoWorker.java |  23 +++
 .../fluo/api/config/FluoConfigurationTest.java  |   6 +-
 .../fluo/cluster/main/FluoOracleMain.java       | 136 ------------------
 .../fluo/cluster/main/FluoWorkerMain.java       | 140 -------------------
 .../apache/fluo/cluster/main/MainOptions.java   |  61 --------
 .../apache/fluo/cluster/main/MiniFluoMain.java  |  66 ---------
 .../fluo/cluster/runnable/OracleRunnable.java   |  97 +++++++++++++
 .../fluo/cluster/runnable/WorkerRunnable.java   | 111 +++++++++++++++
 .../fluo/cluster/runner/YarnAppRunner.java      |  16 +--
 .../apache/fluo/cluster/util/ClusterUtil.java   |  32 -----
 .../apache/fluo/cluster/yarn/FluoTwillApp.java  |  31 ++--
 .../apache/fluo/core/oracle/FluoOracleImpl.java | 133 ++++++++++++++++++
 .../org/apache/fluo/core/util/CuratorUtil.java  |  38 +++++
 .../apache/fluo/core/worker/FluoWorkerImpl.java | 130 +++++++++++++++++
 .../distribution/src/main/scripts/local-fluo    |  12 +-
 modules/distribution/src/main/scripts/mini-fluo |   4 +-
 .../java/org/apache/fluo/mini/MiniFluoImpl.java |  39 ++++++
 22 files changed, 712 insertions(+), 468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 5455c02..0e89697 100644
--- a/README.md
+++ b/README.md
@@ -59,8 +59,8 @@ Below are helpful resources for Fluo application developers:
 [Metrics]: docs/metrics.md
 [Contributing]: docs/contributing.md
 [Architecture]: docs/architecture.md
-[ti]: https://travis-ci.org/apache/fluo.svg?branch=master
-[tl]: https://travis-ci.org/apache/fluo
+[ti]: https://travis-ci.org/apache/incubator-fluo.svg?branch=master
+[tl]: https://travis-ci.org/apache/incubator-fluo
 [li]: http://img.shields.io/badge/license-ASL-blue.svg
 [ll]: https://github.com/apache/incubator-fluo/blob/master/LICENSE
 [mi]: https://maven-badges.herokuapp.com/maven-central/org.apache.fluo/fluo-api/badge.svg

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
index fa29955..1b684e8 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
@@ -21,6 +21,8 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.api.mini.MiniFluo;
+import org.apache.fluo.api.service.FluoOracle;
+import org.apache.fluo.api.service.FluoWorker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +74,22 @@ public class FluoFactory {
     return buildClassWithConfig(config.getMiniClass(), config);
   }
 
+  /**
+   * Creates a {@link FluoOracle} using the provided configuration.
+   */
+  public static FluoOracle newOracle(Configuration configuration) {
+    FluoConfiguration config = new FluoConfiguration(configuration);
+    return buildClassWithConfig(config.getOracleClass(), config);
+  }
+
+  /**
+   * Creates a {@link FluoWorker} using the provided configuration.
+   */
+  public static FluoWorker newWorker(Configuration configuration) {
+    FluoConfiguration config = new FluoConfiguration(configuration);
+    return buildClassWithConfig(config.getWorkerClass(), config);
+  }
+
   @SuppressWarnings("unchecked")
   private static <T> T buildClassWithConfig(String clazz, Configuration config) {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 2ca45af..6a3b74a 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -72,6 +72,8 @@ public class FluoConfiguration extends CompositeConfiguration {
 
   // Worker
   private static final String WORKER_PREFIX = FLUO_PREFIX + ".worker";
+  public static final String WORKER_CLASS_PROP = WORKER_PREFIX + ".class";
+  public static final String WORKER_CLASS_DEFAULT = FLUO_PREFIX + ".core.worker.FluoWorkerImpl";
   public static final String WORKER_NUM_THREADS_PROP = WORKER_PREFIX + ".num.threads";
   public static final String WORKER_INSTANCES_PROP = WORKER_PREFIX + ".instances";
   public static final String WORKER_MAX_MEMORY_MB_PROP = WORKER_PREFIX + ".max.memory.mb";
@@ -90,6 +92,8 @@ public class FluoConfiguration extends CompositeConfiguration {
 
   // Oracle
   private static final String ORACLE_PREFIX = FLUO_PREFIX + ".oracle";
+  public static final String ORACLE_CLASS_PROP = ORACLE_PREFIX + ".class";
+  public static final String ORACLE_CLASS_DEFAULT = FLUO_PREFIX + ".core.oracle.FluoOracleImpl";
   public static final String ORACLE_INSTANCES_PROP = ORACLE_PREFIX + ".instances";
   public static final String ORACLE_MAX_MEMORY_MB_PROP = ORACLE_PREFIX + ".max.memory.mb";
   public static final String ORACLE_NUM_CORES_PROP = ORACLE_PREFIX + ".num.cores";
@@ -175,10 +179,12 @@ public class FluoConfiguration extends CompositeConfiguration {
     getLoaderQueueSize();
     getLoaderThreads();
     getObserverConfig();
+    getOracleClass();
     getOracleInstances();
     getOracleMaxMemory();
     getOracleNumCores();
     getTransactionRollbackTime();
+    getWorkerClass();
     getWorkerInstances();
     getWorkerMaxMemory();
     getWorkerNumCores();
@@ -337,6 +343,14 @@ public class FluoConfiguration extends CompositeConfiguration {
     return getNonEmptyString(ADMIN_CLASS_PROP, ADMIN_CLASS_DEFAULT);
   }
 
+  public FluoConfiguration setWorkerClass(String workerClass) {
+    return setNonEmptyString(WORKER_CLASS_PROP, workerClass);
+  }
+
+  public String getWorkerClass() {
+    return getNonEmptyString(WORKER_CLASS_PROP, WORKER_CLASS_DEFAULT);
+  }
+
   public FluoConfiguration setWorkerThreads(int numThreads) {
     return setPositiveInt(WORKER_NUM_THREADS_PROP, numThreads);
   }
@@ -502,6 +516,14 @@ public class FluoConfiguration extends CompositeConfiguration {
     return getNonNegativeInt(LOADER_QUEUE_SIZE_PROP, LOADER_QUEUE_SIZE_DEFAULT);
   }
 
+  public FluoConfiguration setOracleClass(String oracleClass) {
+    return setNonEmptyString(ORACLE_CLASS_PROP, oracleClass);
+  }
+
+  public String getOracleClass() {
+    return getNonEmptyString(ORACLE_CLASS_PROP, ORACLE_CLASS_DEFAULT);
+  }
+
   public FluoConfiguration setOracleMaxMemory(int oracleMaxMemory) {
     return setPositiveInt(ORACLE_MAX_MEMORY_MB_PROP, oracleMaxMemory);
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/api/src/main/java/org/apache/fluo/api/service/FluoOracle.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/service/FluoOracle.java b/modules/api/src/main/java/org/apache/fluo/api/service/FluoOracle.java
new file mode 100644
index 0000000..e042151
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/service/FluoOracle.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.service;
+
+/**
+ * Fluo Oracle service
+ */
+public interface FluoOracle extends FluoService {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/api/src/main/java/org/apache/fluo/api/service/FluoService.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/service/FluoService.java b/modules/api/src/main/java/org/apache/fluo/api/service/FluoService.java
new file mode 100644
index 0000000..5be879b
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/service/FluoService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.service;
+
+/**
+ * Shared interface for Fluo services
+ */
+public interface FluoService {
+
+  /**
+   * Starts the service. Waits until the service has started before returning.
+   * 
+   * @throws org.apache.fluo.api.exceptions.FluoException if service fails to start
+   */
+  void start();
+
+  /**
+   * Stops the service cleanly. Waits until the service has stopped before returning.
+   * 
+   * @throws org.apache.fluo.api.exceptions.FluoException if the service has failed or a failure
+   *         occurs while stopping.
+   */
+  void stop();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/api/src/main/java/org/apache/fluo/api/service/FluoWorker.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/service/FluoWorker.java b/modules/api/src/main/java/org/apache/fluo/api/service/FluoWorker.java
new file mode 100644
index 0000000..e760c6f
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/service/FluoWorker.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.service;
+
+/**
+ * Fluo worker service
+ */
+public interface FluoWorker extends FluoService {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 4c97673..f475311 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -50,6 +50,7 @@ public class FluoConfigurationTest {
     Assert.assertEquals(FluoConfiguration.ADMIN_CLASS_DEFAULT, base.getAdminClass());
     Assert.assertEquals(FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT,
         base.getAccumuloClasspath());
+    Assert.assertEquals(FluoConfiguration.WORKER_CLASS_DEFAULT, base.getWorkerClass());
     Assert.assertEquals(FluoConfiguration.WORKER_NUM_THREADS_DEFAULT, base.getWorkerThreads());
     Assert.assertEquals(FluoConfiguration.WORKER_INSTANCES_DEFAULT, base.getWorkerInstances());
     Assert.assertEquals(FluoConfiguration.WORKER_MAX_MEMORY_MB_DEFAULT, base.getWorkerMaxMemory());
@@ -58,6 +59,7 @@ public class FluoConfigurationTest {
         base.getTransactionRollbackTime());
     Assert.assertEquals(FluoConfiguration.LOADER_NUM_THREADS_DEFAULT, base.getLoaderThreads());
     Assert.assertEquals(FluoConfiguration.LOADER_QUEUE_SIZE_DEFAULT, base.getLoaderQueueSize());
+    Assert.assertEquals(FluoConfiguration.ORACLE_CLASS_DEFAULT, base.getOracleClass());
     Assert.assertEquals(FluoConfiguration.ORACLE_INSTANCES_DEFAULT, base.getOracleInstances());
     Assert.assertEquals(FluoConfiguration.ORACLE_MAX_MEMORY_MB_DEFAULT, base.getOracleMaxMemory());
     Assert.assertEquals(FluoConfiguration.ORACLE_NUM_CORES_DEFAULT, base.getOracleNumCores());
@@ -102,8 +104,10 @@ public class FluoConfigurationTest {
     Assert.assertEquals(7, config.setLoaderThreads(7).getLoaderThreads());
     Assert.assertEquals(0, config.setLoaderThreads(0).getLoaderThreads());
     Assert.assertEquals("mini", config.setMiniClass("mini").getMiniClass());
+    Assert.assertEquals("oracle", config.setOracleClass("oracle").getOracleClass());
     Assert.assertEquals(8, config.setOracleMaxMemory(8).getOracleMaxMemory());
     Assert.assertEquals(10, config.setOracleInstances(10).getOracleInstances());
+    Assert.assertEquals("worker", config.setWorkerClass("worker").getWorkerClass());
     Assert.assertEquals(11, config.setWorkerInstances(11).getWorkerInstances());
     Assert.assertEquals(12, config.setWorkerMaxMemory(12).getWorkerMaxMemory());
     Assert.assertEquals(13, config.setWorkerThreads(13).getWorkerThreads());
@@ -376,7 +380,7 @@ public class FluoConfigurationTest {
     String[] nonEmptyMethods =
         {"setAccumuloInstance", "setAccumuloTable", "setAccumuloUser", "setAccumuloZookeepers",
             "setAdminClass", "setClientClass", "setMiniClass", "setMiniDataDir",
-            "setInstanceZookeepers"};
+            "setInstanceZookeepers", "setWorkerClass", "setOracleClass"};
     for (String methodName : nonEmptyMethods) {
       try {
         config.getClass().getMethod(methodName, String.class).invoke(config, "");

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoOracleMain.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoOracleMain.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoOracleMain.java
deleted file mode 100644
index 35f7eaf..0000000
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoOracleMain.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.main;
-
-import java.io.File;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.beust.jcommander.JCommander;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.cluster.util.ClusterUtil;
-import org.apache.fluo.cluster.util.LogbackUtil;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.metrics.MetricNames;
-import org.apache.fluo.core.metrics.ReporterUtil;
-import org.apache.fluo.core.oracle.OracleServer;
-import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.fluo.cluster.main.MainOptions.STDOUT;
-
-/**
- * Main method of Fluo oracle that can be called within a Twill/YARN application or on its own as a
- * Java application
- */
-public class FluoOracleMain extends AbstractTwillRunnable {
-
-  private static final Logger log = LoggerFactory.getLogger(FluoOracleMain.class);
-  public static String ORACLE_NAME = "FluoOracle";
-  private AtomicBoolean shutdown = new AtomicBoolean(false);
-
-  @Override
-  public void run() {
-    System.out.println("Starting Oracle");
-    String logDir = System.getenv("LOG_DIRS");
-    if (logDir == null) {
-      System.err
-          .println("LOG_DIRS env variable was not set by Twill.  Logging to console instead!");
-      logDir = STDOUT;
-    }
-    run(new String[] {"-config-dir", "./conf", "-log-output", logDir});
-  }
-
-  public void run(String[] args) {
-    MainOptions options = new MainOptions();
-    try {
-      JCommander jcommand = new JCommander(options, args);
-
-      if (options.help) {
-        jcommand.usage();
-        System.exit(-1);
-      }
-      options.validateConfig();
-
-      if (!options.getLogOutput().equals(STDOUT)) {
-        LogbackUtil.init("oracle", options.getConfigDir(), options.getLogOutput());
-      }
-    } catch (Exception e) {
-      System.err.println("Exception while starting FluoOracle: " + e.getMessage());
-      e.printStackTrace();
-      System.exit(-1);
-    }
-
-    try {
-      FluoConfiguration config = new FluoConfiguration(new File(options.getFluoProps()));
-      if (!config.hasRequiredOracleProps()) {
-        log.error("fluo.properties is missing required properties for oracle");
-        System.exit(-1);
-      }
-      // any client in oracle should retry forever
-      config.setClientRetryTimeout(-1);
-
-      try {
-        config.validate();
-      } catch (Exception e) {
-        System.err.println("Error - Invalid fluo.properties due to " + e.getMessage());
-        e.printStackTrace();
-        System.exit(-1);
-      }
-
-      TwillContext context = getContext();
-      if (context != null && System.getProperty(MetricNames.METRICS_ID_PROP) == null) {
-        System.setProperty(MetricNames.METRICS_ID_PROP, "oracle-" + context.getInstanceId());
-      }
-
-      try (Environment env = new Environment(config);
-          AutoCloseable reporters = ReporterUtil.setupReporters(env);
-          NodeCache appIdCache = ClusterUtil.startAppIdWatcher(env)) {
-        log.info("Starting Oracle for Fluo '{}' application with the following configuration:",
-            config.getApplicationName());
-        env.getConfiguration().print();
-
-        OracleServer server = new OracleServer(env);
-        server.start();
-
-        while (!shutdown.get()) {
-          UtilWaitThread.sleep(10000);
-        }
-
-        server.stop();
-      }
-
-    } catch (Exception e) {
-      log.error("Exception running FluoOracle: ", e);
-    }
-
-    log.info("FluoOracle is exiting.");
-  }
-
-  @Override
-  public void stop() {
-    log.info("Stopping Fluo oracle");
-    shutdown.set(true);
-  }
-
-  public static void main(String[] args) {
-    FluoOracleMain oracle = new FluoOracleMain();
-    oracle.run(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoWorkerMain.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoWorkerMain.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoWorkerMain.java
deleted file mode 100644
index cfab15d..0000000
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/FluoWorkerMain.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.main;
-
-import java.io.File;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.beust.jcommander.JCommander;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.cluster.util.ClusterUtil;
-import org.apache.fluo.cluster.util.LogbackUtil;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.metrics.MetricNames;
-import org.apache.fluo.core.metrics.ReporterUtil;
-import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.NotificationFinderFactory;
-import org.apache.fluo.core.worker.NotificationProcessor;
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.fluo.cluster.main.MainOptions.STDOUT;
-
-/**
- * Main run method of Fluo worker that can be called within a Twill/YARN application or on its own
- * as a Java application
- */
-public class FluoWorkerMain extends AbstractTwillRunnable {
-
-  private static final Logger log = LoggerFactory.getLogger(FluoWorkerMain.class);
-  public static String WORKER_NAME = "FluoWorker";
-  private AtomicBoolean shutdown = new AtomicBoolean(false);
-
-  @Override
-  public void run() {
-    System.out.println("Starting Worker");
-    String logDir = System.getenv("LOG_DIRS");
-    if (logDir == null) {
-      System.err
-          .println("LOG_DIRS env variable was not set by Twill.  Logging to console instead!");
-      logDir = STDOUT;
-    }
-    run(new String[] {"-config-dir", "./conf", "-log-output", logDir});
-  }
-
-  public void run(String[] args) {
-    MainOptions options = new MainOptions();
-    try {
-      JCommander jcommand = new JCommander(options, args);
-
-      if (options.help) {
-        jcommand.usage();
-        System.exit(-1);
-      }
-      options.validateConfig();
-
-      if (!options.getLogOutput().equals(STDOUT)) {
-        LogbackUtil.init("worker", options.getConfigDir(), options.getLogOutput());
-      }
-    } catch (Exception e) {
-      System.err.println("Exception while starting FluoWorker: " + e.getMessage());
-      e.printStackTrace();
-      System.exit(-1);
-    }
-
-    try {
-      FluoConfiguration config = new FluoConfiguration(new File(options.getFluoProps()));
-      if (!config.hasRequiredWorkerProps()) {
-        log.error("fluo.properties is missing required properties for worker");
-        System.exit(-1);
-      }
-      // any client in worker should retry forever
-      config.setClientRetryTimeout(-1);
-
-      try {
-        config.validate();
-      } catch (Exception e) {
-        System.err.println("Error - Invalid fluo.properties due to " + e.getMessage());
-        e.printStackTrace();
-        System.exit(-1);
-      }
-
-      TwillContext context = getContext();
-      if (context != null && System.getProperty(MetricNames.METRICS_ID_PROP) == null) {
-        System.setProperty(MetricNames.METRICS_ID_PROP, "worker-" + context.getInstanceId());
-      }
-
-      try (Environment env = new Environment(config);
-          AutoCloseable reporters = ReporterUtil.setupReporters(env);
-          NodeCache appIdCache = ClusterUtil.startAppIdWatcher(env)) {
-        log.info("Starting Worker for Fluo '{}' application with the following configuration:",
-            config.getApplicationName());
-        env.getConfiguration().print();
-
-        NotificationProcessor np = new NotificationProcessor(env);
-        NotificationFinder notificationFinder =
-            NotificationFinderFactory.newNotificationFinder(env.getConfiguration());
-        notificationFinder.init(env, np);
-        notificationFinder.start();
-
-        while (!shutdown.get()) {
-          UtilWaitThread.sleep(1000);
-        }
-
-        notificationFinder.stop();
-      }
-    } catch (Exception e) {
-      log.error("Exception running FluoWorker: ", e);
-    }
-
-    log.info("Worker is exiting.");
-  }
-
-  @Override
-  public void stop() {
-    log.info("Stopping Fluo worker");
-    shutdown.set(true);
-  }
-
-  public static void main(String[] args) throws Exception {
-    FluoWorkerMain worker = new FluoWorkerMain();
-    worker.run(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MainOptions.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MainOptions.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MainOptions.java
deleted file mode 100644
index 2af6359..0000000
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MainOptions.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.main;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import com.beust.jcommander.Parameter;
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.fluo.api.config.FluoConfiguration;
-
-public class MainOptions {
-
-  public static final String STDOUT = "STDOUT";
-
-  @Parameter(names = "-config-dir", description = "Location of Fluo configuration directory")
-  private String configDir;
-
-  @Parameter(names = "-log-output",
-      description = "Location to output logging.  Set to directory or STDOUT (which is default)")
-  private String logOutput = STDOUT;
-
-  @Parameter(names = {"-h", "-help", "--help"}, help = true, description = "Prints help")
-  public boolean help;
-
-  public String getConfigDir() {
-    return configDir;
-  }
-
-  public String getFluoProps() {
-    return configDir + "/fluo.properties";
-  }
-
-  public String getLogOutput() {
-    return logOutput;
-  }
-
-  public void validateConfig() throws IOException {
-    if (getConfigDir() == null) {
-      System.err.println("Please set -config-dir option to directory containing fluo.properties "
-          + "file like below:\n");
-      Properties defaults =
-          ConfigurationConverter.getProperties(FluoConfiguration.getDefaultConfiguration());
-      defaults.store(System.err, "Fluo properties");
-      System.exit(-1);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MiniFluoMain.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MiniFluoMain.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MiniFluoMain.java
deleted file mode 100644
index a06174d..0000000
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/main/MiniFluoMain.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.main;
-
-import java.io.File;
-
-import com.beust.jcommander.JCommander;
-import org.apache.fluo.api.client.FluoFactory;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.mini.MiniFluo;
-import org.apache.fluo.core.util.UtilWaitThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Main method for MiniFluo
- */
-public class MiniFluoMain {
-
-  private static final Logger log = LoggerFactory.getLogger(MiniFluoMain.class);
-
-  public static void main(String[] args) {
-
-    try {
-      MainOptions options = new MainOptions();
-      JCommander jcommand = new JCommander(options, args);
-
-      if (options.help) {
-        jcommand.usage();
-        System.exit(-1);
-      }
-      options.validateConfig();
-
-      FluoConfiguration config = new FluoConfiguration(new File(options.getFluoProps()));
-      if (!config.hasRequiredMiniFluoProps()) {
-        log.error("Failed to start MiniFluo - fluo.properties is missing required properties for "
-            + "MiniFluo");
-        System.exit(-1);
-      }
-      try (MiniFluo mini = FluoFactory.newMiniFluo(config)) {
-        log.info("MiniFluo is running");
-
-        while (true) {
-          UtilWaitThread.sleep(1000);
-        }
-      }
-    } catch (Exception e) {
-      log.error("Exception running MiniFluo: ", e);
-    }
-
-    log.info("MiniFluo is exiting.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
new file mode 100644
index 0000000..e4f2cea
--- /dev/null
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.cluster.runnable;
+
+import java.io.File;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.service.FluoOracle;
+import org.apache.fluo.cluster.util.LogbackUtil;
+import org.apache.fluo.core.metrics.MetricNames;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Run method of Fluo oracle that is called within a Twill/YARN application
+ */
+public class OracleRunnable extends AbstractTwillRunnable {
+
+  private static final Logger log = LoggerFactory.getLogger(OracleRunnable.class);
+  public static String ORACLE_NAME = "FluoOracle";
+  private AtomicBoolean shutdown = new AtomicBoolean(false);
+  private static final String STDOUT = "STDOUT";
+
+  @Override
+  public void run() {
+    System.out.println("Starting Oracle");
+    String configDir = "./conf";
+    String propsPath = configDir + "/fluo.properties";
+    Objects.requireNonNull(propsPath);
+    File propsFile = new File(propsPath);
+    if (!propsFile.exists()) {
+      System.err.println("ERROR - Fluo properties file does not exist: " + propsPath);
+      System.exit(-1);
+    }
+    String logDir = System.getenv("LOG_DIRS");
+    if (logDir == null) {
+      System.err
+          .println("LOG_DIRS env variable was not set by Twill.  Logging to console instead!");
+      logDir = STDOUT;
+    }
+
+    try {
+      if (!logDir.equals(STDOUT)) {
+        LogbackUtil.init("oracle", configDir, logDir);
+      }
+    } catch (Exception e) {
+      System.err.println("Exception while starting FluoOracle: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(-1);
+    }
+
+    try {
+      FluoConfiguration config = new FluoConfiguration(propsFile);
+
+      TwillContext context = getContext();
+      if (context != null && System.getProperty(MetricNames.METRICS_ID_PROP) == null) {
+        System.setProperty(MetricNames.METRICS_ID_PROP, "oracle-" + context.getInstanceId());
+      }
+
+      FluoOracle oracle = FluoFactory.newOracle(config);
+      oracle.start();
+      while (!shutdown.get()) {
+        UtilWaitThread.sleep(10000);
+      }
+      oracle.stop();
+    } catch (Exception e) {
+      log.error("Exception running FluoOracle: ", e);
+    }
+
+    log.info("FluoOracle is exiting.");
+  }
+
+  @Override
+  public void stop() {
+    log.info("Stopping Fluo oracle");
+    shutdown.set(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
new file mode 100644
index 0000000..f7fdbb9
--- /dev/null
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.cluster.runnable;
+
+import java.io.File;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.service.FluoWorker;
+import org.apache.fluo.cluster.util.LogbackUtil;
+import org.apache.fluo.core.metrics.MetricNames;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.TwillContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Run method of Fluo worker that is called within a Twill/YARN application
+ */
+public class WorkerRunnable extends AbstractTwillRunnable {
+
+  private static final Logger log = LoggerFactory.getLogger(WorkerRunnable.class);
+  public static String WORKER_NAME = "FluoWorker";
+  private AtomicBoolean shutdown = new AtomicBoolean(false);
+  private static final String STDOUT = "STDOUT";
+
+  @Override
+  public void run() {
+    System.out.println("Starting Worker");
+    String configDir = "./conf";
+    String propsPath = configDir + "/fluo.properties";
+    Objects.requireNonNull(propsPath);
+    File propsFile = new File(propsPath);
+    if (!propsFile.exists()) {
+      System.err.println("ERROR - Fluo properties file does not exist: " + propsPath);
+      System.exit(-1);
+    }
+    String logDir = System.getenv("LOG_DIRS");
+    if (logDir == null) {
+      System.err
+          .println("LOG_DIRS env variable was not set by Twill.  Logging to console instead!");
+      logDir = STDOUT;
+    }
+
+    try {
+      if (!logDir.equals(STDOUT)) {
+        LogbackUtil.init("worker", configDir, logDir);
+      }
+    } catch (Exception e) {
+      System.err.println("Exception while starting FluoWorker: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(-1);
+    }
+
+    try {
+      FluoConfiguration config = new FluoConfiguration(propsFile);
+      if (!config.hasRequiredWorkerProps()) {
+        log.error("fluo.properties is missing required properties for worker");
+        System.exit(-1);
+      }
+      // any client in worker should retry forever
+      config.setClientRetryTimeout(-1);
+
+      try {
+        config.validate();
+      } catch (Exception e) {
+        System.err.println("Error - Invalid fluo.properties due to " + e.getMessage());
+        e.printStackTrace();
+        System.exit(-1);
+      }
+
+      TwillContext context = getContext();
+      if (context != null && System.getProperty(MetricNames.METRICS_ID_PROP) == null) {
+        System.setProperty(MetricNames.METRICS_ID_PROP, "worker-" + context.getInstanceId());
+      }
+
+      FluoWorker worker = FluoFactory.newWorker(config);
+      worker.start();
+      while (!shutdown.get()) {
+        UtilWaitThread.sleep(1000);
+      }
+      worker.stop();
+    } catch (Exception e) {
+      log.error("Exception running FluoWorker: ", e);
+    }
+
+    log.info("Worker is exiting.");
+  }
+
+  @Override
+  public void stop() {
+    log.info("Stopping Fluo worker");
+    shutdown.set(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
index 357d4f3..dd1d068 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
@@ -30,8 +30,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.cluster.main.FluoOracleMain;
-import org.apache.fluo.cluster.main.FluoWorkerMain;
+import org.apache.fluo.cluster.runnable.OracleRunnable;
+import org.apache.fluo.cluster.runnable.WorkerRunnable;
 import org.apache.fluo.cluster.yarn.FluoTwillApp;
 import org.apache.fluo.cluster.yarn.TwillUtil;
 import org.apache.fluo.core.client.FluoAdminImpl;
@@ -331,16 +331,16 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
   }
 
   private boolean allContainersRunning(TwillController controller, FluoConfiguration config) {
-    return TwillUtil.numRunning(controller, FluoOracleMain.ORACLE_NAME) == config
+    return TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) == config
         .getOracleInstances()
-        && TwillUtil.numRunning(controller, FluoWorkerMain.WORKER_NAME) == config
+        && TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) == config
             .getWorkerInstances();
   }
 
   private String containerStatus(TwillController controller, FluoConfiguration config) {
-    return "" + TwillUtil.numRunning(controller, FluoOracleMain.ORACLE_NAME) + " of "
+    return "" + TwillUtil.numRunning(controller, OracleRunnable.ORACLE_NAME) + " of "
         + config.getOracleInstances() + " Oracle containers and "
-        + TwillUtil.numRunning(controller, FluoWorkerMain.WORKER_NAME) + " of "
+        + TwillUtil.numRunning(controller, WorkerRunnable.WORKER_NAME) + " of "
         + config.getWorkerInstances() + " Worker containers";
   }
 
@@ -371,12 +371,12 @@ public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
       if (extraInfo) {
         ResourceReport report = getResourceReport(controller, 30000);
         Collection<TwillRunResources> resources;
-        resources = report.getRunnableResources(FluoOracleMain.ORACLE_NAME);
+        resources = report.getRunnableResources(OracleRunnable.ORACLE_NAME);
         System.out.println("\nThe application has " + resources.size() + " of "
             + config.getOracleInstances() + " desired Oracle containers:\n");
         TwillUtil.printResources(resources);
 
-        resources = report.getRunnableResources(FluoWorkerMain.WORKER_NAME);
+        resources = report.getRunnableResources(WorkerRunnable.WORKER_NAME);
         System.out.println("\nThe application has " + resources.size() + " of "
             + config.getWorkerInstances() + " desired Worker containers:\n");
         TwillUtil.printResources(resources);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
index c1389fb..e182ca9 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
@@ -52,36 +52,4 @@ public class ClusterUtil {
     }
   }
 
-  /**
-   * Start watching the fluo app uuid. If it changes or goes away then halt the process.
-   */
-  public static NodeCache startAppIdWatcher(Environment env) {
-    try {
-      CuratorFramework curator = env.getSharedResources().getCurator();
-
-      byte[] uuidBytes = curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_APPLICATION_ID);
-      if (uuidBytes == null) {
-        Halt.halt("Fluo Application UUID not found");
-        throw new RuntimeException(); // make findbugs happy
-      }
-
-      final String uuid = new String(uuidBytes, StandardCharsets.UTF_8);
-
-      final NodeCache nodeCache = new NodeCache(curator, ZookeeperPath.CONFIG_FLUO_APPLICATION_ID);
-      nodeCache.getListenable().addListener(new NodeCacheListener() {
-        @Override
-        public void nodeChanged() throws Exception {
-          ChildData node = nodeCache.getCurrentData();
-          if (node == null || !uuid.equals(new String(node.getData(), StandardCharsets.UTF_8))) {
-            Halt.halt("Fluo Application UUID has changed or disappeared");
-          }
-        }
-      });
-      nodeCache.start();
-      return nodeCache;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
----------------------------------------------------------------------
diff --git a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
index b678af5..352a1b6 100644
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
+++ b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
@@ -18,8 +18,8 @@ package org.apache.fluo.cluster.yarn;
 import java.io.File;
 
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.cluster.main.FluoOracleMain;
-import org.apache.fluo.cluster.main.FluoWorkerMain;
+import org.apache.fluo.cluster.runnable.OracleRunnable;
+import org.apache.fluo.cluster.runnable.WorkerRunnable;
 import org.apache.fluo.cluster.runner.YarnAppRunner;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.ResourceSpecification.SizeUnit;
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Represents Fluo oracle application in Twill
+ * Represents Fluo application in Twill
  */
 public class FluoTwillApp implements TwillApplication {
 
@@ -50,13 +50,16 @@ public class FluoTwillApp implements TwillApplication {
   private MoreFile addConfigFiles(LocalFileAdder fileAdder) {
     File confDir = new File(fluoConf);
     MoreFile moreFile = null;
-    for (File f : confDir.listFiles()) {
-      if (f.isFile()) {
-        log.trace("Adding config file - " + f.getAbsolutePath());
-        if (moreFile == null) {
-          moreFile = fileAdder.add(String.format("./conf/%s", f.getName()), f);
-        } else {
-          moreFile = moreFile.add(String.format("./conf/%s", f.getName()), f);
+    File[] confFiles = confDir.listFiles();
+    if (confFiles != null) {
+      for (File f : confFiles) {
+        if (f.isFile()) {
+          log.trace("Adding config file - " + f.getAbsolutePath());
+          if (moreFile == null) {
+            moreFile = fileAdder.add(String.format("./conf/%s", f.getName()), f);
+          } else {
+            moreFile = moreFile.add(String.format("./conf/%s", f.getName()), f);
+          }
         }
       }
     }
@@ -88,7 +91,7 @@ public class FluoTwillApp implements TwillApplication {
             .setInstances(config.getOracleInstances()).build();
 
     LocalFileAdder fileAdder =
-        moreRunnable.add(FluoOracleMain.ORACLE_NAME, new FluoOracleMain(), oracleResources)
+        moreRunnable.add(OracleRunnable.ORACLE_NAME, new OracleRunnable(), oracleResources)
             .withLocalFiles();
     RunnableSetter runnableSetter = addConfigFiles(fileAdder).apply();
 
@@ -99,12 +102,12 @@ public class FluoTwillApp implements TwillApplication {
             .setInstances(config.getWorkerInstances()).build();
 
     fileAdder =
-        runnableSetter.add(FluoWorkerMain.WORKER_NAME, new FluoWorkerMain(), workerResources)
+        runnableSetter.add(WorkerRunnable.WORKER_NAME, new WorkerRunnable(), workerResources)
             .withLocalFiles();
     runnableSetter = addConfigFiles(fileAdder).apply();
 
     // Set runnable order, build and return TwillSpecification
-    return runnableSetter.withOrder().begin(FluoOracleMain.ORACLE_NAME)
-        .nextWhenStarted(FluoWorkerMain.WORKER_NAME).build();
+    return runnableSetter.withOrder().begin(OracleRunnable.ORACLE_NAME)
+        .nextWhenStarted(WorkerRunnable.WORKER_NAME).build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java
new file mode 100644
index 0000000..6851e01
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/FluoOracleImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.oracle;
+
+import java.io.File;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.service.FluoOracle;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.metrics.ReporterUtil;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoOracleImpl implements FluoOracle {
+
+  private static final Logger log = LoggerFactory.getLogger(FluoOracleImpl.class);
+
+  private OracleService oracleService;
+
+  public FluoOracleImpl(FluoConfiguration config) {
+    this.oracleService = new OracleService(config);
+  }
+
+  @Override
+  public void start() {
+    try {
+      oracleService.startAndWait();
+    } catch (UncheckedExecutionException e) {
+      throw new FluoException(e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    try {
+      oracleService.stopAndWait();
+    } catch (UncheckedExecutionException e) {
+      throw new FluoException(e);
+    }
+  }
+
+  private static class OracleService extends AbstractIdleService {
+
+    private static final Logger log = LoggerFactory.getLogger(OracleService.class);
+
+    private FluoConfiguration config;
+    private Environment env;
+    private AutoCloseable reporters;
+    private OracleServer oracleServer;
+    private NodeCache appIdCache;
+
+    OracleService(FluoConfiguration config) {
+      Objects.requireNonNull(config);
+      Preconditions.checkArgument(config.hasRequiredOracleProps());
+      // any client in oracle should retry forever
+      config.setClientRetryTimeout(-1);
+      try {
+        config.validate();
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Invalid FluoConfiguration", e);
+      }
+      this.config = config;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      env = new Environment(config);
+      reporters = ReporterUtil.setupReporters(env);
+      appIdCache = CuratorUtil.startAppIdWatcher(env);
+
+      log.info("Starting Oracle for Fluo '{}' application with the following configuration:",
+          config.getApplicationName());
+      env.getConfiguration().print();
+
+      oracleServer = new OracleServer(env);
+      oracleServer.start();
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      oracleServer.stop();
+      appIdCache.close();
+      reporters.close();
+      env.close();
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 1) {
+      System.err.println("Usage: FluoOracleImpl <fluoPropsPath>");
+      System.exit(-1);
+    }
+    String propsPath = args[0];
+    Objects.requireNonNull(propsPath);
+    File propsFile = new File(propsPath);
+    if (!propsFile.exists()) {
+      System.err.println("ERROR - Fluo properties file does not exist: " + propsPath);
+      System.exit(-1);
+    }
+    Preconditions.checkArgument(propsFile.exists());
+    try {
+      FluoConfiguration config = new FluoConfiguration(propsFile);
+      FluoOracleImpl oracle = new FluoOracleImpl(config);
+      oracle.start();
+      while (true) {
+        UtilWaitThread.sleep(10000);
+      }
+    } catch (Exception e) {
+      log.error("Exception running FluoOracle: ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
index 48f60a0..7b649d6 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
@@ -15,14 +15,20 @@
 
 package org.apache.fluo.core.util;
 
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.impl.Environment;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -132,4 +138,36 @@ public class CuratorUtil {
       throw new IllegalStateException(e);
     }
   }
+
+  /**
+   * Start watching the fluo app uuid. If it changes or goes away then halt the process.
+   */
+  public static NodeCache startAppIdWatcher(Environment env) {
+    try {
+      CuratorFramework curator = env.getSharedResources().getCurator();
+
+      byte[] uuidBytes = curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_APPLICATION_ID);
+      if (uuidBytes == null) {
+        Halt.halt("Fluo Application UUID not found");
+        throw new RuntimeException(); // make findbugs happy
+      }
+
+      final String uuid = new String(uuidBytes, StandardCharsets.UTF_8);
+
+      final NodeCache nodeCache = new NodeCache(curator, ZookeeperPath.CONFIG_FLUO_APPLICATION_ID);
+      nodeCache.getListenable().addListener(new NodeCacheListener() {
+        @Override
+        public void nodeChanged() throws Exception {
+          ChildData node = nodeCache.getCurrentData();
+          if (node == null || !uuid.equals(new String(node.getData(), StandardCharsets.UTF_8))) {
+            Halt.halt("Fluo Application UUID has changed or disappeared");
+          }
+        }
+      });
+      nodeCache.start();
+      return nodeCache;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java b/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java
new file mode 100644
index 0000000..e5b1bf2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/FluoWorkerImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.worker;
+
+import java.io.File;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.service.FluoWorker;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.metrics.ReporterUtil;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.fluo.core.util.UtilWaitThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FluoWorkerImpl implements FluoWorker {
+
+  private static final Logger log = LoggerFactory.getLogger(FluoWorkerImpl.class);
+
+  private WorkerService workerService;
+
+  public FluoWorkerImpl(FluoConfiguration config) {
+    this.workerService = new WorkerService(config);
+  }
+
+  @Override
+  public void start() {
+    try {
+      workerService.startAndWait();
+    } catch (UncheckedExecutionException e) {
+      throw new FluoException(e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    try {
+      workerService.stopAndWait();
+    } catch (UncheckedExecutionException e) {
+      throw new FluoException(e);
+    }
+  }
+
+  private static class WorkerService extends AbstractIdleService {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerService.class);
+
+    private FluoConfiguration config;
+    private Environment env;
+    private AutoCloseable reporters;
+    private NotificationProcessor np;
+    private NotificationFinder notificationFinder;
+    private NodeCache appIdCache;
+
+    WorkerService(FluoConfiguration config) {
+      Objects.requireNonNull(config);
+      Preconditions.checkArgument(config.hasRequiredWorkerProps());
+      this.config = config;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      env = new Environment(config);
+      reporters = ReporterUtil.setupReporters(env);
+      appIdCache = CuratorUtil.startAppIdWatcher(env);
+
+      log.info("Starting Worker for Fluo '{}' application with the following configuration:",
+          config.getApplicationName());
+      env.getConfiguration().print();
+
+      np = new NotificationProcessor(env);
+      notificationFinder = NotificationFinderFactory.newNotificationFinder(env.getConfiguration());
+      notificationFinder.init(env, np);
+      notificationFinder.start();
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      notificationFinder.stop();
+      np.close();
+      appIdCache.close();
+      reporters.close();
+      env.close();
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 1) {
+      System.err.println("Usage: FluoWorkerImpl <fluoPropsPath>");
+      System.exit(-1);
+    }
+    String propsPath = args[0];
+    Objects.requireNonNull(propsPath);
+    File propsFile = new File(propsPath);
+    if (!propsFile.exists()) {
+      System.err.println("ERROR - Fluo properties file does not exist: " + propsPath);
+      System.exit(-1);
+    }
+    Preconditions.checkArgument(propsFile.exists());
+    try {
+      FluoConfiguration config = new FluoConfiguration(propsFile);
+      FluoWorkerImpl worker = new FluoWorkerImpl(config);
+      worker.start();
+      while (true) {
+        UtilWaitThread.sleep(10000);
+      }
+    } catch (Exception e) {
+      log.error("Exception running FluoWorker: ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/distribution/src/main/scripts/local-fluo
----------------------------------------------------------------------
diff --git a/modules/distribution/src/main/scripts/local-fluo b/modules/distribution/src/main/scripts/local-fluo
index 248ee71..748620d 100755
--- a/modules/distribution/src/main/scripts/local-fluo
+++ b/modules/distribution/src/main/scripts/local-fluo
@@ -66,7 +66,7 @@ function check_dirs {
 }
 
 function kill_if_running {
-  validate_app
+  validate_app_full
   PID=`jps -m | grep $1 | grep $APP | cut -f 1 -d ' '`
   if [ -z "$PID" ]; then
     echo "ERROR - A $1 process for the Fluo application '$APP' is not running"
@@ -77,7 +77,7 @@ function kill_if_running {
 }
 
 LOGHOST=$(hostname)
-LOCAL_OPTS="-config-dir $APP_CONF_DIR -log-output $APP_LOG_DIR"
+LOCAL_OPTS="$APP_CONF_DIR/fluo.properties"
 LOCAL_LIB="$FLUO_LIB_DIR/*:$FLUO_LIB_DIR/logback/*"
 
 case "$1" in
@@ -85,21 +85,21 @@ start-oracle)
   validate_app_full
   check_dirs
   SERVICE="oracle"
-  java -Dfluo.app=$APP -cp "$LOCAL_LIB" org.apache.fluo.cluster.main.FluoOracleMain $LOCAL_OPTS >${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.out 2>${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.err &
+  java -Dfluo.app=$APP -cp "$LOCAL_LIB" org.apache.fluo.core.oracle.FluoOracleImpl $LOCAL_OPTS >${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.out 2>${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.err &
   echo "Started oracle for Fluo '$APP' application.  View its logs at $APP_LOG_DIR"
   ;;
 kill-oracle)
-  kill_if_running FluoOracleMain
+  kill_if_running FluoOracleImpl
   ;;
 start-worker)
   validate_app_full
   check_dirs
   SERVICE="worker"
-  java -Dfluo.app=$APP -cp "$LOCAL_LIB:$APP_LIB_DIR/*" org.apache.fluo.cluster.main.FluoWorkerMain $LOCAL_OPTS >${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.out 2>${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.err &
+  java -Dfluo.app=$APP -cp "$LOCAL_LIB:$APP_LIB_DIR/*" org.apache.fluo.core.worker.FluoWorkerImpl $LOCAL_OPTS >${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.out 2>${APP_LOG_DIR}/${SERVICE}_${LOGHOST}.err &
   echo "Started worker for Fluo '$APP' application.  View its logs at $APP_LOG_DIR"
   ;;
 kill-worker)
-  kill_if_running FluoWorkerMain
+  kill_if_running FluoWorkerImpl
   ;;
 *)
   print_usage

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/distribution/src/main/scripts/mini-fluo
----------------------------------------------------------------------
diff --git a/modules/distribution/src/main/scripts/mini-fluo b/modules/distribution/src/main/scripts/mini-fluo
index 9c1fae1..074b872 100755
--- a/modules/distribution/src/main/scripts/mini-fluo
+++ b/modules/distribution/src/main/scripts/mini-fluo
@@ -97,7 +97,7 @@ function kill_if_running {
 
 FLUO_LOG_HOST=$(hostname)
 FLUO_APP="mini"
-MINI_OPTS="-config-dir $APP_CONF_DIR -log-output $APP_LOG_DIR"
+MINI_OPTS="$APP_CONF_DIR/fluo.properties"
 MINI_OUT=${APP_LOG_DIR}/${FLUO_APP}_${FLUO_LOG_HOST}.out
 MINI_ERR=${APP_LOG_DIR}/${FLUO_APP}_${FLUO_LOG_HOST}.err
 MINI_LIB="$APP_CONF_DIR/*:$FLUO_LIB_DIR/*:$FLUO_LIB_DIR/log4j/*"
@@ -124,7 +124,7 @@ new)
 start)
   validate_app
   check_dirs
-  java $START_OPTS -cp "$MINI_LIB:$FLUO_LIB_DIR/logback/*:$APP_LIB_DIR/*" org.apache.fluo.cluster.main.MiniFluoMain $MINI_OPTS >$MINI_OUT 2>$MINI_ERR &
+  java $START_OPTS -cp "$MINI_LIB:$FLUO_LIB_DIR/logback/*:$APP_LIB_DIR/*" org.apache.fluo.mini.MiniFluoImpl $MINI_OPTS >$MINI_OUT 2>$MINI_ERR &
   echo "Started '$APP' application in MiniFluo.  View logs at $APP_LOG_DIR"
   echo "MiniFluo will create properties file for Fluo clients at $APP_CLIENT_PROPS"
   echo -n "Waiting for MiniFluo to create client.properties..."

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2b2d784f/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
----------------------------------------------------------------------
diff --git a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
index 44d1283..1c86a7e 100644
--- a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
+++ b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
@@ -16,10 +16,12 @@
 package org.apache.fluo.mini;
 
 import java.io.File;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.accumulo.core.client.Scanner;
@@ -37,6 +39,7 @@ import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.oracle.OracleServer;
+import org.apache.fluo.core.util.UtilWaitThread;
 import org.apache.fluo.core.worker.NotificationFinder;
 import org.apache.fluo.core.worker.NotificationFinderFactory;
 import org.apache.fluo.core.worker.NotificationProcessor;
@@ -190,4 +193,40 @@ public class MiniFluoImpl implements MiniFluo {
       throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Command-line entry point that starts MiniFluo and then keeps the main thread alive.
+   *
+   * <p>
+   * Expects exactly one argument: the path to a Fluo properties file. The JVM exits with status -1
+   * if the argument count is wrong, the file does not exist, the configuration lacks the
+   * properties required by MiniFluo, or an exception is thrown while creating or running MiniFluo.
+   *
+   * @param args command-line arguments; {@code args[0]} is the path to the Fluo properties file
+   */
+  public static void main(String[] args) {
+
+    try {
+      if (args.length != 1) {
+        System.err.println("Usage: MiniFluoImpl <fluoPropsPath>");
+        System.exit(-1);
+      }
+      String propsPath = args[0];
+      Objects.requireNonNull(propsPath);
+      File propsFile = new File(propsPath);
+      if (!propsFile.exists()) {
+        System.err.println("ERROR - Fluo properties file does not exist: " + propsPath);
+        System.exit(-1);
+      }
+      // Defensive re-check; the branch above has already exited when the file is missing.
+      Preconditions.checkArgument(propsFile.exists());
+
+      FluoConfiguration config = new FluoConfiguration(propsFile);
+      if (!config.hasRequiredMiniFluoProps()) {
+        log.error("Failed to start MiniFluo - fluo.properties is missing required properties for "
+            + "MiniFluo");
+        System.exit(-1);
+      }
+      // try-with-resources closes MiniFluo before the catch block below runs.
+      try (MiniFluo mini = new MiniFluoImpl(config)) {
+        log.info("MiniFluo is running");
+
+        // Park the main thread indefinitely while MiniFluo is open.
+        while (true) {
+          UtilWaitThread.sleep(1000);
+        }
+      }
+    } catch (Exception e) {
+      log.error("Exception running MiniFluo: ", e);
+      log.info("MiniFluo is exiting.");
+      // Exit non-zero, matching the validation failures above, instead of returning from main as
+      // if MiniFluo had finished normally.
+      System.exit(-1);
+    }
+  }
 }


Mime
View raw message