fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/2] incubator-fluo git commit: fixes #816 introduced better way to setup observers and deprecated old
Date Wed, 29 Mar 2017 17:06:23 GMT
Repository: incubator-fluo
Updated Branches:
  refs/heads/master bf133862a -> d6af386bf


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
new file mode 100644
index 0000000..2e544cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverStoreV2.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.RegisteredObservers;
+import org.apache.fluo.core.observer.Observers;
+import org.apache.fluo.core.observer.ObserverStore;
+import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.fluo.accumulo.util.ZookeeperPath.CONFIG_FLUO_OBSERVERS2;
+
+/*
+ * Support for observers configured using an ObserverProvider (fluo.observer.provider).
+ */
+public class ObserverStoreV2 implements ObserverStore {
+
+  @Override
+  public boolean handles(FluoConfiguration config) {
+    return !config.getObserverProvider().isEmpty();
+  }
+
+  @Override
+  public void update(CuratorFramework curator, FluoConfiguration config) throws Exception {
+    String obsProviderClass = config.getObserverProvider();
+
+    ObserverProvider observerProvider = newObserverProvider(obsProviderClass);
+
+    Map<Column, NotificationType> obsCols = new HashMap<>();
+    BiConsumer<Column, NotificationType> obsColConsumer = (col, nt) -> {
+      Objects.requireNonNull(col, "Observed column must be non-null");
+      Objects.requireNonNull(nt, "Notification type must be non-null");
+      Preconditions.checkArgument(!obsCols.containsKey(col), "Duplicate observed column %s", col);
+      obsCols.put(col, nt);
+    };
+
+    observerProvider.provideColumns(obsColConsumer,
+        new ObserverProviderContextImpl(config.getAppConfiguration()));
+
+    Gson gson = new Gson();
+    String json = gson.toJson(new JsonObservers(obsProviderClass, obsCols));
+    CuratorUtil.putData(curator, CONFIG_FLUO_OBSERVERS2, json.getBytes(UTF_8),
+        CuratorUtil.NodeExistsPolicy.OVERWRITE);
+
+  }
+
+  static ObserverProvider newObserverProvider(String obsProviderClass) {
+    ObserverProvider observerProvider;
+    try {
+      observerProvider =
+          Class.forName(obsProviderClass).asSubclass(ObserverProvider.class).newInstance();
+    } catch (ClassNotFoundException e1) {
+      throw new FluoException("ObserverProvider class '" + obsProviderClass + "' was not "
+          + "found.  Check for class name misspellings or failure to include "
+          + "the observer provider jar.", e1);
+    } catch (InstantiationException | IllegalAccessException e2) {
+      throw new FluoException("ObserverProvider class '" + obsProviderClass
+          + "' could not be created.", e2);
+    }
+    return observerProvider;
+  }
+
+  @Override
+  public RegisteredObservers load(CuratorFramework curator) throws Exception {
+    byte[] data;
+    try {
+      data = curator.getData().forPath(CONFIG_FLUO_OBSERVERS2);
+    } catch (NoNodeException nne) {
+      return null;
+    }
+    String json = new String(data, UTF_8);
+    JsonObservers jco = new Gson().fromJson(json, JsonObservers.class);
+
+    Set<Column> weakColumns = new HashSet<>();
+    Set<Column> strongColumns = new HashSet<>();
+
+    for (Entry<Column, NotificationType> entry : jco.getObservedColumns().entrySet()) {
+      switch (entry.getValue()) {
+        case STRONG:
+          strongColumns.add(entry.getKey());
+          break;
+        case WEAK:
+          weakColumns.add(entry.getKey());
+          break;
+        default:
+          throw new IllegalStateException("Unknown notification type " + entry.getValue());
+      }
+    }
+
+    return new RegisteredObservers() {
+
+      @Override
+      public Observers getObservers(Environment env) {
+        return new ObserversV2(env, jco, strongColumns, weakColumns);
+      }
+
+      @Override
+      public Set<Column> getObservedColumns(NotificationType nt) {
+        switch (nt) {
+          case STRONG:
+            return strongColumns;
+          case WEAK:
+            return weakColumns;
+          default:
+            throw new IllegalArgumentException("Unknown notification type " + nt);
+        }
+      }
+    };
+  }
+
+  @Override
+  public void clear(CuratorFramework curator) throws Exception {
+    try {
+      curator.delete().forPath(CONFIG_FLUO_OBSERVERS2);
+    } catch (NoNodeException nne) {
+      // nothing to delete
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
new file mode 100644
index 0000000..d9d4a97
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.ObserverProvider.Registry;
+import org.apache.fluo.api.observer.StringObserver;
+import org.apache.fluo.core.impl.Environment;
+import org.apache.fluo.core.observer.Observers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ObserversV2 implements Observers {
+
+  private static final Logger log = LoggerFactory.getLogger(ObserversV2.class);
+
+  Map<Column, Observer> observers;
+
+  public ObserversV2(Environment env, JsonObservers jco, Set<Column> strongColumns,
+      Set<Column> weakColumns) {
+    observers = new HashMap<>();
+
+    ObserverProvider obsProvider =
+        ObserverStoreV2.newObserverProvider(jco.getObserverProviderClass());
+
+    ObserverProviderContextImpl ctx = new ObserverProviderContextImpl(env);
+
+    Registry or = new Registry() {
+
+      @Override
+      public void register(Column col, NotificationType nt, Observer obs) {
+        try {
+          Method closeMethod = obs.getClass().getMethod("close");
+          if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
+            log.warn(
+                "Observer {} implements close().  Close is not called on Observers created using ObserverProvider."
+                    + " Close is only called on Observers configured the old way.", obs.getClass()
+                    .getName());
+          }
+        } catch (NoSuchMethodException | SecurityException e) {
+          throw new RuntimeException("Failed to check if close() is implemented", e);
+        }
+
+        if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
+          throw new IllegalArgumentException("Column " + col
+              + " not previously configured for strong notifications");
+        }
+
+        if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
+          throw new IllegalArgumentException("Column " + col
+              + " not previously configured for weak notifications");
+        }
+
+        if (observers.containsKey(col)) {
+          throw new IllegalArgumentException("Duplicate observed column " + col);
+        }
+
+        observers.put(col, obs);
+      }
+
+      @Override
+      public void registers(Column col, NotificationType nt, StringObserver obs) {
+        register(col, nt, obs);
+      }
+    };
+
+    obsProvider.provide(or, ctx);
+
+    // ensure an observer was registered for every configured column (configured - registered)
+    SetView<Column> diff =
+        Sets.difference(Sets.union(strongColumns, weakColumns), observers.keySet());
+    if (diff.size() > 0) {
+      throw new FluoException("ObserverProvider " + jco.getObserverProviderClass()
+          + " did not provide observers for columns " + diff);
+    }
+  }
+
+  @Override
+  public Observer getObserver(Column col) {
+    return observers.get(col);
+  }
+
+  @Override
+  public void returnObserver(Observer o) {}
+
+  @Override
+  public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 1834835..56aa78d 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -29,6 +29,7 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
+import org.apache.fluo.core.observer.Observers;
 import org.apache.fluo.core.util.FluoExecutors;
 import org.apache.fluo.core.util.Hex;
 import org.slf4j.Logger;
@@ -50,7 +51,7 @@ public class NotificationProcessor implements AutoCloseable {
     this.queue = new PriorityBlockingQueue<>();
     this.executor = FluoExecutors.newFixedThreadPool(numThreads, queue, "ntfyProc");
     this.tracker = new NotificationTracker();
-    this.observers = new Observers(env);
+    this.observers = env.getConfiguredObservers().getObservers(env);
     env.getSharedResources().getMetricRegistry()
         .register(env.getMetricNames().getNotificationQueued(), new Gauge<Integer>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java b/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
deleted file mode 100644
index 47d5997..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/ObserverContext.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.worker;
-
-import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.metrics.MetricsReporter;
-import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.metrics.DummyMetricsReporter;
-
-public class ObserverContext implements Observer.Context {
-
-  private final SimpleConfiguration observerConfig;
-  private final SimpleConfiguration appConfig;
-  private final Environment env;
-
-  public ObserverContext(SimpleConfiguration appConfig, SimpleConfiguration observerConfig) {
-    this.appConfig = appConfig;
-    this.observerConfig = observerConfig;
-    this.env = null;
-  }
-
-  public ObserverContext(Environment env, SimpleConfiguration observerConfig) {
-    this.env = env;
-    this.appConfig = null;
-    this.observerConfig = observerConfig;
-  }
-
-  @Override
-  public SimpleConfiguration getAppConfiguration() {
-    if (env == null) {
-      return appConfig;
-    }
-    return env.getAppConfiguration();
-  }
-
-  @Override
-  public SimpleConfiguration getObserverConfiguration() {
-    return observerConfig;
-  }
-
-  @Override
-  public MetricsReporter getMetricsReporter() {
-    if (env == null) {
-      return new DummyMetricsReporter();
-    }
-    return env.getMetricsReporter();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java
deleted file mode 100644
index 285a69a..0000000
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/Observers.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.fluo.core.worker;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.core.impl.Environment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Observers implements AutoCloseable {
-
-  private static final Logger log = LoggerFactory.getLogger(Observers.class);
-
-  private Environment env;
-  Map<Column, List<Observer>> observers = new HashMap<>();
-
-  private List<Observer> getObserverList(Column col) {
-    List<Observer> observerList;
-    synchronized (observers) {
-      observerList = observers.get(col);
-      if (observerList == null) {
-        observerList = new ArrayList<>();
-        observers.put(col, observerList);
-      }
-    }
-    return observerList;
-  }
-
-  public Observers(Environment env) {
-    this.env = env;
-  }
-
-  public Observer getObserver(Column col) {
-
-    List<Observer> observerList;
-    observerList = getObserverList(col);
-
-    synchronized (observerList) {
-      if (observerList.size() > 0) {
-        return observerList.remove(observerList.size() - 1);
-      }
-    }
-
-    Observer observer = null;
-
-    ObserverSpecification observerConfig = env.getObservers().get(col);
-    if (observerConfig == null) {
-      observerConfig = env.getWeakObservers().get(col);
-    }
-
-    if (observerConfig != null) {
-      try {
-        observer =
-            Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).newInstance();
-        observer.init(new ObserverContext(env, observerConfig.getConfiguration()));
-      } catch (RuntimeException e) {
-        throw e;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-
-      if (!observer.getObservedColumn().getColumn().equals(col)) {
-        throw new IllegalStateException("Mismatch between configured column and class column "
-            + observerConfig.getClassName() + " " + col + " "
-            + observer.getObservedColumn().getColumn());
-      }
-    }
-
-    return observer;
-  }
-
-  public void returnObserver(Observer observer) {
-    List<Observer> olist = getObserverList(observer.getObservedColumn().getColumn());
-    synchronized (olist) {
-      olist.add(observer);
-    }
-  }
-
-  @Override
-  public void close() {
-    if (observers == null) {
-      return;
-    }
-
-    synchronized (observers) {
-      for (List<Observer> olist : observers.values()) {
-        synchronized (olist) {
-          for (Observer observer : olist) {
-            try {
-              observer.close();
-            } catch (Exception e) {
-              log.error("Failed to close observer", e);
-            }
-          }
-          olist.clear();
-        }
-      }
-    }
-
-    observers = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index 895fc21..ff582ed 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -23,6 +23,7 @@ import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.impl.TransactionImpl;
 import org.apache.fluo.core.log.TracingTransaction;
+import org.apache.fluo.core.observer.Observers;
 import org.apache.fluo.core.util.Hex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/distribution/src/main/config/fluo.properties
----------------------------------------------------------------------
diff --git a/modules/distribution/src/main/config/fluo.properties b/modules/distribution/src/main/config/fluo.properties
index ad0c97f..f9f603d 100644
--- a/modules/distribution/src/main/config/fluo.properties
+++ b/modules/distribution/src/main/config/fluo.properties
@@ -60,10 +60,9 @@ fluo.admin.accumulo.classpath=\${fluo.admin.hdfs.root}/fluo/lib/fluo-api-${proje
 
 # Observer properties
 # -------------------
-# Specifies observers
-# fluo.observer.0=com.foo.Observer1
-# Can optionally have configuration key values
-# fluo.observer.1=com.foo.Observer2,configKey1=configVal1,configKey2=configVal2
+# Specifies an observer provider.  This should be the name of a class that
+# implements org.apache.fluo.api.observer.ObserverProvider.
+#fluo.observer.provider=com.foo.AppObserverProvider
 
 # Transaction properties
 # ----------------------

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
index 8f6b425..8f644eb 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBase.java
@@ -16,8 +16,6 @@
 package org.apache.fluo.integration;
 
 import java.io.File;
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.client.Connector;
@@ -30,8 +28,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -80,8 +78,15 @@ public class ITBase {
     conn = miniAccumulo.getConnector(USER, new PasswordToken(PASSWORD));
   }
 
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.emptyList();
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return null;
+  }
+
+  protected void setupObservers(FluoConfiguration fc) {
+    Class<? extends ObserverProvider> ofc = getObserverProviderClass();
+    if (ofc != null) {
+      fc.setObserverProvider(ofc);
+    }
   }
 
   public String getCurTableName() {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
index 656a5d9..481e6e8 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -75,7 +75,7 @@ public class ITBaseImpl extends ITBase {
     config.setAccumuloZookeepers(miniAccumulo.getZooKeepers());
     config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
     config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
-    config.addObservers(getObservers());
+    setupObservers(config);
     config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
     config.setMiniStartAccumulo(false);
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
index 0d28381..bcef827 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/ITBaseMini.java
@@ -49,7 +49,7 @@ public class ITBaseMini extends ITBase {
     config.setInstanceZookeepers(miniAccumulo.getZooKeepers() + "/fluo");
     config.setAccumuloTable(getNextTableName());
     config.setWorkerThreads(5);
-    config.addObservers(getObservers());
+    setupObservers(config);
     config.setMiniStartAccumulo(false);
 
     setConfig(config);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
index 22f8ea2..7c48cc3 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
@@ -15,9 +15,6 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
@@ -25,25 +22,28 @@ import org.apache.fluo.api.client.Loader;
 import org.apache.fluo.api.client.LoaderExecutor;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.config.SimpleConfiguration;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.integration.ITBaseMini;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 public class AppConfigIT extends ITBaseMini {
 
+  public static final Column DF_COL = new Column("data", "foo");
+  public static final Column DB_COL = new Column("data", "bar");
+
   @Override
   protected void setAppConfig(SimpleConfiguration config) {
     config.setProperty("myapp.sizeLimit", 50000);
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(TestObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return TestObserverProvider.class;
   }
 
   @Test
@@ -89,32 +89,24 @@ public class AppConfigIT extends ITBaseMini {
     public void load(TransactionBase tx, Context context) throws Exception {
       int limit = context.getAppConfiguration().getInt("myapp.sizeLimit");
       if (data < limit) {
-        tx.set(row, new Column("data", "foo"), Integer.toString(data));
+        tx.set(row, DF_COL, Integer.toString(data));
       }
     }
   }
 
-  public static class TestObserver extends AbstractObserver {
-
-    private int limit;
-
+  public static class TestObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(new Column("data", "foo"), NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      int limit = ctx.getAppConfiguration().getInt("myapp.sizeLimit");
+
+      or.registers(DF_COL, STRONG, (tx, row, col) -> {
+        int d = Integer.parseInt(tx.gets(row, col));
+        if (2 * d < limit) {
+          tx.set(row.toString(), DB_COL, Integer.toString(2 * d));
+        }
+      });
     }
 
-    @Override
-    public void init(Context context) {
-      limit = context.getAppConfiguration().getInt("myapp.sizeLimit");
-    }
-
-    @Override
-    public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-      int d = Integer.parseInt(tx.gets(row.toString(), col));
-      if (2 * d < limit) {
-        tx.set(row.toString(), new Column("data", "bar"), Integer.toString(2 * d));
-      }
-    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index 3af4fdc..f927102 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -15,9 +15,7 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Random;
 
@@ -32,11 +30,9 @@ import org.apache.fluo.api.client.LoaderExecutor;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.FluoConfigurationImpl;
 import org.apache.fluo.core.util.UtilWaitThread;
 import org.apache.fluo.integration.ITBaseMini;
@@ -81,27 +77,22 @@ public class CollisionIT extends ITBaseMini {
     }
   }
 
-  public static class TotalObserver extends AbstractObserver {
-
+  public static class CollisionObserverProvider implements ObserverProvider {
     @Override
-    public Observer.ObservedColumn getObservedColumn() {
-      return new Observer.ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
-    }
-
-    @Override
-    public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
-      String row = rowBytes.toString();
-      int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
-      int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
-
-      tx.set(row, STAT_PROCESSED, total + "");
-      TestUtil.increment(tx, "all", STAT_TOTAL, total - processed);
+    public void provide(Registry or, Context ctx) {
+      or.registers(STAT_CHANGED, NotificationType.WEAK, (tx, row, col) -> {
+        int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
+        int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
+
+        tx.set(row, STAT_PROCESSED, total + "");
+        TestUtil.increment(tx, "all", STAT_TOTAL, total - processed);
+      });
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(TotalObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return CollisionObserverProvider.class;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
index 6b4d279..d86f2ef 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -15,9 +15,7 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Random;
 
@@ -32,12 +30,12 @@ import org.apache.fluo.accumulo.util.LongUtil;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.accumulo.values.DelLockValue;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.exceptions.CommitException;
 import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.exceptions.StaleScanException;
 import org.apache.fluo.core.impl.Notification;
@@ -54,6 +52,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
 import static org.apache.fluo.integration.BankUtil.BALANCE;
 
 public class FailureIT extends ITBaseImpl {
@@ -61,22 +60,21 @@ public class FailureIT extends ITBaseImpl {
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
-  public static class NullObserver extends AbstractObserver {
-
+  public static class NullObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {}
+  }
 
+  public static class FailuresObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(new Column("attr", "lastupdate"), NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(new Column("attr", "lastupdate"), STRONG, new NullObserver());
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    List<ObserverSpecification> observed = new ArrayList<>();
-    observed.add(new ObserverSpecification(NullObserver.class.getName()));
-    return observed;
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return FailuresObserverProvider.class;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index e67d4d9..288bed9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -18,7 +18,6 @@ package org.apache.fluo.integration.impl;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
@@ -28,16 +27,15 @@ import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.exceptions.CommitException;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
@@ -52,23 +50,19 @@ import static org.apache.fluo.integration.BankUtil.BALANCE;
 
 public class FluoIT extends ITBaseImpl {
 
-  public static class BalanceObserver extends AbstractObserver {
-
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(BALANCE, NotificationType.STRONG);
-    }
-
+  public static class FluoITObserverProvider implements ObserverProvider {
     @Override
-    public void process(TransactionBase tx, Bytes row, Column col) {
-      Assert.fail();
+    public void provide(Registry or, Context ctx) {
+      or.register(BALANCE, NotificationType.STRONG, (tx, row, col) -> {
+        Assert.fail();
+      });
     }
   }
 
   @Override
-  protected List<org.apache.fluo.api.config.ObserverSpecification> getObservers() {
-    return Arrays.asList(new ObserverSpecification(BalanceObserver.class.getName()));
-  };
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return FluoITObserverProvider.class;
+  }
 
   @Test
   public void testFluoFactory() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
index b496955..e0a08ac 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/NotificationGcIT.java
@@ -15,8 +15,6 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Map.Entry;
 
 import com.google.common.collect.Iterables;
@@ -24,14 +22,14 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.fluo.accumulo.util.ColumnConstants;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.util.ByteUtil;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestTransaction;
-import org.apache.fluo.integration.impl.WeakNotificationIT.SimpleObserver;
+import org.apache.fluo.integration.impl.WeakNotificationIT.WeakNotificationITObserverProvider;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -60,8 +58,8 @@ public class NotificationGcIT extends ITBaseMini {
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WeakNotificationITObserverProvider.class;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
index bfd8e25..5c9c02f 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/ObserverConfigIT.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.config.SimpleConfiguration;
 import org.apache.fluo.api.data.Bytes;
@@ -35,6 +36,7 @@ import org.apache.fluo.integration.ITBaseMini;
 import org.junit.Assert;
 import org.junit.Test;
 
+@Deprecated
 public class ObserverConfigIT extends ITBaseMini {
 
   public static class ConfigurableObserver extends AbstractObserver {
@@ -95,7 +97,7 @@ public class ObserverConfigIT extends ITBaseMini {
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
+  protected void setupObservers(FluoConfiguration fc) {
     List<ObserverSpecification> observers = new ArrayList<>();
 
     observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
@@ -108,7 +110,7 @@ public class ObserverConfigIT extends ITBaseMini {
     observers.add(new ObserverSpecification(ConfigurableObserver.class.getName(), newMap(
         "observedCol", "fam1:col3:" + NotificationType.WEAK, "outputCQ", "col4")));
 
-    return observers;
+    fc.addObservers(observers);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
index fe4b0d6..5381952 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
@@ -22,14 +22,16 @@ import java.util.List;
 
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.integration.ITBaseMini;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 /**
  * Test an observer notifying the column its observing. This is a useful pattern for exporting data.
  */
@@ -39,15 +41,9 @@ public class SelfNotificationIT extends ITBaseMini {
   private static final Column EXPORT_CHECK_COL = new Column("export", "check");
   private static final Column EXPORT_COUNT_COL = new Column("export", "count");
 
-  @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(ExportingObserver.class.getName()));
-  }
-
   private static List<String> exports = new ArrayList<>();
 
-  public static class ExportingObserver extends AbstractObserver {
-
+  public static class ExportingObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
       String r = row.toString();
@@ -69,13 +65,20 @@ public class SelfNotificationIT extends ITBaseMini {
     private void export(Bytes row, String exportCount) {
       exports.add(exportCount);
     }
+  }
 
+  public static class SelfNtfyObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(EXPORT_COUNT_COL, NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(EXPORT_COUNT_COL, STRONG, new ExportingObserver());
     }
   }
 
+  @Override
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return SelfNtfyObserverProvider.class;
+  }
+
   @Test
   public void test1() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
index 1d065e1..ce002cb 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
@@ -15,15 +15,10 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.impl.TransactorNode;
@@ -32,30 +27,26 @@ import org.apache.fluo.integration.TestTransaction;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 public class StrongNotificationIT extends ITBaseMini {
 
   private static final Column OC = new Column("f", "q");
   private static final Column RC = new Column("f", "r");
 
-  public static class SimpleObserver extends AbstractObserver {
-
-    @Override
-    public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
-      String r = row.toString();
-
-      String v = tx.gets(r, col);
-      tx.set(v, RC, r);
-    }
-
+  public static class StrongNtfyObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(OC, NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(OC, STRONG, (tx, row, col) -> {
+        Bytes v = tx.get(row, col);
+        tx.set(v, RC, row);
+      });
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return StrongNtfyObserverProvider.class;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
index 22c6632..2bd4ce9 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
@@ -15,18 +15,15 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.oracle.Stamp;
@@ -36,13 +33,14 @@ import org.apache.fluo.integration.TestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 public class WeakNotificationIT extends ITBaseMini {
 
   private static final Column STAT_COUNT = new Column("stat", "count");
   private static final Column STAT_CHECK = new Column("stat", "check");
 
-  public static class SimpleObserver extends AbstractObserver {
-
+  public static class SimpleObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
 
@@ -61,16 +59,18 @@ public class WeakNotificationIT extends ITBaseMini {
         tx.set(row.toString(), STAT_COUNT, sum + "");
       }
     }
+  }
 
+  public static class WeakNotificationITObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(STAT_CHECK, NotificationType.WEAK);
+    public void provide(Registry or, Context ctx) {
+      or.register(STAT_CHECK, WEAK, new SimpleObserver());
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(SimpleObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WeakNotificationITObserverProvider.class;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 13bb7a7..9dcf6dd 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -15,9 +15,7 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.Scanner;
@@ -25,11 +23,10 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
 import org.apache.fluo.core.impl.Notification;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
 import org.apache.fluo.core.oracle.Stamp;
@@ -39,38 +36,35 @@ import org.apache.fluo.integration.TestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 public class WeakNotificationOverlapIT extends ITBaseImpl {
 
   private static final Column STAT_TOTAL = new Column("stat", "total");
   private static final Column STAT_PROCESSED = new Column("stat", "processed");
   private static final Column STAT_CHANGED = new Column("stat", "changed");
 
-  public static class TotalObserver extends AbstractObserver {
-
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(STAT_CHANGED, NotificationType.WEAK);
+  private static final StringObserver TOTAL_OBSERVER = (tx, row, col) -> {
+    String totalStr = tx.gets(row, STAT_TOTAL);
+    if (totalStr == null) {
+      return;
     }
+    Integer total = Integer.parseInt(totalStr);
+    int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
+    tx.set(row, new Column("stat", "processed"), total + "");
+    TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
+  };
 
+  public static class WeakNtfyObserverProvider implements ObserverProvider {
     @Override
-    public void process(TransactionBase tx, Bytes row, Column col) {
-      String r = row.toString();
-      String totalStr = tx.gets(r, STAT_TOTAL);
-      if (totalStr == null) {
-        return;
-      }
-      Integer total = Integer.parseInt(totalStr);
-      int processed = TestUtil.getOrDefault(tx, r, STAT_PROCESSED, 0);
-      tx.set(r, new Column("stat", "processed"), total + "");
-      TestUtil.increment(tx, "all", new Column("stat", "total"), total - processed);
+    public void provide(Registry or, Context ctx) {
+      or.registers(STAT_CHANGED, WEAK, TOTAL_OBSERVER);
     }
   }
 
-
-
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(TotalObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WeakNtfyObserverProvider.class;
   }
 
   @Test
@@ -92,7 +86,7 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
 
     Assert.assertEquals(1, countNotifications());
 
-    new TotalObserver().process(ttx2, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx2, Bytes.of("1"), STAT_CHANGED);
     // should not delete notification created by ttx3
     ttx2.done();
 
@@ -103,7 +97,7 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
     Assert.assertEquals(1, countNotifications());
 
     TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
-    new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
     ttx4.done();
 
     Assert.assertEquals(0, countNotifications());
@@ -132,7 +126,7 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
 
     Assert.assertEquals(1, countNotifications());
 
-    new TotalObserver().process(ttx6, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx6, Bytes.of("1"), STAT_CHANGED);
     // should not delete notification created by ttx7
     ttx6.done();
 
@@ -143,7 +137,7 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
     snap3.done();
 
     TestTransaction ttx8 = new TestTransaction(env, "1", STAT_CHANGED);
-    new TotalObserver().process(ttx8, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx8, Bytes.of("1"), STAT_CHANGED);
     ttx8.done();
 
     Assert.assertEquals(0, countNotifications());
@@ -182,7 +176,7 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
 
     Assert.assertEquals(1, countNotifications());
 
-    new TotalObserver().process(ttx3, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx3, Bytes.of("1"), STAT_CHANGED);
     ttx3.done();
 
     Assert.assertEquals(1, countNotifications());
@@ -191,7 +185,7 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
     }
 
     TestTransaction ttx4 = new TestTransaction(env, "1", STAT_CHANGED);
-    new TotalObserver().process(ttx4, Bytes.of("1"), STAT_CHANGED);
+    TOTAL_OBSERVER.process(ttx4, Bytes.of("1"), STAT_CHANGED);
     ttx4.done();
 
     Assert.assertEquals(0, countNotifications());

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index 2406b82..9db0ce4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -15,23 +15,19 @@
 
 package org.apache.fluo.integration.impl;
 
-import java.util.Collections;
-import java.util.List;
-
 import com.google.common.collect.Iterables;
 import org.apache.fluo.api.client.Snapshot;
 import org.apache.fluo.api.client.Transaction;
 import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.config.ObserverSpecification;
-import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.impl.TransactionImpl.CommitData;
+import org.apache.fluo.core.observer.Observers;
 import org.apache.fluo.core.worker.NotificationFinder;
-import org.apache.fluo.core.worker.Observers;
 import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestTransaction;
@@ -39,6 +35,8 @@ import org.apache.fluo.mini.MiniFluoImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+
 /**
  * A simple test that added links between nodes in a graph. There is an observer that updates an
  * index of node degree.
@@ -50,21 +48,10 @@ public class WorkerIT extends ITBaseMini {
 
   private static Column observedColumn = LAST_UPDATE;
 
-  @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Collections.singletonList(new ObserverSpecification(DegreeIndexer.class.getName()));
-  }
-
-  public static class DegreeIndexer implements Observer {
-
-    @Override
-    public void init(Context context) {}
+  public static class DegreeIndexer implements StringObserver {
 
     @Override
-    public void process(TransactionBase tx, Bytes rowBytes, Column col) throws Exception {
-
-      String row = rowBytes.toString();
-
+    public void process(TransactionBase tx, String row, Column col) throws Exception {
       // get previously calculated degree
       String degree = tx.gets(row, DEGREE);
 
@@ -84,14 +71,18 @@ public class WorkerIT extends ITBaseMini {
         tx.delete("IDEG" + degree, new Column("node", row));
       }
     }
+  }
 
+  public static class WorkerITObserverProvider implements ObserverProvider {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(observedColumn, NotificationType.STRONG);
+    public void provide(Registry or, Context ctx) {
+      or.register(observedColumn, STRONG, new DegreeIndexer());
     }
+  }
 
-    @Override
-    public void close() {}
+  @Override
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return WorkerITObserverProvider.class;
   }
 
   @Test
@@ -152,15 +143,16 @@ public class WorkerIT extends ITBaseMini {
   public void testDiffObserverConfig() throws Exception {
     observedColumn = new Column("attr2", "lastupdate");
     try {
-      try (Environment env = new Environment(config); Observers observers = new Observers(env)) {
-        observers.getObserver(LAST_UPDATE);
+      try (Environment env = new Environment(config);
+          Observers op = env.getConfiguredObservers().getObservers(env)) {
+        op.getObserver(LAST_UPDATE);
       }
 
       Assert.fail();
 
-    } catch (IllegalStateException ise) {
+    } catch (IllegalArgumentException ise) {
       Assert.assertTrue(ise.getMessage().contains(
-          "Mismatch between configured column and class column"));
+          "Column attr2 lastupdate  not previously configured for strong notifications"));
     } finally {
       observedColumn = LAST_UPDATE;
     }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 02a4fd3..01f04e4 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -18,7 +18,6 @@ package org.apache.fluo.integration.log;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
@@ -32,14 +31,15 @@ import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.client.scanner.CellScanner;
 import org.apache.fluo.api.client.scanner.ColumnScanner;
 import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.ColumnValue;
 import org.apache.fluo.api.data.RowColumn;
 import org.apache.fluo.api.data.RowColumnValue;
 import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
 import org.apache.fluo.integration.ITBaseMini;
 import org.apache.fluo.integration.TestUtil;
 import org.apache.log4j.Level;
@@ -49,6 +49,8 @@ import org.apache.log4j.WriterAppender;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;
+
 public class LogIT extends ITBaseMini {
 
   private static final Column STAT_COUNT = new Column("stat", "count");
@@ -100,13 +102,7 @@ public class LogIT extends ITBaseMini {
     }
   }
 
-  public static class BinaryObserver extends AbstractObserver {
-
-    @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(bCol2, NotificationType.WEAK);
-    }
-
+  public static class BinaryObserver implements Observer {
     @Override
     public void process(TransactionBase tx, Bytes row, Column col) {
       tx.get(bRow1, bCol2);
@@ -115,23 +111,24 @@ public class LogIT extends ITBaseMini {
     }
   }
 
-  public static class TestObserver extends AbstractObserver {
-
+  public static class TestObserver implements StringObserver {
     @Override
-    public ObservedColumn getObservedColumn() {
-      return new ObservedColumn(STAT_COUNT, NotificationType.WEAK);
+    public void process(TransactionBase tx, String row, Column col) {
+      TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row, col)));
     }
+  }
 
+  public static class LogItObserverProvider implements ObserverProvider {
     @Override
-    public void process(TransactionBase tx, Bytes row, Column col) {
-      TestUtil.increment(tx, "all", col, Integer.parseInt(tx.gets(row.toString(), col)));
+    public void provide(Registry or, Context ctx) {
+      or.registers(STAT_COUNT, WEAK, new TestObserver());
+      or.register(bCol2, WEAK, new BinaryObserver());
     }
   }
 
   @Override
-  protected List<ObserverSpecification> getObservers() {
-    return Arrays.asList(new ObserverSpecification(TestObserver.class.getName()),
-        new ObserverSpecification(BinaryObserver.class.getName()));
+  protected Class<? extends ObserverProvider> getObserverProviderClass() {
+    return LogItObserverProvider.class;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/d6af386b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ef2eb78..9c0a694 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,11 @@
         <version>1.32</version>
       </dependency>
       <dependency>
+        <groupId>com.google.code.gson</groupId>
+        <artifactId>gson</artifactId>
+        <version>2.8.0</version>
+      </dependency>
+      <dependency>
         <!-- Guava 13.0.1 is required by Twill (due to beta method usage) -->
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>


Mime
View raw message