fluo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject incubator-fluo git commit: fixes #829 Changed API to allow giving observers an optional ID
Date Tue, 09 May 2017 15:17:15 GMT
Repository: incubator-fluo
Updated Branches:
  refs/heads/master a1c869ca5 -> ab7ce36c2


fixes #829 Changed API to allow giving observers an optional ID


Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/ab7ce36c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/ab7ce36c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/ab7ce36c

Branch: refs/heads/master
Commit: ab7ce36c2b801bf5291ed7209293dcffa31c914f
Parents: a1c869c
Author: Keith Turner <kturner@apache.org>
Authored: Fri May 5 18:48:04 2017 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Mon May 8 18:12:20 2017 -0400

----------------------------------------------------------------------
 docs/applications.md                            |  18 ++-
 .../java/org/apache/fluo/api/client/Loader.java |   3 +-
 .../apache/fluo/api/client/LoaderExecutor.java  |   9 ++
 .../api/observer/ColumnProviderRegistry.java    |  54 +++++++++
 .../fluo/api/observer/ObserverProvider.java     |  71 ++++++++----
 .../apache/fluo/core/async/CommitManager.java   |  15 ++-
 .../core/client/LoaderExecutorAsyncImpl.java    |  16 ++-
 .../java/org/apache/fluo/core/impl/TxStats.java |  23 ++--
 .../fluo/core/log/TracingTransaction.java       |  13 ++-
 .../apache/fluo/core/observer/ObserverUtil.java |   5 +
 .../apache/fluo/core/observer/Observers.java    |   2 +
 .../fluo/core/observer/v1/ObserversV1.java      |  20 ++++
 .../fluo/core/observer/v2/ObserverRegistry.java | 112 +++++++++++++++++++
 .../fluo/core/observer/v2/ObserversV2.java      |  61 ++--------
 .../java/org/apache/fluo/core/util/Hex.java     |   6 +-
 .../apache/fluo/core/worker/WorkTaskAsync.java  |   5 +-
 .../fluo/integration/impl/AppConfigIT.java      |   2 +-
 .../fluo/integration/impl/CollisionIT.java      |   2 +-
 .../apache/fluo/integration/impl/FailureIT.java |   2 +-
 .../apache/fluo/integration/impl/FluoIT.java    |   2 +-
 .../integration/impl/SelfNotificationIT.java    |   2 +-
 .../integration/impl/StrongNotificationIT.java  |   2 +-
 .../integration/impl/WeakNotificationIT.java    |   2 +-
 .../impl/WeakNotificationOverlapIT.java         |   2 +-
 .../apache/fluo/integration/impl/WorkerIT.java  |   2 +-
 .../org/apache/fluo/integration/log/LogIT.java  |   4 +-
 26 files changed, 332 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/docs/applications.md
