fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [2/2] incubator-fluo git commit: fixes #816 introduced better way to setup observers and deprecated old
Date Wed, 29 Mar 2017 17:06:24 GMT
fixes #816 introduced better way to setup observers and deprecated old


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/d6af386b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/d6af386b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/d6af386b

Branch: refs/heads/master
Commit: d6af386bfacd795350f0bdb79da751748c09cfdc
Parents: bf13386
Author: Keith Turner <kturner@apache.org>
Authored: Wed Mar 29 13:05:41 2017 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Wed Mar 29 13:05:41 2017 -0400

----------------------------------------------------------------------
 docs/applications.md                            |  45 +++-
 .../fluo/accumulo/util/ZookeeperPath.java       |   4 +-
 .../org/apache/fluo/api/client/FluoClient.java  |   4 +
 .../fluo/api/config/FluoConfiguration.java      |  63 ++++-
 .../fluo/api/config/ObserverSpecification.java  |   3 +
 .../java/org/apache/fluo/api/data/Bytes.java    |   2 +-
 .../fluo/api/observer/AbstractObserver.java     |   4 +
 .../org/apache/fluo/api/observer/Observer.java  |  50 +++-
 .../fluo/api/observer/ObserverProvider.java     | 118 ++++++++++
 .../fluo/api/observer/StringObserver.java       |  34 +++
 .../fluo/api/config/FluoConfigurationTest.java  |  23 +-
 modules/core/pom.xml                            |   4 +
 .../apache/fluo/core/client/FluoAdminImpl.java  |  54 +----
 .../org/apache/fluo/core/client/Operations.java | 111 ---------
 .../org/apache/fluo/core/impl/Environment.java  |  62 +----
 .../org/apache/fluo/core/impl/LockResolver.java |   4 +-
 .../apache/fluo/core/impl/TransactionImpl.java  |  14 +-
 .../fluo/core/observer/ObserverStore.java       |  32 +++
 .../apache/fluo/core/observer/ObserverUtil.java | 105 +++++++++
 .../apache/fluo/core/observer/Observers.java    |  28 +++
 .../fluo/core/observer/RegisteredObservers.java |  28 +++
 .../fluo/core/observer/v1/ObserverContext.java  |  63 +++++
 .../fluo/core/observer/v1/ObserverStoreV1.java  | 233 +++++++++++++++++++
 .../fluo/core/observer/v1/ObserversV1.java      | 130 +++++++++++
 .../core/observer/v2/JsonObservedColumn.java    |  45 ++++
 .../fluo/core/observer/v2/JsonObservers.java    |  55 +++++
 .../v2/ObserverProviderContextImpl.java         |  55 +++++
 .../fluo/core/observer/v2/ObserverStoreV2.java  | 151 ++++++++++++
 .../fluo/core/observer/v2/ObserversV2.java      | 112 +++++++++
 .../fluo/core/worker/NotificationProcessor.java |   3 +-
 .../fluo/core/worker/ObserverContext.java       |  62 -----
 .../org/apache/fluo/core/worker/Observers.java  | 122 ----------
 .../apache/fluo/core/worker/WorkTaskAsync.java  |   1 +
 .../src/main/config/fluo.properties             |   7 +-
 .../org/apache/fluo/integration/ITBase.java     |  15 +-
 .../org/apache/fluo/integration/ITBaseImpl.java |   2 +-
 .../org/apache/fluo/integration/ITBaseMini.java |   2 +-
 .../fluo/integration/impl/AppConfigIT.java      |  46 ++--
 .../fluo/integration/impl/CollisionIT.java      |  35 ++-
 .../apache/fluo/integration/impl/FailureIT.java |  22 +-
 .../apache/fluo/integration/impl/FluoIT.java    |  26 +--
 .../fluo/integration/impl/NotificationGcIT.java |  10 +-
 .../fluo/integration/impl/ObserverConfigIT.java |   6 +-
 .../integration/impl/SelfNotificationIT.java    |  25 +-
 .../integration/impl/StrongNotificationIT.java  |  31 +--
 .../integration/impl/WeakNotificationIT.java    |  22 +-
 .../impl/WeakNotificationOverlapIT.java         |  54 ++---
 .../apache/fluo/integration/impl/WorkerIT.java  |  48 ++--
 .../org/apache/fluo/integration/log/LogIT.java  |  35 ++-
 pom.xml                                         |   5 +
 50 files changed, 1575 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/docs/applications.md