----------------------------------------------------------------------
diff --git a/docs/applications.md b/docs/applications.md
index cbf12ba..45e3412 100644
--- a/docs/applications.md
+++ b/docs/applications.md
@@ -118,24 +118,22 @@ To create an observer, follow these steps:
     ```
 2.  Create a class that implements [ObserverProvider] like the example below.  The purpose
of this
     class is associate a set Observers with columns that trigger the observers.  The class
can
-    create multiple observers.
+    register multiple observers.
 
     ```java
     class AppObserverProvider implements ObserverProvider {
       @Override
       public void provide(Registry or, Context ctx) {
         //setup InvertObserver to be triggered when the column obs:data is modified
-        or.register(new Column("obs", "data"),
-                           NotificationType.STRONG,
-                           new InvertObserver());
+        or.forColumn(new Column("obs", "data"), NotificationType.STRONG)
+          .useObserver(new InvertObserver());
         
         //Observer is a Functional interface.  So Obsevers can be written as lambdas.
-        or.register(new Column("new","data"),
-                           NotificationType.WEAK,
-                           (tx,row,col) -> { 
-                             Bytes combined = combineNewAndOld(tx,row);
-                             tx.set(row, new Column("current","data"), combined);
-                           });
+        or.forColumn(new Column("new","data"), NotificationType.WEAK)
+          .useObserver((tx,row,col) -> {
+             Bytes combined = combineNewAndOld(tx,row);
+             tx.set(row, new Column("current","data"), combined);
+           });
       }
     }
     ```

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java b/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java
index 903e2f6..b4512b9 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/Loader.java
@@ -24,6 +24,7 @@ import org.apache.fluo.api.metrics.MetricsReporter;
  *
  * @since 1.0.0
  */
+@FunctionalInterface
 public interface Loader {
 
   /**
@@ -37,7 +38,7 @@ public interface Loader {
     SimpleConfiguration getAppConfiguration();
 
     /**
-     * @return A {@link MetricsReporter} to report application metrics from this observer
+     * @return A {@link MetricsReporter} to report application metrics from this loader
      */
     MetricsReporter getMetricsReporter();
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
index c06e072..a593d46 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/client/LoaderExecutor.java
@@ -30,6 +30,15 @@ public interface LoaderExecutor extends AutoCloseable {
   void execute(Loader loader);
 
   /**
+   * Same as {@link #execute(Loader)}, but allows specifing an identity. The identity is
used in
+   * metrics and trace logging. When an identity is not supplied, the class name is used.
In the
+   * case of lambdas the class name may not be the same in different processes.
+   * 
+   * @since 1.1.0
+   */
+  void execute(String identity, Loader loader);
+
+  /**
    * Waits for all queued and running Loader task to complete, then cleans up resources.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/api/src/main/java/org/apache/fluo/api/observer/ColumnProviderRegistry.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/ColumnProviderRegistry.java
b/modules/api/src/main/java/org/apache/fluo/api/observer/ColumnProviderRegistry.java
new file mode 100644
index 0000000..2c2f772
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/ColumnProviderRegistry.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.fluo.api.observer;
+
+import java.util.function.BiConsumer;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.apache.fluo.api.observer.ObserverProvider.Registry.ObserverArgument;
+
+// Intentionally package private
+class ColumnProviderRegistry implements ObserverProvider.Registry.ObserverArgument,
+    ObserverProvider.Registry.IdentityOption {
+
+  private BiConsumer<Column, NotificationType> colRegistry;
+  private NotificationType nt;
+  private Column col;
+
+  ColumnProviderRegistry(Column col, NotificationType nt,
+      BiConsumer<Column, NotificationType> colRegistry) {
+    this.col = col;
+    this.nt = nt;
+    this.colRegistry = colRegistry;
+  }
+
+  @Override
+  public ObserverArgument withId(String alias) {
+    return this;
+  }
+
+  @Override
+  public void useObserver(Observer observer) {
+    colRegistry.accept(col, nt);
+  }
+
+  @Override
+  public void useStrObserver(StringObserver observer) {
+    useObserver(observer);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
----------------------------------------------------------------------
diff --git a/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
index c172268..a4b42bb 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/observer/ObserverProvider.java
@@ -64,23 +64,59 @@ public interface ObserverProvider {
    * @since 1.1.0
    */
   interface Registry {
-    void register(Column observedColumn, NotificationType ntfyType, Observer observer);
 
     /**
-     * This method was created to allow Observers written as lambda to be passed {@link String}
-     * instead of {@link Bytes} for the row.
+     * The terminal part of a Fluent API for registering observers.
      * 
-     * <pre>
-     * <code>
-     *   void provide(ObserverRegistry or, Context ctx) {
-     *     or.registers(someColumn, WEAK, (tx,row,col) -> {
-     *      //row is of type String
-     *     };
-     *   }
-     * </code>
-     * </pre>
+     * @since 1.1.0
      */
-    void registers(Column observedColumn, NotificationType ntfyType, StringObserver observer);
+    interface ObserverArgument {
+
+      /**
+       * Calling this method registers the given observer using the parameters previously
passed to
+       * the Fluent API.
+       * 
+       */
+      void useObserver(Observer observer);
+
+      /**
+       * Calling this method registers the given observer using the parameters previously
passed to
+       * the Fluent API.
+       * 
+       * <p>
+       * This method was created to allow Observers written as lambda to be passed {@link
String}
+       * instead of {@link Bytes} for the row.
+       * 
+       * <pre>
+       * <code>
+       *   void provide(ObserverRegistry or, Context ctx) {
+       *     or.forColumn(someColumn, WEAK).useStrObserver((tx,row,col) -> {
+       *      //row is of type String
+       *     };
+       *   }
+       * </code>
+       * </pre>
+       */
+      void useStrObserver(StringObserver observer);
+    }
+
+    /**
+     * One part of a Fluent API for registering observers.
+     * 
+     * @since 1.1.0
+     */
+    interface IdentityOption extends ObserverArgument {
+      /**
+       * Optionally set the name used to identify the observer in logging and metrics. If
not set,
+       * the column name is used.
+       */
+      ObserverArgument withId(String identity);
+    }
+
+    /**
+     * A fluent entry point for registering an observer.
+     */
+    IdentityOption forColumn(Column observedColumn, NotificationType ntfyType);
   }
 
   /**
@@ -103,13 +139,8 @@ public interface ObserverProvider {
   default void provideColumns(BiConsumer<Column, NotificationType> colRegistry, Context
ctx) {
     Registry or = new Registry() {
       @Override
-      public void registers(Column oc, NotificationType nt, StringObserver obs) {
-        colRegistry.accept(oc, nt);
-      }
-
-      @Override
-      public void register(Column oc, NotificationType nt, Observer obs) {
-        colRegistry.accept(oc, nt);
+      public IdentityOption forColumn(Column observedColumn, NotificationType ntfyType) {
+        return new ColumnProviderRegistry(observedColumn, ntfyType, colRegistry);
       }
     };
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
index 403fb66..1b589cf 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
@@ -60,13 +60,13 @@ public class CommitManager {
     private final AsyncCommitObserver aco;
     private final int size;
     private final AtomicBoolean finished = new AtomicBoolean(false);
-    private final Class<?> txExecClass;
+    private final String alias;
 
     private void finish(TxResult status) {
       if (finished.compareAndSet(false, true)) {
         commitingTransactions.decrementAndGet();
         tx.getStats().setCommitFinishTime(System.currentTimeMillis());
-        tx.getStats().report(status.toString(), txExecClass);
+        tx.getStats().report(status.toString(), alias);
         memoryLimit.release(size);
         try {
           tx.close();
@@ -76,12 +76,11 @@ public class CommitManager {
       }
     }
 
-    public CQCommitObserver(AsyncTransaction tx, AsyncCommitObserver aco, Class<?>
txExecClass,
-        int size) {
+    public CQCommitObserver(AsyncTransaction tx, AsyncCommitObserver aco, String alias, int
size) {
       this.tx = tx;
       this.aco = aco;
       this.size = size;
-      this.txExecClass = txExecClass;
+      this.alias = alias;
     }
 
     @Override
@@ -123,15 +122,15 @@ public class CommitManager {
   }
 
 
-  public void beginCommit(AsyncTransaction tx, Class<?> txExecClass, AsyncCommitObserver
aco) {
+  public void beginCommit(AsyncTransaction tx, String alias, AsyncCommitObserver aco) {
     Objects.requireNonNull(tx);
-    Objects.requireNonNull(txExecClass);
+    Objects.requireNonNull(alias);
     Objects.requireNonNull(aco);
 
     int size = tx.getSize();
     memoryLimit.acquire(size);
     commitingTransactions.incrementAndGet();
-    CQCommitObserver myAco = new CQCommitObserver(tx, aco, txExecClass, size);
+    CQCommitObserver myAco = new CQCommitObserver(tx, aco, alias, size);
     tx.getStats().setCommitBeginTime(System.currentTimeMillis());
     tx.commitAsync(myAco);
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
index ad36fd2..916dcca 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/LoaderExecutorAsyncImpl.java
@@ -61,6 +61,7 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
     AsyncTransaction txi;
     Loader loader;
     private AtomicBoolean done = new AtomicBoolean(false);
+    private String identity;
 
     private void close() {
       txi = null;
@@ -75,7 +76,8 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
     }
 
 
-    public LoaderCommitObserver(Loader loader2) {
+    public LoaderCommitObserver(String alias, Loader loader2) {
+      this.identity = alias;
       this.loader = loader2;
     }
 
@@ -111,7 +113,7 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
       txi = new TransactionImpl(env);
 
       if (TracingTransaction.isTracingEnabled()) {
-        txi = new TracingTransaction(txi, loader.getClass());
+        txi = new TracingTransaction(txi, loader.getClass(), identity);
       }
 
       Loader.Context context = new Loader.Context() {
@@ -128,7 +130,7 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
 
       try {
         loader.load(txi, context);
-        env.getSharedResources().getCommitManager().beginCommit(txi, loader.getClass(), this);
+        env.getSharedResources().getCommitManager().beginCommit(txi, identity, this);
       } catch (Exception e) {
         setException(e);
         close();
@@ -183,6 +185,11 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
 
   @Override
   public void execute(Loader loader) {
+    execute(loader.getClass().getSimpleName(), loader);
+  }
+
+  @Override
+  public void execute(String alias, Loader loader) {
     if (exceptionRef.get() != null) {
       throw new RuntimeException("Previous failure", exceptionRef.get());
     }
@@ -199,7 +206,7 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
 
     try {
       commiting.increment();
-      executor.execute(new QueueReleaseRunnable(new LoaderCommitObserver(loader)));
+      executor.execute(new QueueReleaseRunnable(new LoaderCommitObserver(alias, loader)));
     } catch (RejectedExecutionException rje) {
       semaphore.release();
       commiting.decrement();
@@ -235,5 +242,4 @@ public class LoaderExecutorAsyncImpl implements LoaderExecutor {
       env.getSharedResources().getBatchWriter().waitForAsyncFlush();
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
index f1ae480..030014c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TxStats.java
@@ -142,29 +142,28 @@ public class TxStats {
     timedOutLocks += amt;
   }
 
-  public void report(String status, Class<?> execClass) {
+  public void report(String status, String alias) {
     MetricNames names = env.getMetricNames();
     MetricRegistry registry = env.getSharedResources().getMetricRegistry();
-    String sn = execClass.getSimpleName();
     if (getLockWaitTime() > 0) {
-      MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxLockWaitTime(sn)).update(
-          getLockWaitTime(), TimeUnit.MILLISECONDS);
+      MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxLockWaitTime(alias))
+          .update(getLockWaitTime(), TimeUnit.MILLISECONDS);
     }
-    MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxExecTime(sn)).update(
+    MetricsUtil.getTimer(env.getConfiguration(), registry, names.getTxExecTime(alias)).update(
         getReadTime(), TimeUnit.MILLISECONDS);
     if (getCollisions() > 0) {
-      registry.meter(names.getTxWithCollision(sn)).mark();
-      registry.meter(names.getTxCollisions(sn)).mark(getCollisions());
+      registry.meter(names.getTxWithCollision(alias)).mark();
+      registry.meter(names.getTxCollisions(alias)).mark(getCollisions());
     }
-    registry.meter(names.getTxEntriesSet(sn)).mark(getEntriesSet());
-    registry.meter(names.getTxEntriesRead(sn)).mark(getEntriesReturned());
+    registry.meter(names.getTxEntriesSet(alias)).mark(getEntriesSet());
+    registry.meter(names.getTxEntriesRead(alias)).mark(getEntriesReturned());
     if (getTimedOutLocks() > 0) {
-      registry.meter(names.getTxLocksTimedout(sn)).mark(getTimedOutLocks());
+      registry.meter(names.getTxLocksTimedout(alias)).mark(getTimedOutLocks());
     }
     if (getDeadLocks() > 0) {
-      registry.meter(names.getTxLocksDead(sn)).mark(getDeadLocks());
+      registry.meter(names.getTxLocksDead(alias)).mark(getDeadLocks());
     }
-    registry.meter(names.getTxStatus(status.toLowerCase(), sn)).mark();
+    registry.meter(names.getTxStatus(status.toLowerCase(), alias)).mark();
   }
 
   public void setCommitBeginTime(long t) {

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
index 646113f..62f61a1 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/log/TracingTransaction.java
@@ -61,11 +61,11 @@ public class TracingTransaction extends AbstractTransactionBase implements
Async
   }
 
   public TracingTransaction(AsyncTransaction tx) {
-    this(tx, null, null);
+    this(tx, null, null, null);
   }
 
-  public TracingTransaction(AsyncTransaction tx, Class<?> clazz) {
-    this(tx, null, clazz);
+  public TracingTransaction(AsyncTransaction tx, Class<?> clazz, String txExecId) {
+    this(tx, null, clazz, txExecId);
   }
 
   private String encB(Collection<Bytes> columns) {
@@ -96,7 +96,8 @@ public class TracingTransaction extends AbstractTransactionBase implements
Async
         + "=" + enc(e.getValue())));
   }
 
-  public TracingTransaction(AsyncTransaction tx, Notification notification, Class<?>
clazz) {
+  public TracingTransaction(AsyncTransaction tx, Notification notification, Class<?>
clazz,
+      String txExecId) {
     this.tx = tx;
     this.txid = tx.getStartTimestamp();
 
@@ -114,6 +115,10 @@ public class TracingTransaction extends AbstractTransactionBase implements
Async
       if (clazz != null) {
         log.trace("txid: {} class: {}", txid, clazz.getName());
       }
+
+      if (txExecId != null) {
+        log.trace("txid: {} identity: {}", txid, txExecId);
+      }
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
index 0e2f94a..b311282 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/ObserverUtil.java
@@ -90,6 +90,11 @@ public class ObserverUtil {
             public Observer getObserver(Column col) {
               throw new UnsupportedOperationException();
             }
+
+            @Override
+            public String getObserverId(Column col) {
+              throw new UnsupportedOperationException();
+            }
           };
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
index d4cc366..09576f0 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/Observers.java
@@ -19,6 +19,8 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.Observer;
 
 public interface Observers extends AutoCloseable {
+  String getObserverId(Column col);
+
   Observer getObserver(Column col);
 
   void returnObserver(Observer o);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
index 65c1b2f..f75377e 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v1/ObserversV1.java
@@ -19,7 +19,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
+import com.google.common.collect.Iterables;
 import org.apache.fluo.api.config.ObserverSpecification;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.Observer;
@@ -37,6 +39,7 @@ class ObserversV1 implements Observers {
   Map<Column, List<Observer>> observers = new HashMap<>();
   Map<Column, ObserverSpecification> strongObservers;
   Map<Column, ObserverSpecification> weakObservers;
+  Map<Column, String> aliases;
 
   private List<Observer> getObserverList(Column col) {
     List<Observer> observerList;
@@ -55,6 +58,19 @@ class ObserversV1 implements Observers {
     this.env = env;
     this.strongObservers = strongObservers;
     this.weakObservers = weakObservers;
+    this.aliases = new HashMap<>();
+
+    for (Entry<Column, ObserverSpecification> e : Iterables.concat(strongObservers.entrySet(),
+        weakObservers.entrySet())) {
+      ObserverSpecification observerConfig = e.getValue();
+      try {
+        String alias =
+            Class.forName(observerConfig.getClassName()).asSubclass(Observer.class).getSimpleName();
+        aliases.put(e.getKey(), alias);
+      } catch (ClassNotFoundException e1) {
+        throw new RuntimeException(e1);
+      }
+    }
   }
 
   public Observer getObserver(Column col) {
@@ -127,4 +143,8 @@ class ObserversV1 implements Observers {
     observers = null;
   }
 
+  @Override
+  public String getObserverId(Column col) {
+    return aliases.get(col);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
new file mode 100644
index 0000000..979b867
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserverRegistry.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.fluo.core.observer.v2;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.Observer;
+import org.apache.fluo.api.observer.Observer.NotificationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.fluo.api.observer.ObserverProvider;
+import org.apache.fluo.api.observer.StringObserver;
+
+public class ObserverRegistry implements ObserverProvider.Registry {
+
+  private static final Logger log = LoggerFactory.getLogger(ObserverRegistry.class);
+
+  Map<Column, Observer> observers;
+  Map<Column, String> aliases;
+  private Set<Column> strongColumns;
+  private Set<Column> weakColumns;
+
+  private class FluentRegistration implements ObserverProvider.Registry.IdentityOption,
+      ObserverProvider.Registry.ObserverArgument {
+
+    private Column col;
+    private NotificationType ntfyType;
+    private String alias;
+
+    FluentRegistration(Column col, NotificationType ntfyType) {
+      this.col = col;
+      this.ntfyType = ntfyType;
+    }
+
+    @Override
+    public void useObserver(Observer observer) {
+      register(col, ntfyType, alias, observer);
+    }
+
+    @Override
+    public void useStrObserver(StringObserver observer) {
+      register(col, ntfyType, alias, observer);
+    }
+
+    @Override
+    public ObserverArgument withId(String alias) {
+      this.alias = alias;
+      return this;
+    }
+  }
+
+  ObserverRegistry(Set<Column> strongColumns, Set<Column> weakColumns) {
+    this.observers = new HashMap<>();
+    this.aliases = new HashMap<>();
+    this.strongColumns = strongColumns;
+    this.weakColumns = weakColumns;
+  }
+
+  @Override
+  public IdentityOption forColumn(Column observedColumn, NotificationType ntfyType) {
+    return new FluentRegistration(observedColumn, ntfyType);
+  }
+
+  private void register(Column col, NotificationType nt, String alias, Observer obs) {
+    try {
+      Method closeMethod = obs.getClass().getMethod("close");
+      if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
+        log.warn(
+            "Observer {} implements close().  Close is not called on Observers registered
using ObserverProvider."
+                + " Close is only called on Observers configured the old way.", obs.getClass()
+                .getName());
+      }
+    } catch (NoSuchMethodException | SecurityException e) {
+      throw new RuntimeException("Failed to check if close() is implemented", e);
+    }
+
+    if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
+      throw new IllegalArgumentException("Column " + col
+          + " not previously configured for strong notifications");
+    }
+
+    if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
+      throw new IllegalArgumentException("Column " + col
+          + " not previously configured for weak notifications");
+    }
+
+    if (observers.containsKey(col)) {
+      throw new IllegalArgumentException("Duplicate observed column " + col);
+    }
+
+    observers.put(col, obs);
+    aliases.put(col, alias);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
index d9d4a97..22b4fab 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/observer/v2/ObserversV2.java
@@ -15,8 +15,6 @@
 
 package org.apache.fluo.core.observer.v2;
 
-import java.lang.reflect.Method;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -25,71 +23,31 @@ import com.google.common.collect.Sets.SetView;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.exceptions.FluoException;
 import org.apache.fluo.api.observer.Observer;
-import org.apache.fluo.api.observer.Observer.NotificationType;
 import org.apache.fluo.api.observer.ObserverProvider;
-import org.apache.fluo.api.observer.ObserverProvider.Registry;
-import org.apache.fluo.api.observer.StringObserver;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.observer.Observers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.fluo.core.util.Hex;
 
 class ObserversV2 implements Observers {
 
-  private static final Logger log = LoggerFactory.getLogger(ObserversV2.class);
-
   Map<Column, Observer> observers;
+  Map<Column, String> aliases;
 
   public ObserversV2(Environment env, JsonObservers jco, Set<Column> strongColumns,
       Set<Column> weakColumns) {
-    observers = new HashMap<>();
 
     ObserverProvider obsProvider =
         ObserverStoreV2.newObserverProvider(jco.getObserverProviderClass());
 
     ObserverProviderContextImpl ctx = new ObserverProviderContextImpl(env);
 
-    Registry or = new Registry() {
-
-      @Override
-      public void register(Column col, NotificationType nt, Observer obs) {
-        try {
-          Method closeMethod = obs.getClass().getMethod("close");
-          if (!closeMethod.getDeclaringClass().equals(Observer.class)) {
-            log.warn(
-                "Observer {} implements close().  Close is not called on Observers created
using ObserverProvider."
-                    + " Close is only called on Observers configured the old way.", obs.getClass()
-                    .getName());
-          }
-        } catch (NoSuchMethodException | SecurityException e) {
-          throw new RuntimeException("Failed to check if close() is implemented", e);
-        }
-
-        if (nt == NotificationType.STRONG && !strongColumns.contains(col)) {
-          throw new IllegalArgumentException("Column " + col
-              + " not previously configured for strong notifications");
-        }
-
-        if (nt == NotificationType.WEAK && !weakColumns.contains(col)) {
-          throw new IllegalArgumentException("Column " + col
-              + " not previously configured for weak notifications");
-        }
-
-        if (observers.containsKey(col)) {
-          throw new IllegalArgumentException("Duplicate observed column " + col);
-        }
-
-        observers.put(col, obs);
-      }
-
-      @Override
-      public void registers(Column col, NotificationType nt, StringObserver obs) {
-        register(col, nt, obs);
-      }
-    };
-
+    ObserverRegistry or = new ObserverRegistry(strongColumns, weakColumns);
     obsProvider.provide(or, ctx);
 
+    this.observers = or.observers;
+    this.aliases = or.aliases;
+    this.observers.forEach((k, v) -> aliases.computeIfAbsent(k, col -> Hex.encNonAscii(col,
":")));
+
     // the following check ensures observers are provided for all previously configured columns
     SetView<Column> diff =
         Sets.difference(observers.keySet(), Sets.union(strongColumns, weakColumns));
@@ -109,4 +67,9 @@ class ObserversV2 implements Observers {
 
   @Override
   public void close() {}
+
+  @Override
+  public String getObserverId(Column col) {
+    return aliases.get(col);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
index d2c1bd7..fefa2a5 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java
@@ -56,8 +56,12 @@ public class Hex {
   }
 
   public static String encNonAscii(Column col) {
+    return encNonAscii(col, " ");
+  }
+
+  public static String encNonAscii(Column col, String sep) {
     StringBuilder sb = new StringBuilder();
-    encNonAscii(sb, col, " ");
+    encNonAscii(sb, col, sep);
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
index ff582ed..76c6c66 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/worker/WorkTaskAsync.java
@@ -77,17 +77,18 @@ public class WorkTaskAsync implements Runnable {
   @Override
   public void run() {
     Observer observer = observers.getObserver(notification.getColumn());
+    String observerId = observers.getObserverId(notification.getColumn());
     try {
       AsyncTransaction atx = new TransactionImpl(env, notification);
 
       if (TracingTransaction.isTracingEnabled()) {
-        atx = new TracingTransaction(atx, notification, observer.getClass());
+        atx = new TracingTransaction(atx, notification, observer.getClass(), observerId);
       }
 
       observer.process(atx, notification.getRow(), notification.getColumn());
 
       CommitManager commitManager = env.getSharedResources().getCommitManager();
-      commitManager.beginCommit(atx, observer.getClass(), new WorkTaskCommitObserver());
+      commitManager.beginCommit(atx, observerId, new WorkTaskCommitObserver());
 
     } catch (Exception e) {
       log.error("Failed to process work " + Hex.encNonAscii(notification), e);

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
index 7c48cc3..809eba7 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/AppConfigIT.java
@@ -99,7 +99,7 @@ public class AppConfigIT extends ITBaseMini {
     public void provide(Registry or, Context ctx) {
       int limit = ctx.getAppConfiguration().getInt("myapp.sizeLimit");
 
-      or.registers(DF_COL, STRONG, (tx, row, col) -> {
+      or.forColumn(DF_COL, STRONG).useStrObserver((tx, row, col) -> {
         int d = Integer.parseInt(tx.gets(row, col));
         if (2 * d < limit) {
           tx.set(row.toString(), DB_COL, Integer.toString(2 * d));

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index f927102..e8e7aac 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -80,7 +80,7 @@ public class CollisionIT extends ITBaseMini {
   public static class CollisionObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.registers(STAT_CHANGED, NotificationType.WEAK, (tx, row, col) -> {
+      or.forColumn(STAT_CHANGED, NotificationType.WEAK).useStrObserver((tx, row, col) ->
{
         int total = Integer.parseInt(tx.gets(row, STAT_TOTAL));
         int processed = TestUtil.getOrDefault(tx, row, STAT_PROCESSED, 0);
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
index d86f2ef..e18d72a 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -68,7 +68,7 @@ public class FailureIT extends ITBaseImpl {
   public static class FailuresObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.register(new Column("attr", "lastupdate"), STRONG, new NullObserver());
+      or.forColumn(new Column("attr", "lastupdate"), STRONG).useObserver(new NullObserver());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
index 288bed9..0265d0e 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/FluoIT.java
@@ -53,7 +53,7 @@ public class FluoIT extends ITBaseImpl {
   public static class FluoITObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.register(BALANCE, NotificationType.STRONG, (tx, row, col) -> {
+      or.forColumn(BALANCE, NotificationType.STRONG).useObserver((tx, row, col) -> {
         Assert.fail();
       });
     }

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
index 5381952..345bde2 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/SelfNotificationIT.java
@@ -70,7 +70,7 @@ public class SelfNotificationIT extends ITBaseMini {
   public static class SelfNtfyObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.register(EXPORT_COUNT_COL, STRONG, new ExportingObserver());
+      or.forColumn(EXPORT_COUNT_COL, STRONG).useObserver(new ExportingObserver());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
index ce002cb..820f48c 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/StrongNotificationIT.java
@@ -37,7 +37,7 @@ public class StrongNotificationIT extends ITBaseMini {
   public static class StrongNtfyObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.register(OC, STRONG, (tx, row, col) -> {
+      or.forColumn(OC, STRONG).useObserver((tx, row, col) -> {
         Bytes v = tx.get(row, col);
         tx.set(v, RC, row);
       });

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
index 2bd4ce9..52803ef 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationIT.java
@@ -64,7 +64,7 @@ public class WeakNotificationIT extends ITBaseMini {
   public static class WeakNotificationITObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.register(STAT_CHECK, WEAK, new SimpleObserver());
+      or.forColumn(STAT_CHECK, WEAK).useObserver(new SimpleObserver());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
index 9dcf6dd..0711c72 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WeakNotificationOverlapIT.java
@@ -58,7 +58,7 @@ public class WeakNotificationOverlapIT extends ITBaseImpl {
   public static class WeakNtfyObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.registers(STAT_CHANGED, WEAK, TOTAL_OBSERVER);
+      or.forColumn(STAT_CHANGED, WEAK).useStrObserver(TOTAL_OBSERVER);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
index 66791b5..cd2584a 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -76,7 +76,7 @@ public class WorkerIT extends ITBaseMini {
   public static class WorkerITObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.register(observedColumn, STRONG, new DegreeIndexer());
+      or.forColumn(observedColumn, STRONG).useObserver(new DegreeIndexer());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/ab7ce36c/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
----------------------------------------------------------------------
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
index 01f04e4..7e1e279 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/log/LogIT.java
@@ -121,8 +121,8 @@ public class LogIT extends ITBaseMini {
   public static class LogItObserverProvider implements ObserverProvider {
     @Override
     public void provide(Registry or, Context ctx) {
-      or.registers(STAT_COUNT, WEAK, new TestObserver());
-      or.register(bCol2, WEAK, new BinaryObserver());
+      or.forColumn(STAT_COUNT, WEAK).useStrObserver(new TestObserver());
+      or.forColumn(bCol2, WEAK).useObserver(new BinaryObserver());
     }
   }
 



Mime
View raw message