----------------------------------------------------------------------
diff --git a/docs/applications.md b/docs/applications.md
index 5685381..cbf12ba 100644
--- a/docs/applications.md
+++ b/docs/applications.md
@@ -100,12 +100,12 @@ public class AppCommand {
 
 To create an observer, follow these steps:
 
-1. Create a class that extends [AbstractObserver] like the example below. Please use [slf4j] for
+1. Create one or more classes that extend [Observer] like the example below. Please use [slf4j] for
    any logging in observers as [slf4j] supports multiple logging implementations. This is
    necessary as Fluo applications have a hard requirement on [logback] when running in YARN.
 
     ```java
-    public class InvertObserver extends AbstractObserver {
+    public class InvertObserver implements Observer {
 
       @Override
       public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
@@ -114,25 +114,45 @@ To create an observer, follow these steps:
         // invert row and value
         tx.set(value, new Column("inv", "data"), row);
       }
+    }
+    ```
+2.  Create a class that implements [ObserverProvider] like the example below.  The purpose of this
+    class is to associate a set of Observers with the columns that trigger them.  The class can
+    create multiple observers.
 
+    ```java
+    class AppObserverProvider implements ObserverProvider {
       @Override
-      public ObservedColumn getObservedColumn() {
-        return new ObservedColumn(new Column("obs", "data"), NotificationType.STRONG);
+      public void provide(Registry or, Context ctx) {
+        //setup InvertObserver to be triggered when the column obs:data is modified
+        or.register(new Column("obs", "data"),
+                           NotificationType.STRONG,
+                           new InvertObserver());
+        
+        //Observer is a functional interface, so Observers can be written as lambdas.
+        or.register(new Column("new","data"),
+                           NotificationType.WEAK,
+                           (tx,row,col) -> { 
+                             Bytes combined = combineNewAndOld(tx,row);
+                             tx.set(row, new Column("current","data"), combined);
+                           });
       }
     }
     ```
-2.  Build a jar containing this class and include this jar in the `lib/` directory of your Fluo
+
+3.  Build a jar containing these classes and include this jar in the `lib/` directory of your Fluo
     application.
-3.  Configure your Fluo instance to use this observer by modifying the Observer section of
+4.  Configure your Fluo instance to use this observer provider by modifying the Observer section of
     [fluo.properties].
-4.  Restart your Fluo instance so that your Fluo workers load the new observer.
+5.  Initialize Fluo.  During initialization Fluo will obtain the observed columns from the 
+    ObserverProvider and persist the columns in Zookeeper.  These columns persisted in Zookeeper
+    are used by transactions to know when to trigger observers.
+6.  Start your Fluo instance so that your Fluo workers load the new observer.
 
 ## Application Configuration
 
-Each observer can have its own configuration. This is useful for the case of using the same
-observer code w/ different parameters. However for the case of sharing the same configuration
-across observers, fluo provides a simple mechanism to set and access application specific
-configuration. See the javadoc on [FluoClient].getAppConfiguration() for more details.
+For configuring observers, fluo provides a simple mechanism to set and access application specific
+configuration.  See the javadoc on [FluoClient].getAppConfiguration() for more details.
 
 ## Debugging Applications
 
@@ -195,7 +215,8 @@ where D is a hex digit. Also the `\` character is escaped to make the output una
 [FluoFactory]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoFactory.java
 [FluoClient]: ../modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
 [FluoConfiguration]: ../modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
-[AbstractObserver]: ../modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
+[Observer]: ../modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
+[ObserverProvider]: ../modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
 [fluo.properties]: ../modules/distribution/src/main/config/fluo.properties
 [API]: https://fluo.apache.org/apidocs/
 [metrics]: metrics.md

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
----------------------------------------------------------------------
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
index aa6039e..8c7f1d2 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/ZookeeperPath.java
@@ -25,7 +25,9 @@ public class ZookeeperPath {
   public static final String CONFIG_ACCUMULO_INSTANCE_NAME = CONFIG + "/accumulo.instance.name";
   public static final String CONFIG_ACCUMULO_INSTANCE_ID = CONFIG + "/accumulo.instance.id";
   public static final String CONFIG_FLUO_APPLICATION_ID = CONFIG + "/fluo.application.id";
-  public static final String CONFIG_FLUO_OBSERVERS = CONFIG + "/fluo.observers";
+  @Deprecated
+  public static final String CONFIG_FLUO_OBSERVERS1 = CONFIG + "/fluo.observers";
+  public static final String CONFIG_FLUO_OBSERVERS2 = CONFIG + "/fluo.observers2";
   public static final String CONFIG_SHARED = CONFIG + "/shared.config";
 
   public static final String ORACLE = "/oracle";

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java b/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
index 140351e..fcbe4db 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/FluoClient.java
@@ -15,8 +15,10 @@
 
 package org.apache.fluo.api.client;
 
+import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
 
 /**
  * Client interface for Fluo. Fluo clients will have shared resources used by all objects created by
@@ -63,6 +65,8 @@ public interface FluoClient extends AutoCloseable {
    *         keeping config files consistent across a cluster. To update this configuration, use
    *         {@link FluoAdmin#updateSharedConfig()}. Changes made to the returned Configuration will
    *         not update Zookeeper.
+   * @see FluoConfiguration#getAppConfiguration()
+   * @see Context#getAppConfiguration()
    */
   SimpleConfiguration getAppConfiguration();
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index 53a4819..c759dbb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
 import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,8 +88,19 @@ public class FluoConfiguration extends SimpleConfiguration {
 
   /** The properties below get loaded into/from Zookeeper */
   // Observer
+  @Deprecated
   public static final String OBSERVER_PREFIX = FLUO_PREFIX + ".observer.";
 
+  /**
+   * @since 1.1.0
+   */
+  public static final String OBSERVER_PROVIDER = FLUO_PREFIX + ".observer.provider";
+
+  /**
+   * @since 1.1.0
+   */
+  public static final String OBSERVER_PROVIDER_DEFAULT = "";
+
   // Transaction
   public static final String TRANSACTION_PREFIX = FLUO_PREFIX + ".tx";
   public static final String TRANSACTION_ROLLBACK_TIME_PROP = TRANSACTION_PREFIX + ".rollback.time";
@@ -281,6 +294,11 @@ public class FluoConfiguration extends SimpleConfiguration {
     return getPositiveInt(WORKER_NUM_THREADS_PROP, WORKER_NUM_THREADS_DEFAULT);
   }
 
+  /**
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
+   */
+  @Deprecated
   public List<ObserverSpecification> getObserverSpecifications() {
 
     List<ObserverSpecification> configList = new ArrayList<>();
@@ -344,6 +362,37 @@ public class FluoConfiguration extends SimpleConfiguration {
     return max + 1;
   }
 
+  /**
+   * Configure the observer provider that Fluo workers will use.
+   *
+   * @since 1.1.0
+   *
+   * @param className Name of a class that implements {@link ObserverProvider}. Must be non-null and
+   *        non-empty.
+   */
+  public void setObserverProvider(String className) {
+    setNonEmptyString(OBSERVER_PROVIDER, className);
+  }
+
+  /**
+   * Calls {@link #setObserverProvider(String)} with the class name.
+   *
+   * @since 1.1.0
+   */
+  public void setObserverProvider(Class<? extends ObserverProvider> clazz) {
+    setObserverProvider(clazz.getName());
+  }
+
+  /**
+   * @return The configured {@link ObserverProvider} class name. If one was not configured, returns
+   *         {@value #OBSERVER_PROVIDER_DEFAULT}
+   * @since 1.1.0
+   */
+  public String getObserverProvider() {
+    return getString(OBSERVER_PROVIDER, OBSERVER_PROVIDER_DEFAULT);
+  }
+
+  @Deprecated
   private void addObserver(ObserverSpecification oconf, int next) {
     Map<String, String> params = oconf.getConfiguration().toMap();
     StringBuilder paramString = new StringBuilder();
@@ -359,7 +408,11 @@ public class FluoConfiguration extends SimpleConfiguration {
   /**
    * Adds an {@link ObserverSpecification} to the configuration using a unique integer prefix that's
    * not currently in use.
+   *
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
    */
+  @Deprecated
   public FluoConfiguration addObserver(ObserverSpecification oconf) {
     int next = getNextObserverId();
     addObserver(oconf, next);
@@ -368,7 +421,11 @@ public class FluoConfiguration extends SimpleConfiguration {
 
   /**
    * Adds multiple observers using unique integer prefixes for each.
+   *
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
    */
+  @Deprecated
   public FluoConfiguration addObservers(Iterable<ObserverSpecification> observers) {
     int next = getNextObserverId();
     for (ObserverSpecification oconf : observers) {
@@ -379,7 +436,11 @@ public class FluoConfiguration extends SimpleConfiguration {
 
   /**
    * Removes any configured observers.
+   *
+   * @deprecated since 1.1.0. Replaced by {@link #setObserverProvider(String)} and
+   *             {@link #getObserverProvider()}
    */
+  @Deprecated
   public FluoConfiguration clearObservers() {
     Iterator<String> iter1 = getKeys(OBSERVER_PREFIX.substring(0, OBSERVER_PREFIX.length() - 1));
     while (iter1.hasNext()) {
@@ -429,7 +490,7 @@ public class FluoConfiguration extends SimpleConfiguration {
    *         to subset will be reflected in this configuration, but with the prefix added. This
    *         method is useful for setting application configuration before initialization. For
    *         reading application configuration after initialization, see
-   *         {@link FluoClient#getAppConfiguration()}
+   *         {@link FluoClient#getAppConfiguration()} and {@link Context#getAppConfiguration()}
    */
   public SimpleConfiguration getAppConfiguration() {
     return subset(APP_PREFIX);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
index 76829c9..9ebb47b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/config/ObserverSpecification.java
@@ -26,7 +26,10 @@ import org.apache.fluo.api.observer.Observer;
  * {@link FluoConfiguration#addObserver(ObserverSpecification)}.
  *
  * @since 1.0.0
+ * @deprecated since 1.1.0. The methods that used this class in {@link FluoConfiguration} were
+ *             deprecated.
  */
+@Deprecated
 public class ObserverSpecification {
   private final String className;
   private final Map<String, String> configMap;

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
index 077eac7..59c4321 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
@@ -58,7 +58,7 @@ public final class Bytes implements Comparable<Bytes>, Serializable {
   private final int offset;
   private final int length;
 
-  private WeakReference<String> utf8String;
+  private transient WeakReference<String> utf8String;
 
   public static final Bytes EMPTY = new Bytes(new byte[0]);
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java b/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
index 24505a5..6c191f9 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/AbstractObserver.java
@@ -26,7 +26,11 @@ import org.apache.fluo.api.client.TransactionBase;
  * user.
  *
  * @since 1.0.0
+ * @deprecated since 1.1.0. This class was deprecated for two reasons. First, the methods it
+ *             overrides were deprecated. Second, the methods it overrides were made into Java 8
+ *             default methods.
  */
+@Deprecated
 public abstract class AbstractObserver implements Observer {
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
index 381784a..21f4a2a 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/Observer.java
@@ -17,6 +17,7 @@ package org.apache.fluo.api.observer;
 
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
@@ -24,13 +25,15 @@ import org.apache.fluo.api.metrics.MetricsReporter;
 
 /**
 * Implemented by users to watch a {@link Column} and be notified of changes to the Column via the
- * {@link #process(TransactionBase, Bytes, Column)} method. An observer is created for each worker
- * thread and reused for the lifetime of a worker thread. Consider extending
- * {@link AbstractObserver} as it will let you optionally implement {@link #init(Context)} and
- * {@link #close()}. The abstract class will also shield you from the addition of interface methods.
+ * {@link #process(TransactionBase, Bytes, Column)} method.
+ * 
+ * <p>
+ * In Fluo version 1.1.0 this was converted to a functional interface. This change along with the
+ * introduction of {@link ObserverProvider} allows Observers to be written as lambdas.
  *
  * @since 1.0.0
  */
+@FunctionalInterface
 public interface Observer {
 
   /**
@@ -44,7 +47,9 @@ public interface Observer {
    * A {@link Column} and {@link NotificationType} pair
    *
    * @since 1.0.0
+   * @deprecated since 1.1.0. The method that used this class was deprecated.
    */
+  @Deprecated
   class ObservedColumn {
     private final Column col;
     private final NotificationType notificationType;
@@ -61,11 +66,22 @@ public interface Observer {
     public NotificationType getType() {
       return notificationType;
     }
+
+    /**
+     * @since 1.1.0
+     */
+    @Override
+    public String toString() {
+      return col + " " + notificationType;
+    }
   }
 
   /**
    * @since 1.0.0
+   *
+   * @deprecated since 1.1.0. The method that used this interface was deprecated.
    */
+  @Deprecated
   interface Context {
     /**
      * @return A configuration object with application configuration like that returned by
@@ -88,8 +104,14 @@ public interface Observer {
    * Implemented by user to initialize Observer.
    *
    * @param context Observer context
+   *
+   * @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
+   *             {@link FluoConfiguration#setObserverProvider(String)}. It is only called when
+   *             observers are configured the old way by
+   *             {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
    */
-  void init(Context context) throws Exception;
+  @Deprecated
+  default void init(Context context) throws Exception {}
 
   /**
    * Implemented by users to process notifications on a {@link ObservedColumn}. If a notification
@@ -107,11 +129,25 @@ public interface Observer {
    * then an exception will be thrown. It is safe to assume that {@link #init(Context)} will be
    * called before this method. If the return value of the method is derived from what is passed to
    * {@link #init(Context)}, then the derivation process should be deterministic.
+   *
+   * @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
+   *             {@link FluoConfiguration#setObserverProvider(String)}. It is only called when
+   *             observers are configured the old way by
+   *             {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
    */
-  ObservedColumn getObservedColumn();
+  @Deprecated
+  default ObservedColumn getObservedColumn() {
+    throw new UnsupportedOperationException();
+  }
 
   /**
    * Implemented by user to close resources used by Observer
+   *
+   * @deprecated since 1.1.0. Fluo will no longer call this method when observers are configured by
+   *             {@link FluoConfiguration#setObserverProvider(String)}. It is only called when
+   *             observers are configured the old way by
+   *             {@link FluoConfiguration#addObserver(org.apache.fluo.api.config.ObserverSpecification)}
    */
-  void close();
+  @Deprecated
+  default void close() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
new file mode 100644
index 0000000..c172268
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import java.util.function.BiConsumer;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+/**
+ * Fluo Workers use this class to register {@link Observer}s to process notifications.
+ * Implementations of this class should register zero or more {@link Observer}s.
+ *
+ * <p>
+ * When Fluo is initialized {@link #provideColumns(BiConsumer, Context)} is called. The columns it
+ * registers are stored in Zookeeper. Transactions will use the columns stored in Zookeeper to
+ * determine when to set notifications. When Workers call {@link #provide(Registry, Context)}, the
+ * columns registered must be the same as those registered during initialization. If this is not the
+ * case, then the worker will fail to start.
+ *
+ * @see FluoConfiguration#setObserverProvider(String)
+ * @since 1.1.0
+ */
+public interface ObserverProvider {
+
+  /**
+   * @since 1.1.0
+   */
+  interface Context {
+    /**
+     * @return A configuration object with application configuration like that returned by
+     *         {@link FluoClient#getAppConfiguration()}
+     */
+    SimpleConfiguration getAppConfiguration();
+
+    /**
+     * @return A {@link MetricsReporter} to report application metrics from observers.
+     */
+    MetricsReporter getMetricsReporter();
+  }
+
+  /**
+   * Observers are registered with the worker using this interface. Registering an {@link Observer}
+   * relates it to the columns that trigger it.
+   *
+   * @since 1.1.0
+   */
+  interface Registry {
+    void register(Column observedColumn, NotificationType ntfyType, Observer observer);
+
+    /**
+     * This method was created to allow Observers written as lambda to be passed {@link String}
+     * instead of {@link Bytes} for the row.
+     * 
+     * <pre>
+     * <code>
+     *   void provide(Registry or, Context ctx) {
+     *     or.registers(someColumn, WEAK, (tx,row,col) -> {
+     *      //row is of type String
+     *     });
+     *   }
+     * </code>
+     * </pre>
+     */
+    void registers(Column observedColumn, NotificationType ntfyType, StringObserver observer);
+  }
+
+  /**
+   * This method is called by Fluo Workers to register observers to process notifications.
+   *
+   * <p>
+   * Observers registered may be called concurrently by multiple threads to process different
+   * notifications. Observers should be tolerant of this.
+   *
+   * @param or Register observers with this.
+   */
+  void provide(Registry or, Context ctx);
+
+  /**
+   * Called during Fluo initialization to determine what columns are being observed. The default
+   * implementation of this method calls {@link #provide(Registry, Context)} and ignores Observers.
+   *
+   * @param colRegistry register all observed columns with this consumer
+   */
+  default void provideColumns(BiConsumer<Column, NotificationType> colRegistry, Context ctx) {
+    Registry or = new Registry() {
+      @Override
+      public void registers(Column oc, NotificationType nt, StringObserver obs) {
+        colRegistry.accept(oc, nt);
+      }
+
+      @Override
+      public void register(Column oc, NotificationType nt, Observer obs) {
+        colRegistry.accept(oc, nt);
+      }
+    };
+
+    provide(or, ctx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java b/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java
new file mode 100644
index 0000000..f57a98e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/StringObserver.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * @since 1.1.0
+ */
+@FunctionalInterface
+public interface StringObserver extends Observer {
+
+  @Override
+  default void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+    process(tx, row.toString(), col);
+  }
+
+  abstract void process(TransactionBase tx, String row, Column col) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index 38705db..3e36ba1 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -209,6 +209,7 @@ public class FluoConfigurationTest {
     Assert.assertEquals("33", config.getReporterConfiguration("jmx").getString("frequency"));
   }
 
+  @SuppressWarnings("deprecation")
   private void assertIAE(String value) {
     FluoConfiguration config = new FluoConfiguration();
     try {
@@ -219,6 +220,7 @@ public class FluoConfigurationTest {
     }
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testObserverConfig() {
     FluoConfiguration config = new FluoConfiguration();
@@ -247,6 +249,7 @@ public class FluoConfigurationTest {
     Assert.assertEquals(0, ocList.get(0).getConfiguration().toMap().size());
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testObserverConfig2() {
     FluoConfiguration config = new FluoConfiguration();
@@ -374,7 +377,7 @@ public class FluoConfigurationTest {
     c1.setAccumuloZookeepers("localhost:7171");
     c1.setInstanceZookeepers("localhost:7171/testS");
     c1.setWorkerThreads(100);
-    c1.addObserver(new ObserverSpecification("com.foo.Observer1"));
+    c1.setObserverProvider("com.foo.MyObserverProvider");
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ObjectOutputStream oo = new ObjectOutputStream(baos);
@@ -395,4 +398,22 @@ public class FluoConfigurationTest {
 
     in.close();
   }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullObserverProvider() {
+    FluoConfiguration fc = new FluoConfiguration();
+    fc.setObserverProvider((String) null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyObserverProvider() {
+    FluoConfiguration fc = new FluoConfiguration();
+    fc.setObserverProvider("");
+  }
+
+  @Test
+  public void testNoObserverProvider() {
+    FluoConfiguration fc = new FluoConfiguration();
+    Assert.assertEquals("", fc.getObserverProvider());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index de20ac5..d6784a4 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -26,6 +26,10 @@
   <description>The modules contains the core implementation code of Apache Fluo.</description>
   <dependencies>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 8d059a5..62914e0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -15,6 +15,7 @@
 
 package org.apache.fluo.core.client;
 
+import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -38,17 +39,11 @@ import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.api.observer.Observer.NotificationType;
-import org.apache.fluo.api.observer.Observer.ObservedColumn;
+import org.apache.fluo.core.observer.ObserverUtil;
 import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.ByteUtil;
 import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.fluo.core.worker.ObserverContext;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -208,42 +203,6 @@ public class FluoAdminImpl implements FluoAdmin {
   @Override
   public void updateSharedConfig() {
 
-    logger.info("Setting up observers using app config: {}", config.getAppConfiguration());
-
-    Map<Column, ObserverSpecification> colObservers = new HashMap<>();
-    Map<Column, ObserverSpecification> weakObservers = new HashMap<>();
-    for (ObserverSpecification ospec : config.getObserverSpecifications()) {
-
-      Observer observer;
-      try {
-        observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance();
-      } catch (ClassNotFoundException e1) {
-        throw new FluoException("Observer class '" + ospec.getClassName() + "' was not "
-            + "found.  Check for class name misspellings or failure to include "
-            + "the observer jar.", e1);
-      } catch (InstantiationException | IllegalAccessException e2) {
-        throw new FluoException("Observer class '" + ospec.getClassName()
-            + "' could not be created.", e2);
-      }
-
-      SimpleConfiguration oc = ospec.getConfiguration();
-      logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
-          oc.toMap());
-      try {
-        observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc));
-      } catch (Exception e) {
-        throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
-            e);
-      }
-
-      ObservedColumn observedCol = observer.getObservedColumn();
-      if (observedCol.getType() == NotificationType.STRONG) {
-        colObservers.put(observedCol.getColumn(), ospec);
-      } else {
-        weakObservers.put(observedCol.getColumn(), ospec);
-      }
-    }
-
     Properties sharedProps = new Properties();
     Iterator<String> iter = config.getKeys();
     while (iter.hasNext()) {
@@ -257,8 +216,13 @@ public class FluoAdminImpl implements FluoAdmin {
 
     try {
       CuratorFramework curator = getAppCurator();
-      Operations.updateObservers(curator, colObservers, weakObservers);
-      Operations.updateSharedConfig(curator, sharedProps);
+      ObserverUtil.initialize(curator, config);
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      sharedProps.store(baos, "Shared java props");
+
+      CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
+          CuratorUtil.NodeExistsPolicy.OVERWRITE);
     } catch (Exception e) {
       throw new FluoException("Failed to update shared configuration in Zookeeper", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java b/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java
deleted file mode 100644
index 16851a6..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/client/Operations.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.client;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.fluo.accumulo.util.ZookeeperPath;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.core.util.ColumnUtil;
-import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility methods for initializing Zookeeper and Accumulo
- */
-public class Operations {
-
-  private Operations() {}
-
-  private static final Logger logger = LoggerFactory.getLogger(Operations.class);
-
-  // TODO refactor all method in this class to take a properties object... if so the prop keys would
-  // need to be public
-
-  public static void updateSharedConfig(CuratorFramework curator, Properties sharedProps)
-      throws Exception {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    sharedProps.store(baos, "Shared java props");
-
-    CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
-        CuratorUtil.NodeExistsPolicy.OVERWRITE);
-  }
-
-  public static void updateObservers(CuratorFramework curator,
-      Map<Column, ObserverSpecification> colObservers,
-      Map<Column, ObserverSpecification> weakObservers) throws Exception {
-
-    // TODO check that no workers are running... or make workers watch this znode
-
-    String observerPath = ZookeeperPath.CONFIG_FLUO_OBSERVERS;
-    try {
-      curator.delete().deletingChildrenIfNeeded().forPath(observerPath);
-    } catch (NoNodeException nne) {
-      // it's ok if node doesn't exist
-    } catch (Exception e) {
-      logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath
-          + "], error=[" + e.getMessage() + "]");
-      throw new RuntimeException(e);
-    }
-
-    byte[] serializedObservers = serializeObservers(colObservers, weakObservers);
-    CuratorUtil.putData(curator, observerPath, serializedObservers,
-        CuratorUtil.NodeExistsPolicy.OVERWRITE);
-  }
-
-  private static void serializeObservers(DataOutputStream dos,
-      Map<Column, ObserverSpecification> colObservers) throws IOException {
-    // TODO use a human readable serialized format like json
-
-    Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet();
-
-    WritableUtils.writeVInt(dos, colObservers.size());
-
-    for (Entry<Column, ObserverSpecification> entry : es) {
-      ColumnUtil.writeColumn(entry.getKey(), dos);
-      dos.writeUTF(entry.getValue().getClassName());
-      Map<String, String> params = entry.getValue().getConfiguration().toMap();
-      WritableUtils.writeVInt(dos, params.size());
-      for (Entry<String, String> pentry : params.entrySet()) {
-        dos.writeUTF(pentry.getKey());
-        dos.writeUTF(pentry.getValue());
-      }
-    }
-  }
-
-  private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers,
-      Map<Column, ObserverSpecification> weakObservers) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (DataOutputStream dos = new DataOutputStream(baos)) {
-      serializeObservers(dos, colObservers);
-      serializeObservers(dos, weakObservers);
-    }
-
-    byte[] serializedObservers = baos.toByteArray();
-    return serializedObservers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 30b398d..4ad6ec9 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -16,17 +16,10 @@
 package org.apache.fluo.core.impl;
 
 import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.accumulo.core.client.Connector;
@@ -35,16 +28,14 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.metrics.MetricsReporter;
 import org.apache.fluo.core.metrics.MetricNames;
 import org.apache.fluo.core.metrics.MetricsReporterImpl;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.ObserverUtil;
 import org.apache.fluo.core.util.AccumuloUtil;
-import org.apache.fluo.core.util.ColumnUtil;
 import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Holds common environment configuration and shared resources
@@ -54,9 +45,7 @@ public class Environment implements AutoCloseable {
   private String table;
   private Authorizations auths = new Authorizations();
   private String accumuloInstance;
-  private Map<Column, ObserverSpecification> observers;
-  private Map<Column, ObserverSpecification> weakObservers;
-  private Set<Column> allObserversColumns;
+  private RegisteredObservers observers;
   private Connector conn;
   private String accumuloInstanceID;
   private String fluoApplicationID;
@@ -105,8 +94,6 @@ public class Environment implements AutoCloseable {
     this.auths = env.auths;
     this.accumuloInstance = env.accumuloInstance;
     this.observers = env.observers;
-    this.weakObservers = env.weakObservers;
-    this.allObserversColumns = env.allObserversColumns;
     this.conn = env.conn;
     this.accumuloInstanceID = env.accumuloInstanceID;
     this.fluoApplicationID = env.fluoApplicationID;
@@ -136,18 +123,10 @@ public class Environment implements AutoCloseable {
           new String(curator.getData().forPath(ZookeeperPath.CONFIG_ACCUMULO_TABLE),
               StandardCharsets.UTF_8);
 
-      ByteArrayInputStream bais =
-          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS));
-      DataInputStream dis = new DataInputStream(bais);
-
-      observers = Collections.unmodifiableMap(readObservers(dis));
-      weakObservers = Collections.unmodifiableMap(readObservers(dis));
-      allObserversColumns = new HashSet<>();
-      allObserversColumns.addAll(observers.keySet());
-      allObserversColumns.addAll(weakObservers.keySet());
-      allObserversColumns = Collections.unmodifiableSet(allObserversColumns);
+      observers = ObserverUtil.load(curator);
 
-      bais = new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
+      ByteArrayInputStream bais =
+          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_SHARED));
       Properties sharedProps = new Properties();
       sharedProps.load(bais);
 
@@ -164,29 +143,6 @@ public class Environment implements AutoCloseable {
     }
   }
 
-  private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
-      throws IOException {
-
-    HashMap<Column, ObserverSpecification> omap = new HashMap<>();
-
-    int num = WritableUtils.readVInt(dis);
-    for (int i = 0; i < num; i++) {
-      Column col = ColumnUtil.readColumn(dis);
-      String clazz = dis.readUTF();
-      Map<String, String> params = new HashMap<>();
-      int numParams = WritableUtils.readVInt(dis);
-      for (int j = 0; j < numParams; j++) {
-        String k = dis.readUTF();
-        String v = dis.readUTF();
-        params.put(k, v);
-      }
-
-      ObserverSpecification ospec = new ObserverSpecification(clazz, params);
-      omap.put(col, ospec);
-    }
-
-    return omap;
-  }
 
   public void setAuthorizations(Authorizations auths) {
     this.auths = auths;
@@ -216,14 +172,10 @@ public class Environment implements AutoCloseable {
     return fluoApplicationID;
   }
 
-  public Map<Column, ObserverSpecification> getObservers() {
+  public RegisteredObservers getConfiguredObservers() {
     return observers;
   }
 
-  public Map<Column, ObserverSpecification> getWeakObservers() {
-    return weakObservers;
-  }
-
   public String getTable() {
     return table;
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index cd08482..6ff9437 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -42,6 +42,8 @@ import org.apache.fluo.core.util.ConditionalFlutation;
 import org.apache.fluo.core.util.FluoCondition;
 import org.apache.fluo.core.util.SpanUtil;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 /**
  * This is utility code for either rolling forward or back failed transactions. A transaction is
  * deemed to have failed if the reading transaction waited too long or the transactor id does not
@@ -242,7 +244,7 @@ public class LockResolver {
 
       LockValue lv = new LockValue(entry.getValue().get());
       ColumnUtil.commitColumn(env, lv.isTrigger(), false, col, lv.isWrite(), lv.isDelete(), lockTs,
-          commitTs, env.getObservers().keySet(), mut);
+          commitTs, env.getConfiguredObservers().getObservedColumns(STRONG), mut);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index e8398f8..322ae77 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -78,6 +78,9 @@ import org.apache.fluo.core.util.Flutation;
 import org.apache.fluo.core.util.Hex;
 import org.apache.fluo.core.util.SpanUtil;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 /**
  * Transaction implementation
  */
@@ -125,9 +128,10 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
     this.env = env;
     this.stats = new TxStats(env);
     this.startTs = startTs;
-    this.observedColumns = env.getObservers().keySet();
+    this.observedColumns = env.getConfiguredObservers().getObservedColumns(STRONG);
 
-    if (trigger != null && env.getWeakObservers().containsKey(trigger.getColumn())) {
+    if (trigger != null
+        && env.getConfiguredObservers().getObservedColumns(WEAK).contains(trigger.getColumn())) {
       this.weakNotification = trigger;
     } else {
       this.notification = trigger;
@@ -310,7 +314,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
     Objects.requireNonNull(row);
     Objects.requireNonNull(col);
 
-    if (!env.getWeakObservers().containsKey(col)) {
+    if (!env.getConfiguredObservers().getObservedColumns(WEAK).contains(col)) {
       throw new IllegalArgumentException("Column not configured for weak notifications " + col);
     }
 
@@ -1022,7 +1026,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
 
     HashMap<Bytes, Mutation> mutations = new HashMap<>();
 
-    if (env.getObservers().containsKey(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
+    if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
       Flutation m = new Flutation(env, cd.prow);
       Notification.put(env, m, cd.pcol, commitTs);
       mutations.put(cd.prow, m);
@@ -1031,7 +1035,7 @@ public class TransactionImpl extends AbstractTransactionBase implements AsyncTra
     for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
 
       for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        if (env.getObservers().containsKey(colUpdates.getKey())) {
+        if (observedColumns.contains(colUpdates.getKey())) {
           Bytes val = colUpdates.getValue();
           if (isWrite(val) && !isDelete(val)) {
             Mutation m = mutations.get(rowUpdates.getKey());

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java
new file mode 100644
index 0000000..bac18c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+
+/*
+ * This interface abstracts over the old and new ways of configuring observers.
+ */
+public interface ObserverStore {
+  boolean handles(FluoConfiguration config);
+
+  void clear(CuratorFramework curator) throws Exception;
+
+  void update(CuratorFramework curator, FluoConfiguration config) throws Exception;
+
+  RegisteredObservers load(CuratorFramework curator) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
new file mode 100644
index 0000000..0e2f94a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.v1.ObserverStoreV1;
+import org.apache.fluo.core.observer.v2.ObserverStoreV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ObserverUtil {
+
+  private static Logger logger = LoggerFactory.getLogger(ObserverUtil.class);
+
+  public static void initialize(CuratorFramework curator, FluoConfiguration config) {
+
+    logger.info("Setting up observers using app config: {}", config.getAppConfiguration());
+
+    ObserverStore ov1 = new ObserverStoreV1();
+    ObserverStore ov2 = new ObserverStoreV2();
+
+    if (ov1.handles(config) && ov2.handles(config)) {
+      throw new IllegalArgumentException(
+          "Old and new observers configuration present.  There can only be one.");
+    }
+
+    try {
+      if (ov1.handles(config)) {
+        ov2.clear(curator);
+        ov1.update(curator, config);
+      } else if (ov2.handles(config)) {
+        ov1.clear(curator);
+        ov2.update(curator, config);
+      }
+    } catch (Exception e) {
+      throw new FluoException("Failed to update shared configuration in Zookeeper", e);
+    }
+  }
+
+  public static RegisteredObservers load(CuratorFramework curator) throws Exception {
+    ObserverStore ov1 = new ObserverStoreV1();
+    ObserverStore ov2 = new ObserverStoreV2();
+
+    // try to load observers using old and new config
+    RegisteredObservers co = ov1.load(curator);
+    if (co == null) {
+      co = ov2.load(curator);
+    }
+
+    if (co == null) {
+      // no observers configured, so return an empty RegisteredObservers (observes no columns)
+      co = new RegisteredObservers() {
+        @Override
+        public Observers getObservers(Environment env) {
+          return new Observers() {
+
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public void returnObserver(Observer o) {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Observer getObserver(Column col) {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+
+        @Override
+        public Set<Column> getObservedColumns(NotificationType nt) {
+          return Collections.emptySet();
+        }
+      };
+    }
+
+    return co;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
new file mode 100644
index 0000000..d4cc366
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+
+public interface Observers extends AutoCloseable {
+  Observer getObserver(Column col);
+
+  void returnObserver(Observer o);
+
+  @Override
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java
new file mode 100644
index 0000000..0b07c1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/RegisteredObservers.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer;
+
+import java.util.Set;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.core.impl.Environment;
+
+public interface RegisteredObservers {
+  Set<Column> getObservedColumns(Observer.NotificationType nt);
+
+  Observers getObservers(Environment env);
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
new file mode 100644
index 0000000..f37f2c1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v1;
+
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.metrics.DummyMetricsReporter;
+
+/**
+ * {@link Observer.Context} implementation used by the v1 observer support.
+ *
+ * <p>
+ * An instance is backed either by an {@link Environment} or, when none is supplied, by a plain
+ * application configuration. Without an environment a {@link DummyMetricsReporter} is handed out
+ * as the metrics reporter.
+ */
+@SuppressWarnings("deprecation")
+public class ObserverContext implements Observer.Context {
+
+  private final Environment env;
+  private final SimpleConfiguration appConfig;
+  private final SimpleConfiguration observerConfig;
+
+  /**
+   * Creates a context that is not backed by an environment.
+   *
+   * @param appConfig application configuration returned by {@link #getAppConfiguration()}
+   * @param observerConfig configuration returned by {@link #getObserverConfiguration()}
+   */
+  public ObserverContext(SimpleConfiguration appConfig, SimpleConfiguration observerConfig) {
+    this(null, appConfig, observerConfig);
+  }
+
+  /**
+   * Creates a context backed by an environment, which supplies the application configuration
+   * and the metrics reporter.
+   *
+   * @param env environment to delegate to
+   * @param observerConfig configuration returned by {@link #getObserverConfiguration()}
+   */
+  public ObserverContext(Environment env, SimpleConfiguration observerConfig) {
+    this(env, null, observerConfig);
+  }
+
+  // single place where all fields are assigned; exactly one of env/appConfig is expected to be set
+  private ObserverContext(Environment env, SimpleConfiguration appConfig,
+      SimpleConfiguration observerConfig) {
+    this.env = env;
+    this.appConfig = appConfig;
+    this.observerConfig = observerConfig;
+  }
+
+  @Override
+  public SimpleConfiguration getAppConfiguration() {
+    return env == null ? appConfig : env.getAppConfiguration();
+  }
+
+  @Override
+  public SimpleConfiguration getObserverConfiguration() {
+    return observerConfig;
+  }
+
+  @Override
+  public MetricsReporter getMetricsReporter() {
+    if (env != null) {
+      return env.getMetricsReporter();
+    }
+    // no environment available, so fall back to the dummy reporter
+    return new DummyMetricsReporter();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
new file mode 100644
index 0000000..754ca28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserverStoreV1.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v1;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.Observer.ObservedColumn;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.util.ColumnUtil;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Support for observers configured the old way.
+ */
+@SuppressWarnings("deprecation")
+public class ObserverStoreV1 implements ObserverStore {
+
+  private static final Logger logger = LoggerFactory.getLogger(ObserverStoreV1.class);
+
+  @Override
+  public boolean handles(FluoConfiguration config) {
+    Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+    return !obsSpecs.isEmpty();
+  }
+
+  @Override
+  public void update(CuratorFramework curator, FluoConfiguration config) throws Exception {
+    Collection<ObserverSpecification> obsSpecs = config.getObserverSpecifications();
+
+    Map<Column, ObserverSpecification> colObservers = new HashMap<>();
+    Map<Column, ObserverSpecification> weakObservers = new HashMap<>();
+
+    for (ObserverSpecification ospec : obsSpecs) {
+      Observer observer;
+      try {
+        observer = Class.forName(ospec.getClassName()).asSubclass(Observer.class).newInstance();
+      } catch (ClassNotFoundException e1) {
+        throw new FluoException("Observer class '" + ospec.getClassName() + "' was not "
+            + "found.  Check for class name misspellings or failure to include "
+            + "the observer jar.", e1);
+      } catch (InstantiationException | IllegalAccessException e2) {
+        throw new FluoException("Observer class '" + ospec.getClassName()
+            + "' could not be created.", e2);
+      }
+
+      SimpleConfiguration oc = ospec.getConfiguration();
+      logger.info("Setting up observer {} using params {}.", observer.getClass().getSimpleName(),
+          oc.toMap());
+      try {
+        observer.init(new ObserverContext(config.subset(FluoConfiguration.APP_PREFIX), oc));
+      } catch (Exception e) {
+        throw new FluoException("Observer '" + ospec.getClassName() + "' could not be initialized",
+            e);
+      }
+
+      ObservedColumn observedCol = observer.getObservedColumn();
+      if (observedCol.getType() == NotificationType.STRONG) {
+        colObservers.put(observedCol.getColumn(), ospec);
+      } else {
+        weakObservers.put(observedCol.getColumn(), ospec);
+      }
+    }
+
+    updateObservers(curator, colObservers, weakObservers);
+  }
+
+  private static void updateObservers(CuratorFramework curator,
+      Map<Column, ObserverSpecification> colObservers,
+      Map<Column, ObserverSpecification> weakObservers) throws Exception {
+
+    // TODO check that no workers are running... or make workers watch this znode
+
+    String observerPath = ZookeeperPath.CONFIG_FLUO_OBSERVERS1;
+    try {
+      curator.delete().deletingChildrenIfNeeded().forPath(observerPath);
+    } catch (NoNodeException nne) {
+      // it's ok if node doesn't exist
+    } catch (Exception e) {
+      logger.error("An error occurred deleting Zookeeper node. node=[" + observerPath
+          + "], error=[" + e.getMessage() + "]");
+      throw new RuntimeException(e);
+    }
+
+    byte[] serializedObservers = serializeObservers(colObservers, weakObservers);
+    CuratorUtil.putData(curator, observerPath, serializedObservers,
+        CuratorUtil.NodeExistsPolicy.OVERWRITE);
+  }
+
+  private static void serializeObservers(DataOutputStream dos,
+      Map<Column, ObserverSpecification> colObservers) throws IOException {
+    // TODO use a human readable serialized format like json
+
+    Set<Entry<Column, ObserverSpecification>> es = colObservers.entrySet();
+
+    WritableUtils.writeVInt(dos, colObservers.size());
+
+    for (Entry<Column, ObserverSpecification> entry : es) {
+      ColumnUtil.writeColumn(entry.getKey(), dos);
+      dos.writeUTF(entry.getValue().getClassName());
+      Map<String, String> params = entry.getValue().getConfiguration().toMap();
+      WritableUtils.writeVInt(dos, params.size());
+      for (Entry<String, String> pentry : params.entrySet()) {
+        dos.writeUTF(pentry.getKey());
+        dos.writeUTF(pentry.getValue());
+      }
+    }
+  }
+
+  private static byte[] serializeObservers(Map<Column, ObserverSpecification> colObservers,
+      Map<Column, ObserverSpecification> weakObservers) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (DataOutputStream dos = new DataOutputStream(baos)) {
+      serializeObservers(dos, colObservers);
+      serializeObservers(dos, weakObservers);
+    }
+
+    byte[] serializedObservers = baos.toByteArray();
+    return serializedObservers;
+  }
+
+
+  private static Map<Column, ObserverSpecification> readObservers(DataInputStream dis)
+      throws IOException {
+
+    HashMap<Column, ObserverSpecification> omap = new HashMap<>();
+
+    int num = WritableUtils.readVInt(dis);
+    for (int i = 0; i < num; i++) {
+      Column col = ColumnUtil.readColumn(dis);
+      String clazz = dis.readUTF();
+      Map<String, String> params = new HashMap<>();
+      int numParams = WritableUtils.readVInt(dis);
+      for (int j = 0; j < numParams; j++) {
+        String k = dis.readUTF();
+        String v = dis.readUTF();
+        params.put(k, v);
+      }
+
+      ObserverSpecification ospec = new ObserverSpecification(clazz, params);
+      omap.put(col, ospec);
+    }
+
+    return omap;
+  }
+
+  @Override
+  public RegisteredObservers load(CuratorFramework curator) throws Exception {
+
+    Map<Column, ObserverSpecification> observers;
+    Map<Column, ObserverSpecification> weakObservers;
+
+    ByteArrayInputStream bais;
+    try {
+      bais =
+          new ByteArrayInputStream(curator.getData().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS1));
+    } catch (NoNodeException nne) {
+      return null;
+    }
+    DataInputStream dis = new DataInputStream(bais);
+
+    observers = Collections.unmodifiableMap(readObservers(dis));
+    weakObservers = Collections.unmodifiableMap(readObservers(dis));
+
+
+    return new RegisteredObservers() {
+
+      @Override
+      public Observers getObservers(Environment env) {
+        return new ObserversV1(env, observers, weakObservers);
+      }
+
+      @Override
+      public Set<Column> getObservedColumns(NotificationType nt) {
+        switch (nt) {
+          case STRONG:
+            return observers.keySet();
+          case WEAK:
+            return weakObservers.keySet();
+          default:
+            throw new IllegalArgumentException("Unknown notification type " + nt);
+        }
+      }
+    };
+  }
+
+  @Override
+  public void clear(CuratorFramework curator) throws Exception {
+    try {
+      curator.delete().forPath(ZookeeperPath.CONFIG_FLUO_OBSERVERS1);
+    } catch (NoNodeException nne) {
+      // nothing to delete
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
new file mode 100644
index 0000000..65c1b2f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v1;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.fluo.api.config.ObserverSpecification;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.Observers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+class ObserversV1 implements Observers {
+
+  private static final Logger log = LoggerFactory.getLogger(ObserversV1.class);
+
+  private Environment env;
+  Map<Column, List<Observer>> observers = new HashMap<>();
+  Map<Column, ObserverSpecification> strongObservers;
+  Map<Column, ObserverSpecification> weakObservers;
+
+  private List<Observer> getObserverList(Column col) {
+    List<Observer> observerList;
+    synchronized (observers) {
+      observerList = observers.get(col);
+      if (observerList == null) {
+        observerList = new ArrayList<>();
+        observers.put(col, observerList);
+      }
+    }
+    return observerList;
+  }
+
+  public ObserversV1(Environment env, Map<Column, ObserverSpecification> strongObservers,
+      Map<Column, ObserverSpecification> weakObservers) {
+    this.env = env;
+    this.strongObservers = strongObservers;
+    this.weakObservers = weakObservers;
+  }
+
+  public Observer getObserver(Column col) {
+
+    List<Observer> observerList;
+    observerList = getObserverList(col);
+
+    synchronized (observerList) {
+      if (observerList.size() > 0) {
+        return observerList.remove(observerList.size() - 1);
+      }
+    }
+
+    Observer observer = null;
+
+    ObserverSpecification observerConfig = strongObservers.get(col);
+    if (observerConfig == null) {
+      observerConfig = weakObservers.get(col);
+    }
+
+    if (observerConfig != null) {
+      try {
+        observer =
+            Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).newInstance();
+        observer.init(new ObserverContext(env, observerConfig.getConfiguration()));
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      if (!observer.getObservedColumn().getColumn().equals(col)) {
+        throw new IllegalStateException("Mismatch between configured column and class column "
+            + observerConfig.getClassName() + " " + col + " "
+            + observer.getObservedColumn().getColumn());
+      }
+    }
+
+    return observer;
+  }
+
+  public void returnObserver(Observer observer) {
+    List<Observer> olist = getObserverList(observer.getObservedColumn().getColumn());
+    synchronized (olist) {
+      olist.add(observer);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (observers == null) {
+      return;
+    }
+
+    synchronized (observers) {
+      for (List<Observer> olist : observers.values()) {
+        synchronized (olist) {
+          for (Observer observer : olist) {
+            try {
+              observer.close();
+            } catch (Exception e) {
+              log.error("Failed to close observer", e);
+            }
+          }
+          olist.clear();
+        }
+      }
+    }
+
+    observers = null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java
new file mode 100644
index 0000000..3787be3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservedColumn.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+/**
+ * this class created for json serialization
+ */
+class JsonObservedColumn {
+  private byte[] fam;
+  private byte[] qual;
+  private byte[] vis;
+  private String notificationType;
+
+  JsonObservedColumn(Column col, NotificationType nt) {
+    this.fam = col.getFamily().toArray();
+    this.qual = col.getQualifier().toArray();
+    this.vis = col.getVisibility().toArray();
+    this.notificationType = nt.name();
+  }
+
+  public Column getColumn() {
+    return new Column(Bytes.of(fam), Bytes.of(qual), Bytes.of(vis));
+  }
+
+  public NotificationType getNotificationType() {
+    return NotificationType.valueOf(notificationType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
new file mode 100644
index 0000000..44f229b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/JsonObservers.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Holder for an observer provider class name and the columns it observes, created for JSON
+ * serialization.
+ */
+class JsonObservers {
+  // NOTE(review): presumably serialized by field name; keep names and order stable - confirm
+  String obsProviderClass;
+  List<JsonObservedColumn> observedColumns;
+
+  /**
+   * @param obsProviderClass name of the observer provider class
+   * @param columns observed columns mapped to their notification type
+   */
+  JsonObservers(String obsProviderClass, Map<Column, NotificationType> columns) {
+    this.obsProviderClass = obsProviderClass;
+    this.observedColumns =
+        columns.entrySet().stream().map(JsonObservers::toJsonColumn).collect(toList());
+  }
+
+  // Converts one column/notification-type pair into its serializable form.
+  private static JsonObservedColumn toJsonColumn(Map.Entry<Column, NotificationType> observed) {
+    return new JsonObservedColumn(observed.getKey(), observed.getValue());
+  }
+
+  public String getObserverProviderClass() {
+    return obsProviderClass;
+  }
+
+  /**
+   * @return the observed columns mapped to their notification type, rebuilt from the stored list
+   */
+  public Map<Column, NotificationType> getObservedColumns() {
+    return observedColumns.stream().collect(
+        toMap(jsonCol -> jsonCol.getColumn(), jsonCol -> jsonCol.getNotificationType()));
+  }
+
+  @Override
+  public String toString() {
+    return String.join(" ", String.valueOf(obsProviderClass),
+        String.valueOf(getObservedColumns()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
new file mode 100644
index 0000000..3d2b1f3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverProviderContextImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.metrics.MetricsReporter;
+import org.apache.fluo.api.observer.ObserverProvider.Context;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.metrics.DummyMetricsReporter;
+
+/**
+ * {@link Context} implementation passed to observer providers.
+ *
+ * <p>
+ * An instance is backed either by an {@link Environment} or, when none is supplied, by a plain
+ * application configuration. Without an environment a {@link DummyMetricsReporter} is handed out
+ * as the metrics reporter.
+ */
+public class ObserverProviderContextImpl implements Context {
+
+  // assigned exactly once per constructor, so both fields are final
+  private final SimpleConfiguration appConfig;
+  private final Environment env;
+
+  /**
+   * Creates a context that is not backed by an environment.
+   *
+   * @param appConfig application configuration returned by {@link #getAppConfiguration()}
+   */
+  public ObserverProviderContextImpl(SimpleConfiguration appConfig) {
+    this.appConfig = appConfig;
+    this.env = null;
+  }
+
+  /**
+   * Creates a context that delegates to the given environment.
+   *
+   * @param env environment supplying the application configuration and metrics reporter
+   */
+  public ObserverProviderContextImpl(Environment env) {
+    this.env = env;
+    this.appConfig = null;
+  }
+
+  /**
+   * @return the environment's application configuration when an environment is present,
+   *         otherwise the configuration given at construction
+   */
+  @Override
+  public SimpleConfiguration getAppConfiguration() {
+    if (env == null) {
+      return appConfig;
+    }
+    return env.getAppConfiguration();
+  }
+
+  /**
+   * @return the environment's metrics reporter when an environment is present, otherwise a new
+   *         {@link DummyMetricsReporter}
+   */
+  @Override
+  public MetricsReporter getMetricsReporter() {
+    if (env == null) {
+      return new DummyMetricsReporter();
+    }
+    return env.getMetricsReporter();
+  }
+
+}


Mime
View raw message