incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [03/51] [partial] Fixed BLUR-126.
Date Thu, 06 Jun 2013 18:57:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/CachedMap.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/CachedMap.java b/blur-util/src/main/java/org/apache/blur/CachedMap.java
new file mode 100644
index 0000000..80b1bf8
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/CachedMap.java
@@ -0,0 +1,63 @@
+package org.apache.blur;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Contract for a String-to-String map that, per the method descriptions
+ * below, keeps values in an in-memory cache in front of a backing source.
+ * Every operation is declared to throw {@link IOException}.
+ */
+public abstract class CachedMap {
+
+  /**
+   * Clears the in-memory cache of the map, which forces a re-read from the
+   * source.
+   *
+   * @throws IOException
+   *           declared for implementations to report a failure.
+   */
+  public abstract void clearCache() throws IOException;
+
+  /**
+   * Fetches the value for the given key. If the in-memory cache does not hold
+   * the value it is re-read from the source; if the source does not hold it
+   * either, null is returned.
+   * 
+   * @param key
+   *          the key.
+   * @return the value, or null if the key is missing from the source.
+   * @throws IOException
+   *           declared for implementations to report a failure.
+   */
+  public abstract String get(String key) throws IOException;
+
+  /**
+   * Puts the value with the given key into the map if the key was missing.
+   * Returns true if the key was set to the given value, otherwise false if
+   * the key already existed.
+   * 
+   * @param key
+   *          the key.
+   * @param value
+   *          the value.
+   * @return true if successful, false if not.
+   * @throws IOException
+   *           declared for implementations to report a failure.
+   */
+  public abstract boolean putIfMissing(String key, String value) throws IOException;
+
+  /**
+   * Fetches all the keys and values for the map from the source. That means
+   * this is an expensive operation and should be used sparingly.
+   * 
+   * @return the map of all keys to values.
+   * @throws IOException
+   *           declared for implementations to report a failure.
+   */
+  public abstract Map<String, String> fetchAllFromSource() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/FindJavaHome.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/FindJavaHome.java b/blur-util/src/main/java/org/apache/blur/FindJavaHome.java
new file mode 100644
index 0000000..ad14164
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/FindJavaHome.java
@@ -0,0 +1,8 @@
+package org.apache.blur;
+
+
+/**
+ * Command line helper that prints the value of the {@code java.home} system
+ * property of the JVM it is run with.
+ */
+public class FindJavaHome {
+  /**
+   * Writes the {@code java.home} system property to standard output.
+   *
+   * @param args
+   *          ignored.
+   */
+  public static void main(String[] args) {
+    String javaHome = System.getProperty("java.home");
+    System.out.println(javaHome);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/concurrent/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/ExecutionContext.java b/blur-util/src/main/java/org/apache/blur/concurrent/ExecutionContext.java
new file mode 100644
index 0000000..96365d2
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/ExecutionContext.java
@@ -0,0 +1,61 @@
+package org.apache.blur.concurrent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Accumulates {@link RecordTime} samples for a unit of work. Each sample is
+ * numbered with a counter that increases by one per recorded call.
+ */
+public class ExecutionContext {
+
+  /**
+   * A single timing sample: the enum it was recorded under, the timestamp at
+   * which it was taken, the elapsed time since the supplied start, its call
+   * number and the caller supplied arguments.
+   */
+  public static class RecordTime {
+    public long _now;
+    public Enum<?> _e;
+    public long _timeNs;
+    public int _call;
+    public Object[] _args;
+
+    /**
+     * @param e
+     *          the enum the sample is recorded under.
+     * @param now
+     *          the timestamp of the sample.
+     * @param start
+     *          the timestamp the measurement began at; the elapsed time is
+     *          stored as {@code now - start}.
+     * @param call
+     *          the call number of the sample.
+     * @param args
+     *          extra arguments kept with the sample (stored as is, not copied).
+     */
+    public RecordTime(Enum<?> e, long now, long start, int call, Object[] args) {
+      this._now = now;
+      this._timeNs = now - start;
+      this._e = e;
+      this._call = call;
+      this._args = args;
+    }
+
+    /** Renders the call number, enum, timestamp and elapsed time; the args are not included. */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("RecordTime [_call=");
+      text.append(_call).append(", _e=").append(_e);
+      text.append(", _now=").append(_now);
+      text.append(", _timeNs=").append(_timeNs).append("]");
+      return text.toString();
+    }
+  }
+
+  private final List<RecordTime> _times = new ArrayList<RecordTime>();
+  private final AtomicInteger _callCount = new AtomicInteger();
+
+  /**
+   * Appends a sample taken at the current {@link System#nanoTime()}.
+   *
+   * @param e
+   *          the enum to record the sample under.
+   * @param start
+   *          the value previously obtained from {@link #startTime()}.
+   * @param args
+   *          extra arguments to keep with the sample.
+   */
+  public void recordTime(Enum<?> e, long start, Object... args) {
+    // Take the timestamp before bumping the counter, as the sample is built from both.
+    long finished = System.nanoTime();
+    int callNumber = _callCount.incrementAndGet();
+    _times.add(new RecordTime(e, finished, start, callNumber, args));
+  }
+
+  /**
+   * @return the current {@link System#nanoTime()}, to be passed later to
+   *         {@link #recordTime(Enum, long, Object...)}.
+   */
+  public long startTime() {
+    return System.nanoTime();
+  }
+
+  /**
+   * @return the internal list of samples in the order they were recorded (the
+   *         list itself, not a copy).
+   */
+  public List<RecordTime> getTimes() {
+    return _times;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
new file mode 100644
index 0000000..f6b8fb2
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/Executors.java
@@ -0,0 +1,66 @@
+package org.apache.blur.concurrent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Static factory methods for executor services whose threads are named with a
+ * caller supplied prefix.
+ */
+public class Executors {
+
+  private Executors() {
+    // static factory methods only
+  }
+
+  /**
+   * Creates a watched thread pool; same as
+   * {@link #newThreadPool(String, int, boolean)} with {@code watch} set to true.
+   */
+  public static ExecutorService newThreadPool(String prefix, int threadCount) {
+    return newThreadPool(prefix, threadCount, true);
+  }
+
+  /**
+   * Creates a pool whose core and maximum sizes are both {@code threadCount},
+   * backed by an unbounded {@link LinkedBlockingQueue} and using
+   * {@link ThreadPoolExecutor.CallerRunsPolicy} for rejected tasks.
+   *
+   * @param prefix
+   *          the prefix for the names of the pool threads.
+   * @param threadCount
+   *          the number of threads in the pool.
+   * @param watch
+   *          if true the pool is wrapped by {@code ThreadWatcher.instance()}
+   *          before being returned.
+   * @return the executor service.
+   */
+  public static ExecutorService newThreadPool(String prefix, int threadCount, boolean watch) {
+    ThreadFactory factory = new BlurThreadFactory(prefix);
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(threadCount, threadCount, 60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), factory, new ThreadPoolExecutor.CallerRunsPolicy());
+    return watch ? ThreadWatcher.instance().watch(pool) : pool;
+  }
+
+  /**
+   * Creates a JDK single thread executor whose thread comes from a
+   * {@link BlurThreadFactory} with the given prefix.
+   */
+  public static ExecutorService newSingleThreadExecutor(String prefix) {
+    ThreadFactory factory = new BlurThreadFactory(prefix);
+    return java.util.concurrent.Executors.newSingleThreadExecutor(factory);
+  }
+
+  /**
+   * Thread factory that names each thread {@code prefix + n}, where n counts
+   * up from 0 per factory, and makes sure the thread is not a daemon.
+   */
+  public static class BlurThreadFactory implements ThreadFactory {
+    private AtomicInteger nextId = new AtomicInteger(0);
+    private String prefix;
+
+    public BlurThreadFactory(String prefix) {
+      this.prefix = prefix;
+    }
+
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r);
+      int id = nextId.getAndIncrement();
+      thread.setName(prefix + id);
+      // A new thread inherits the daemon flag of its creator; clear it if set.
+      if (thread.isDaemon()) {
+        thread.setDaemon(false);
+      }
+      return thread;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/concurrent/SimpleUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/SimpleUncaughtExceptionHandler.java b/blur-util/src/main/java/org/apache/blur/concurrent/SimpleUncaughtExceptionHandler.java
new file mode 100644
index 0000000..ccdd556
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/SimpleUncaughtExceptionHandler.java
@@ -0,0 +1,30 @@
+package org.apache.blur.concurrent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+/**
+ * Uncaught exception handler that only logs: the throwable is written at
+ * error level together with the thread it escaped from.
+ */
+public class SimpleUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+  private static final Log LOG = LogFactory.getLog(SimpleUncaughtExceptionHandler.class);
+
+  /**
+   * Logs the throwable, passing the thread as the {0} argument of the message.
+   */
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    final String pattern = "Unknown error in thread [{0}]";
+    LOG.error(pattern, e, t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java b/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java
new file mode 100644
index 0000000..2a2ff7e
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java
@@ -0,0 +1,219 @@
+package org.apache.blur.concurrent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+
+/**
+ * Keeps track of threads that are currently running watched tasks and, from a
+ * daemon {@link Timer} thread, periodically logs every tracked thread that has
+ * been running for longer than {@link #PERIOD_MS}.
+ * <p>
+ * The shared watcher is obtained through {@link #instance()}. After
+ * {@link #close()} the shared watcher is discarded, so the next call to
+ * {@link #instance()} creates and starts a fresh one.
+ */
+public class ThreadWatcher {
+
+  private static final Log LOG = LogFactory.getLog(ThreadWatcher.class);
+
+  /**
+   * Used as the delay before the first scan, the interval between scans and
+   * the running time after which a thread is reported.
+   */
+  private static final long PERIOD_MS = TimeUnit.SECONDS.toMillis(5);
+
+  private static ThreadWatcher _instance;
+
+  /** A tracked thread together with the wall clock time tracking began. */
+  static class Watch {
+    public Watch(Thread thread) {
+      _thread = thread;
+    }
+
+    Thread _thread;
+    final long _start = System.currentTimeMillis();
+  }
+
+  private final ConcurrentMap<Thread, Watch> _threads = new ConcurrentHashMap<Thread, Watch>();
+  private Timer _timer;
+
+  private ThreadWatcher() {
+  }
+
+  /**
+   * Starts tracking the given thread, replacing any existing entry for it.
+   */
+  public void watch(Thread thread) {
+    _threads.put(thread, new Watch(thread));
+  }
+
+  /**
+   * Stops tracking the given thread.
+   */
+  public void release(Thread thread) {
+    _threads.remove(thread);
+  }
+
+  /**
+   * Wraps the executor service so that every task it runs is tracked by this
+   * watcher for the duration of the task.
+   */
+  public ExecutorService watch(ExecutorService executorService) {
+    return new ThreadWatcherExecutorService(executorService, this);
+  }
+
+  /**
+   * Starts the daemon timer that scans the tracked threads. Calling this
+   * while the timer is already running does nothing, so a repeated call
+   * cannot orphan a running timer thread.
+   */
+  public synchronized void init() {
+    if (_timer != null) {
+      return;
+    }
+    Timer timer = new Timer("Thread-Watcher", true);
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          processRunningThreads();
+        } catch (Throwable t) {
+          // An exception escaping a TimerTask kills the Timer thread, so log and carry on.
+          LOG.error("Unknown error", t);
+        }
+      }
+    }, PERIOD_MS, PERIOD_MS);
+    _timer = timer;
+  }
+
+  /** Checks every tracked thread and logs the long running ones. */
+  private void processRunningThreads() {
+    for (Watch watch : _threads.values()) {
+      processWatch(watch);
+    }
+  }
+
+  /** Logs the watch if its thread has been running for longer than the period. */
+  private void processWatch(Watch watch) {
+    long elapsed = System.currentTimeMillis() - watch._start;
+    if (elapsed > PERIOD_MS) {
+      LOG.info("Thread [{0}] has been executing for [{1} ms]", watch._thread, elapsed);
+    }
+  }
+
+  /**
+   * Stops the timer (if it was started) and forgets all tracked threads. If
+   * this watcher is the shared one it is also discarded, so that
+   * {@link #instance()} does not keep handing out a watcher that can no
+   * longer report anything.
+   */
+  public void close() {
+    synchronized (ThreadWatcher.class) {
+      if (_instance == this) {
+        _instance = null;
+      }
+    }
+    synchronized (this) {
+      if (_timer != null) {
+        _timer.cancel();
+        _timer.purge();
+        _timer = null;
+      }
+    }
+    _threads.clear();
+  }
+
+  /**
+   * {@link ExecutorService} decorator that registers the executing thread
+   * with a {@link ThreadWatcher} before each task and releases it afterwards.
+   * All other calls are passed straight to the wrapped service.
+   * <p>
+   * NOTE(review): tracking is keyed by thread, so a wrapped task that runs
+   * inline on a thread that is already tracked replaces the outer entry and
+   * removes it when the inner task finishes.
+   */
+  public static class ThreadWatcherExecutorService implements ExecutorService {
+
+    private final ExecutorService _executorService;
+    private final ThreadWatcher _threadWatcher;
+
+    public ThreadWatcherExecutorService(ExecutorService executorService, ThreadWatcher threadWatcher) {
+      _executorService = executorService;
+      _threadWatcher = threadWatcher;
+    }
+
+    /** Returns a runnable that tracks the executing thread while the given one runs. */
+    private Runnable wrap(final Runnable runnable) {
+      return new Runnable() {
+        @Override
+        public void run() {
+          Thread thread = Thread.currentThread();
+          _threadWatcher.watch(thread);
+          try {
+            runnable.run();
+          } finally {
+            _threadWatcher.release(thread);
+          }
+        }
+      };
+    }
+
+    /** Wraps every callable of the collection, keeping the iteration order. */
+    private <T> Collection<? extends Callable<T>> wrapCallableCollection(Collection<? extends Callable<T>> tasks) {
+      List<Callable<T>> result = new ArrayList<Callable<T>>(tasks.size());
+      for (Callable<T> callable : tasks) {
+        result.add(wrapCallable(callable));
+      }
+      return result;
+    }
+
+    /** Returns a callable that tracks the executing thread while the given one runs. */
+    private <T> Callable<T> wrapCallable(final Callable<T> task) {
+      return new Callable<T>() {
+        @Override
+        public T call() throws Exception {
+          Thread thread = Thread.currentThread();
+          _threadWatcher.watch(thread);
+          try {
+            return task.call();
+          } finally {
+            _threadWatcher.release(thread);
+          }
+        }
+      };
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return _executorService.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      _executorService.execute(wrap(command));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+      return _executorService.invokeAll(wrapCallableCollection(tasks), timeout, unit);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+      return _executorService.invokeAll(wrapCallableCollection(tasks));
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return _executorService.invokeAny(wrapCallableCollection(tasks), timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+      return _executorService.invokeAny(wrapCallableCollection(tasks));
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return _executorService.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return _executorService.isTerminated();
+    }
+
+    @Override
+    public void shutdown() {
+      _executorService.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      return _executorService.shutdownNow();
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      return _executorService.submit(wrapCallable(task));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+      return _executorService.submit(wrap(task), result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+      return _executorService.submit(wrap(task));
+    }
+  }
+
+  /**
+   * Returns the shared watcher, creating and starting it on first use or
+   * after the previous one was closed.
+   */
+  public synchronized static ThreadWatcher instance() {
+    if (_instance == null) {
+      _instance = new ThreadWatcher();
+      _instance.init();
+    }
+    return _instance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/log/Log.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/log/Log.java b/blur-util/src/main/java/org/apache/blur/log/Log.java
new file mode 100644
index 0000000..49cd1e1
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/log/Log.java
@@ -0,0 +1,45 @@
+package org.apache.blur.log;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Logging interface that extends the commons-logging
+ * {@code org.apache.commons.logging.Log} with, for every level, two extra
+ * overloads: a message followed by a variable number of arguments, and a
+ * message followed by a {@link Throwable} and a variable number of arguments.
+ * <p>
+ * In {@code LogImpl} the message is turned into a string and used as a
+ * {@link java.text.MessageFormat} pattern, with the trailing arguments filling
+ * the {@code {0}}, {@code {1}}, ... placeholders.
+ */
+public interface Log extends org.apache.commons.logging.Log {
+
+  public void trace(Object message, Object... args);
+
+  public void trace(Object message, Throwable t, Object... args);
+
+  public void debug(Object message, Object... args);
+
+  public void debug(Object message, Throwable t, Object... args);
+
+  public void info(Object message, Object... args);
+
+  public void info(Object message, Throwable t, Object... args);
+
+  public void warn(Object message, Object... args);
+
+  public void warn(Object message, Throwable t, Object... args);
+
+  public void error(Object message, Object... args);
+
+  public void error(Object message, Throwable t, Object... args);
+
+  public void fatal(Object message, Object... args);
+
+  public void fatal(Object message, Throwable t, Object... args);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/log/LogFactory.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/log/LogFactory.java b/blur-util/src/main/java/org/apache/blur/log/LogFactory.java
new file mode 100644
index 0000000..816191f
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/log/LogFactory.java
@@ -0,0 +1,29 @@
+package org.apache.blur.log;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+/**
+ * Creates {@link Log} instances by wrapping the commons-logging log obtained
+ * for the same class or name in a {@link LogImpl}.
+ */
+public class LogFactory {
+
+  /**
+   * @param c
+   *          the class to look the commons-logging log up by.
+   * @return a new {@link LogImpl} around that log.
+   */
+  public static Log getLog(Class<?> c) {
+    org.apache.commons.logging.Log delegate = org.apache.commons.logging.LogFactory.getLog(c);
+    return new LogImpl(delegate);
+  }
+
+  /**
+   * @param name
+   *          the name to look the commons-logging log up by.
+   * @return a new {@link LogImpl} around that log.
+   */
+  public static Log getLog(String name) {
+    org.apache.commons.logging.Log delegate = org.apache.commons.logging.LogFactory.getLog(name);
+    return new LogImpl(delegate);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/log/LogImpl.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/log/LogImpl.java b/blur-util/src/main/java/org/apache/blur/log/LogImpl.java
new file mode 100644
index 0000000..b086640
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/log/LogImpl.java
@@ -0,0 +1,173 @@
+package org.apache.blur.log;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.text.MessageFormat;
+
+/**
+ * {@link Log} implementation that delegates to a commons-logging log.
+ * <p>
+ * The overloads taking trailing arguments check that the level is enabled and
+ * then build the text with {@link MessageFormat}, using the message as the
+ * pattern. The remaining methods pass their arguments to the delegate
+ * unchanged.
+ */
+public class LogImpl implements Log {
+
+  private final org.apache.commons.logging.Log log;
+
+  public LogImpl(org.apache.commons.logging.Log log) {
+    this.log = log;
+  }
+
+  /**
+   * Builds the text for a formatted log call without ever throwing, so that a
+   * log statement cannot break the code that issues it.
+   *
+   * @param message
+   *          the pattern; null is rendered as the string "null".
+   * @param args
+   *          the values for the placeholders of the pattern.
+   * @return the formatted text, or the raw pattern followed by the arguments
+   *         if the pattern is not a valid {@link MessageFormat} pattern.
+   */
+  private static String format(Object message, Object[] args) {
+    String pattern = String.valueOf(message);
+    try {
+      return MessageFormat.format(pattern, args);
+    } catch (IllegalArgumentException e) {
+      // Malformed pattern (for example an unbalanced brace): keep the information, skip the formatting.
+      return pattern + " " + java.util.Arrays.toString(args);
+    }
+  }
+
+  @Override
+  public void trace(Object message, Object... args) {
+    if (isTraceEnabled()) {
+      log.trace(format(message, args));
+    }
+  }
+
+  @Override
+  public void trace(Object message, Throwable t, Object... args) {
+    if (isTraceEnabled()) {
+      log.trace(format(message, args), t);
+    }
+  }
+
+  @Override
+  public void debug(Object message, Object... args) {
+    if (isDebugEnabled()) {
+      log.debug(format(message, args));
+    }
+  }
+
+  @Override
+  public void debug(Object message, Throwable t, Object... args) {
+    if (isDebugEnabled()) {
+      log.debug(format(message, args), t);
+    }
+  }
+
+  @Override
+  public void info(Object message, Object... args) {
+    if (isInfoEnabled()) {
+      log.info(format(message, args));
+    }
+  }
+
+  @Override
+  public void info(Object message, Throwable t, Object... args) {
+    if (isInfoEnabled()) {
+      log.info(format(message, args), t);
+    }
+  }
+
+  @Override
+  public void warn(Object message, Object... args) {
+    if (isWarnEnabled()) {
+      log.warn(format(message, args));
+    }
+  }
+
+  @Override
+  public void warn(Object message, Throwable t, Object... args) {
+    if (isWarnEnabled()) {
+      log.warn(format(message, args), t);
+    }
+  }
+
+  @Override
+  public void error(Object message, Object... args) {
+    if (isErrorEnabled()) {
+      log.error(format(message, args));
+    }
+  }
+
+  @Override
+  public void error(Object message, Throwable t, Object... args) {
+    if (isErrorEnabled()) {
+      log.error(format(message, args), t);
+    }
+  }
+
+  @Override
+  public void fatal(Object message, Object... args) {
+    if (isFatalEnabled()) {
+      log.fatal(format(message, args));
+    }
+  }
+
+  @Override
+  public void fatal(Object message, Throwable t, Object... args) {
+    if (isFatalEnabled()) {
+      log.fatal(format(message, args), t);
+    }
+  }
+
+  @Override
+  public void debug(Object message, Throwable t) {
+    log.debug(message, t);
+  }
+
+  @Override
+  public void debug(Object message) {
+    log.debug(message);
+  }
+
+  @Override
+  public void error(Object message, Throwable t) {
+    log.error(message, t);
+  }
+
+  @Override
+  public void error(Object message) {
+    log.error(message);
+  }
+
+  @Override
+  public void fatal(Object message, Throwable t) {
+    log.fatal(message, t);
+  }
+
+  @Override
+  public void fatal(Object message) {
+    log.fatal(message);
+  }
+
+  @Override
+  public void info(Object message, Throwable t) {
+    log.info(message, t);
+  }
+
+  @Override
+  public void info(Object message) {
+    log.info(message);
+  }
+
+  @Override
+  public boolean isDebugEnabled() {
+    return log.isDebugEnabled();
+  }
+
+  @Override
+  public boolean isErrorEnabled() {
+    return log.isErrorEnabled();
+  }
+
+  @Override
+  public boolean isFatalEnabled() {
+    return log.isFatalEnabled();
+  }
+
+  @Override
+  public boolean isInfoEnabled() {
+    return log.isInfoEnabled();
+  }
+
+  @Override
+  public boolean isTraceEnabled() {
+    return log.isTraceEnabled();
+  }
+
+  @Override
+  public boolean isWarnEnabled() {
+    return log.isWarnEnabled();
+  }
+
+  @Override
+  public void trace(Object message, Throwable t) {
+    log.trace(message, t);
+  }
+
+  @Override
+  public void trace(Object message) {
+    log.trace(message);
+  }
+
+  @Override
+  public void warn(Object message, Throwable t) {
+    log.warn(message, t);
+  }
+
+  @Override
+  public void warn(Object message) {
+    log.warn(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/metrics/AtomicLongGauge.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/AtomicLongGauge.java b/blur-util/src/main/java/org/apache/blur/metrics/AtomicLongGauge.java
new file mode 100644
index 0000000..02e6fce
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/metrics/AtomicLongGauge.java
@@ -0,0 +1,24 @@
+package org.apache.blur.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.yammer.metrics.core.Gauge;
+
+/**
+ * {@link Gauge} whose value is read from an {@link AtomicLong} each time it is
+ * asked for.
+ */
+public class AtomicLongGauge extends Gauge<Long> {
+
+  private final AtomicLong source;
+
+  public AtomicLongGauge(AtomicLong at) {
+    source = at;
+  }
+
+  /**
+   * @return the current value of the wrapped {@link AtomicLong}.
+   */
+  @Override
+  public Long value() {
+    long current = source.get();
+    return Long.valueOf(current);
+  }
+
+  /**
+   * Convenience factory, equivalent to calling the constructor.
+   *
+   * @param at
+   *          the counter to expose.
+   * @return a new gauge reading from the given counter.
+   */
+  public static Gauge<Long> wrap(AtomicLong at) {
+    Gauge<Long> gauge = new AtomicLongGauge(at);
+    return gauge;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java b/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java
new file mode 100644
index 0000000..757febe
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java
@@ -0,0 +1,313 @@
+package org.apache.blur.metrics;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Clock;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Metered;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricProcessor;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.reporting.AbstractPollingReporter;
+import com.yammer.metrics.stats.Snapshot;
+
+public class HDFSReporter extends AbstractPollingReporter implements MetricProcessor<HDFSReporter.Context> {
+
+  private static Log LOG = LogFactory.getLog(HDFSReporter.class);
+
+  static class Context {
+
+    private final Path path;
+    private final SimpleDateFormat formatter;
+    private final String name;
+    private final FileSystem fileSystem;
+    private String currentOutputFilePattern;
+    private long now;
+    private PrintWriter printStream;
+    private FSDataOutputStream outputStream;
+    private Path currentOutputPath;
+    private long maxTimeToKeep;
+
+    /**
+     * Creates the reporting context and makes sure the output directory exists.
+     * 
+     * @param path
+     *          the directory metric files are written to; created when missing.
+     * @param configuration
+     *          used to resolve the {@link FileSystem} for {@code path}.
+     * @param filePattern
+     *          a {@link SimpleDateFormat} pattern; its formatted value becomes
+     *          the suffix of each output file name.
+     * @param name
+     *          the prefix of each output file name.
+     * @throws IOException
+     *           if {@code path} exists but is not a directory, if it cannot be
+     *           created, or if the file system cannot be accessed.
+     */
+    public Context(Path path, Configuration configuration, String filePattern, String name) throws IOException {
+      this.path = path;
+      this.fileSystem = path.getFileSystem(configuration);
+      if (fileSystem.exists(path)) {
+        if (!fileSystem.getFileStatus(path).isDir()) {
+          throw new IOException("Path [" + path + "] is not a directory.");
+        }
+      } else if (!fileSystem.mkdirs(path)) {
+        // mkdirs can signal failure through its return value instead of an
+        // exception; fail here rather than on the first write.
+        throw new IOException("Path [" + path + "] could not be created.");
+      }
+      this.name = name;
+      this.formatter = new SimpleDateFormat(filePattern);
+      this.maxTimeToKeep = TimeUnit.MINUTES.toMillis(10);
+    }
+
+    /**
+     * Records {@code now} as the timestamp of this reporting pass and, when the
+     * formatted time differs from the pattern of the current output file, rolls
+     * to a new file and then removes expired metric files.
+     * 
+     * @param now
+     *          the time of this pass; it is formatted through
+     *          {@code new Date(now)}, so presumably epoch milliseconds.
+     * @throws IOException
+     *           if rolling the file or the cleanup fails.
+     */
+    public void open(long now) throws IOException {
+      this.now = now;
+      String outputFilePattern = formatter.format(new Date(now));
+      if (!outputFilePattern.equals(currentOutputFilePattern)) {
+        // roll file
+        rollFile(outputFilePattern);
+        cleanupOldMetrics();
+      }
+    }
+
+    /**
+     * Deletes metric files of this reporter (file names starting with
+     * {@code name + "."}) whose encoded time is older than {@code maxTimeToKeep}
+     * relative to the current pass.
+     * <p>
+     * The time is taken from the text that follows the {@code name + "."}
+     * prefix, up to the next '.' if there is one (files created as a fallback in
+     * {@code rollFile} carry an extra numeric suffix). Files whose suffix cannot
+     * be parsed are logged and skipped, and the file currently being written is
+     * never deleted.
+     * 
+     * @throws IOException
+     *           if listing the directory or deleting a file fails.
+     */
+    private void cleanupOldMetrics() throws IOException {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      if (listStatus == null) {
+        // Some FileSystem implementations return null for a missing directory.
+        return;
+      }
+      String prefix = name + ".";
+      for (FileStatus fileStatus : listStatus) {
+        Path filePath = fileStatus.getPath();
+        String fileName = filePath.getName();
+        if (!fileName.startsWith(prefix)) {
+          continue;
+        }
+        if (currentOutputPath != null && fileName.equals(currentOutputPath.getName())) {
+          // The pattern encodes the start of the period, so the file that was
+          // just opened can already look "old"; it must not be removed.
+          continue;
+        }
+        // Start after the prefix rather than after the first '.', so that a
+        // reporter name containing dots is handled correctly.
+        int sIndex = prefix.length();
+        int eIndex = fileName.indexOf('.', sIndex);
+        String pattern;
+        if (eIndex < 0) {
+          pattern = fileName.substring(sIndex);
+        } else {
+          pattern = fileName.substring(sIndex, eIndex);
+        }
+        Date date;
+        try {
+          date = formatter.parse(pattern);
+        } catch (ParseException e) {
+          // A foreign file in the directory must not abort the reporting pass.
+          LOG.warn("Skipping file [" + filePath + "] during metrics cleanup, could not parse [" + pattern + "].", e);
+          continue;
+        }
+        if (date.getTime() + maxTimeToKeep < now) {
+          fileSystem.delete(filePath, false);
+        }
+      }
+    }
+
+    /**
+     * Closes the current writer (if any) and opens the output file for the
+     * given pattern, named {@code name + "." + newOutputFilePattern} inside
+     * {@code path}.
+     * <p>
+     * An existing file is appended to; when the append attempt fails with an
+     * {@link IOException}, a new file with the current pass time added as an
+     * extra suffix is created instead. The append failure itself is not logged.
+     * 
+     * @param newOutputFilePattern
+     *          the formatted time that identifies the new file.
+     * @throws IOException
+     *           if the file cannot be created.
+     */
+    private void rollFile(String newOutputFilePattern) throws IOException {
+      if (printStream != null) {
+        printStream.close();
+      }
+      currentOutputPath = new Path(path, name + "." + newOutputFilePattern);
+      if (fileSystem.exists(currentOutputPath)) {
+        // try to append
+        try {
+          outputStream = fileSystem.append(currentOutputPath);
+        } catch (IOException e) {
+          // Append failed: fall back to a fresh file made unique by the pass time.
+          currentOutputPath = new Path(path, name + "." + newOutputFilePattern + "." + now);
+          outputStream = fileSystem.create(currentOutputPath);
+        }
+      } else {
+        outputStream = fileSystem.create(currentOutputPath);
+      }
+      printStream = new PrintWriter(outputStream);
+      // Only updated once the new stream is in place, so a failed roll is retried on the next open().
+      currentOutputFilePattern = newOutputFilePattern;
+    }
+
+    /**
+     * Adds a {@code "timestamp"} entry holding the time of the current pass to
+     * the given object (mutating it) and writes it as one line to the current
+     * output file.
+     * 
+     * @param jsonObject
+     *          the record to write.
+     * @throws JSONException
+     *           if the timestamp cannot be added.
+     */
+    public void write(JSONObject jsonObject) throws JSONException {
+      jsonObject.put("timestamp", now);
+      printStream.println(jsonObject.toString());
+    }
+
+    /**
+     * Flushes the writer, then the underlying stream, and finally calls
+     * {@code sync()} on the {@link FSDataOutputStream}.
+     * 
+     * @throws IOException
+     *           if flushing or syncing the underlying stream fails.
+     */
+    public void flush() throws IOException {
+      // NOTE(review): PrintWriter does not throw on I/O errors; checkError() is never consulted here.
+      printStream.flush();
+      outputStream.flush();
+      outputStream.sync();
+    }
+  }
+
+  /**
+   * Creates and starts a reporter for {@link Metrics#defaultRegistry()}; see
+   * the overload taking a {@link MetricsRegistry} for the parameters.
+   * 
+   * @throws IOException
+   *           if the reporter cannot be created.
+   */
+  public static void enable(Configuration configuration, Path path, String filePattern, String name, long period,
+      TimeUnit unit) throws IOException {
+    enable(Metrics.defaultRegistry(), configuration, path, filePattern, name, period, unit);
+  }
+
+  /**
+   * Creates a reporter for the given registry and starts it with the given
+   * polling period. The reporter instance is not returned to the caller.
+   * 
+   * @param metricsRegistry
+   *          the registry whose metrics are reported.
+   * @param configuration
+   *          used to resolve the file system of {@code path}.
+   * @param path
+   *          the output directory.
+   * @param filePattern
+   *          the date pattern used for output file names.
+   * @param name
+   *          the prefix of output file names.
+   * @param period
+   *          the polling period.
+   * @param unit
+   *          the unit of {@code period}.
+   * @throws IOException
+   *           if the reporter cannot be created.
+   */
+  public static void enable(MetricsRegistry metricsRegistry, Configuration configuration, Path path,
+      String filePattern, String name, long period, TimeUnit unit) throws IOException {
+    final HDFSReporter reporter = new HDFSReporter(metricsRegistry, configuration, path, filePattern, name);
+    reporter.start(period, unit);
+  }
+
+  private final Context context;
+  private final Clock clock;
+
+  /**
+   * Creates a reporter for {@link Metrics#defaultRegistry()}.
+   * 
+   * @throws IOException
+   *           if the output context cannot be created.
+   */
+  public HDFSReporter(Configuration configuration, Path path, String filePattern, String name) throws IOException {
+    this(Metrics.defaultRegistry(), configuration, path, filePattern, name);
+  }
+
+  /**
+   * Creates a reporter for the given registry that uses
+   * {@link Clock#defaultClock()} as its time source.
+   * 
+   * @throws IOException
+   *           if the output context cannot be created.
+   */
+  public HDFSReporter(MetricsRegistry metricsRegistry, Configuration configuration, Path path, String filePattern,
+      String name) throws IOException {
+    this(metricsRegistry, configuration, path, filePattern, name, Clock.defaultClock());
+  }
+
+  /**
+   * Creates a reporter registered under the name {@code "hdfs-reporter"} and
+   * builds its output {@link Context}.
+   * 
+   * @param metricsRegistry
+   *          the registry whose metrics are reported.
+   * @param configuration
+   *          used to resolve the file system of {@code path}.
+   * @param path
+   *          the output directory.
+   * @param filePattern
+   *          the date pattern used for output file names.
+   * @param name
+   *          the prefix of output file names.
+   * @param clock
+   *          the time source read at the start of every pass.
+   * @throws IOException
+   *           if the output context cannot be created.
+   */
+  public HDFSReporter(MetricsRegistry metricsRegistry, Configuration configuration, Path path, String filePattern,
+      String name, Clock clock) throws IOException {
+    super(metricsRegistry, "hdfs-reporter");
+    this.context = new Context(path, configuration, filePattern, name);
+    this.clock = clock;
+  }
+
+  /**
+   * Runs one reporting pass: opens the context at the current clock time,
+   * hands every registered metric to this processor and flushes the output.
+   * Any {@link Throwable} raised along the way is logged and not rethrown.
+   */
+  @Override
+  public void run() {
+    try {
+      context.open(clock.time());
+      for (Entry<String, SortedMap<MetricName, Metric>> entry : getMetricsRegistry().groupedMetrics().entrySet()) {
+        for (Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) {
+          // Dispatches to the matching process* method of this class.
+          subEntry.getValue().processWith(this, subEntry.getKey(), context);
+        }
+      }
+      context.flush();
+    } catch (Throwable t) {
+      LOG.error("Unknown error during the processing of metrics.", t);
+    }
+  }
+
+  /**
+   * Writes one record of type {@code "gauge"} carrying the gauge's current
+   * value.
+   * 
+   * @throws RuntimeException
+   *           wrapping any {@link JSONException} raised while building the
+   *           record.
+   */
+  @Override
+  public void processGauge(MetricName name, Gauge<?> gauge, HDFSReporter.Context context) {
+    try {
+      JSONObject record = new JSONObject();
+      record.put("name", getName(name));
+      record.put("type", "gauge");
+      record.put("value", gauge.value());
+      context.write(record);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Writes one record of type {@code "counter"} carrying the counter's
+   * current count.
+   * 
+   * @throws RuntimeException
+   *           wrapping any {@link JSONException} raised while building the
+   *           record.
+   */
+  @Override
+  public void processCounter(MetricName name, Counter counter, HDFSReporter.Context context) {
+    try {
+      JSONObject record = new JSONObject();
+      record.put("name", getName(name));
+      record.put("type", "counter");
+      record.put("value", counter.count());
+      context.write(record);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Writes one record of type {@code "meter"} whose value is an object with
+   * the meter's unit, event type, count and rates.
+   * 
+   * @throws RuntimeException
+   *           wrapping any {@link JSONException} raised while building the
+   *           record.
+   */
+  @Override
+  public void processMeter(MetricName name, Metered meter, HDFSReporter.Context context) {
+    try {
+      JSONObject record = new JSONObject();
+      record.put("name", getName(name));
+      record.put("type", "meter");
+
+      JSONObject rates = new JSONObject();
+      addMeterInfo(meter, rates);
+      record.put("value", rates);
+
+      context.write(record);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Copies the rate unit, event type, count and the mean, one-, five- and
+   * fifteen-minute rates of {@code meter} into {@code meterJsonObject}.
+   * 
+   * @param meter
+   *          the source of the values.
+   * @param meterJsonObject
+   *          the object that receives the entries; it is mutated.
+   * @throws JSONException
+   *           if a value cannot be stored.
+   */
+  private void addMeterInfo(Metered meter, JSONObject meterJsonObject) throws JSONException {
+    meterJsonObject.put("rateUnit", meter.rateUnit());
+    meterJsonObject.put("eventType", meter.eventType());
+    meterJsonObject.put("count", meter.count());
+    meterJsonObject.put("meanRate", meter.meanRate());
+    meterJsonObject.put("oneMinuteRate", meter.oneMinuteRate());
+    meterJsonObject.put("fiveMinuteRate", meter.fiveMinuteRate());
+    meterJsonObject.put("fifteenMinuteRate", meter.fifteenMinuteRate());
+  }
+
+  /**
+   * Writes one record of type {@code "histogram"} whose value holds the
+   * histogram's min, max, mean and standard deviation plus a
+   * {@code "snapshot"} object with the median and upper percentiles.
+   * 
+   * @throws RuntimeException
+   *           wrapping any {@link JSONException} raised while building the
+   *           record.
+   */
+  @Override
+  public void processHistogram(MetricName name, Histogram histogram, HDFSReporter.Context context) {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("name", getName(name));
+      // Was "meter" (copied from processMeter), which made histogram records
+      // indistinguishable from meter records by type.
+      jsonObject.put("type", "histogram");
+      JSONObject histogramJsonObject = new JSONObject();
+
+      histogramJsonObject.put("min", histogram.min());
+      histogramJsonObject.put("max", histogram.max());
+      histogramJsonObject.put("mean", histogram.mean());
+      histogramJsonObject.put("stdDev", histogram.stdDev());
+
+      Snapshot snapshot = histogram.getSnapshot();
+      JSONObject snapshotJsonObject = new JSONObject();
+      snapshotJsonObject.put("median", snapshot.getMedian());
+      snapshotJsonObject.put("75%", snapshot.get75thPercentile());
+      snapshotJsonObject.put("95%", snapshot.get95thPercentile());
+      snapshotJsonObject.put("98%", snapshot.get98thPercentile());
+      snapshotJsonObject.put("99%", snapshot.get99thPercentile());
+      snapshotJsonObject.put("99.9%", snapshot.get999thPercentile());
+
+      histogramJsonObject.put("snapshot", snapshotJsonObject);
+
+      jsonObject.put("value", histogramJsonObject);
+      context.write(jsonObject);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Writes one record of type {@code "timer"} whose value holds the timer's
+   * duration unit, min, max, mean and standard deviation, its meter
+   * information, and a {@code "snapshot"} object with the median and upper
+   * percentiles.
+   * 
+   * @throws RuntimeException
+   *           wrapping any {@link JSONException} raised while building the
+   *           record.
+   */
+  @Override
+  public void processTimer(MetricName name, Timer timer, HDFSReporter.Context context) {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("name", getName(name));
+      // Was "meter" (copied from processMeter), which made timer records
+      // indistinguishable from meter records by type.
+      jsonObject.put("type", "timer");
+      JSONObject timerJsonObject = new JSONObject();
+
+      timerJsonObject.put("unit", timer.durationUnit());
+      timerJsonObject.put("min", timer.min());
+      timerJsonObject.put("max", timer.max());
+      timerJsonObject.put("mean", timer.mean());
+      timerJsonObject.put("stdDev", timer.stdDev());
+      addMeterInfo(timer, timerJsonObject);
+
+      Snapshot snapshot = timer.getSnapshot();
+      JSONObject snapshotJsonObject = new JSONObject();
+      snapshotJsonObject.put("median", snapshot.getMedian());
+      snapshotJsonObject.put("75%", snapshot.get75thPercentile());
+      snapshotJsonObject.put("95%", snapshot.get95thPercentile());
+      snapshotJsonObject.put("98%", snapshot.get98thPercentile());
+      snapshotJsonObject.put("99%", snapshot.get99thPercentile());
+      snapshotJsonObject.put("99.9%", snapshot.get999thPercentile());
+
+      timerJsonObject.put("snapshot", snapshotJsonObject);
+
+      jsonObject.put("value", timerJsonObject);
+
+      context.write(jsonObject);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds a JSON object with the {@code name}, {@code group}, {@code scope}
+   * and {@code type} of the given metric name.
+   * 
+   * @param metricName
+   *          the metric name to describe.
+   * @return a new JSON object holding the four parts.
+   * @throws JSONException
+   *           if a part cannot be stored.
+   */
+  private JSONObject getName(MetricName metricName) throws JSONException {
+    JSONObject described = new JSONObject();
+    described.put("name", metricName.getName());
+    described.put("group", metricName.getGroup());
+    described.put("scope", metricName.getScope());
+    described.put("type", metricName.getType());
+    return described;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/metrics/JSONReporter.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/JSONReporter.java b/blur-util/src/main/java/org/apache/blur/metrics/JSONReporter.java
new file mode 100644
index 0000000..8ce7a70
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/metrics/JSONReporter.java
@@ -0,0 +1,260 @@
+package org.apache.blur.metrics;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metered;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricProcessor;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.reporting.AbstractPollingReporter;
+import com.yammer.metrics.stats.Snapshot;
+
+public class JSONReporter extends AbstractPollingReporter implements MetricProcessor<JSONReporter.Context> {
+
+  private static final ResetableCharArrayWriter EMPTY = new ResetableCharArrayWriter() {
+    {
+      try {
+        write("[]");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  };
+  private static Log LOG = LogFactory.getLog(JSONReporter.class);
+
+  private Context context;
+  private int _256K = 262144;
+  private ResetableCharArrayWriter writerInUse = new ResetableCharArrayWriter(_256K);
+  private ResetableCharArrayWriter writerWriting = new ResetableCharArrayWriter(_256K);
+  private static AtomicReference<ResetableCharArrayWriter> reading = new AtomicReference<ResetableCharArrayWriter>(
+      EMPTY);
+
+  /**
+   * Creates and starts a reporter for {@link Metrics#defaultRegistry()}; see
+   * the overload taking a {@link MetricsRegistry} for the parameters.
+   */
+  public static void enable(String name, long period, TimeUnit unit, int numberOfElements) throws IOException {
+    enable(Metrics.defaultRegistry(), name, period, unit, numberOfElements);
+  }
+
+  /**
+   * Creates a reporter for the given registry and starts it with the given
+   * polling period. The reporter instance is not returned to the caller.
+   * 
+   * @param metricsRegistry
+   *          the registry whose metrics are reported.
+   * @param name
+   *          the reporter name handed to the superclass.
+   * @param period
+   *          the polling period.
+   * @param unit
+   *          the unit of {@code period}.
+   * @param numberOfElements
+   *          the number of samples retained per metric value.
+   */
+  public static void enable(MetricsRegistry metricsRegistry, String name, long period, TimeUnit unit,
+      int numberOfElements) throws IOException {
+    JSONReporter reporter = new JSONReporter(metricsRegistry, name, numberOfElements);
+    reporter.start(period, unit);
+  }
+
+  /**
+   * Copies the most recently published JSON document to {@code writer}. Before
+   * the first pass completes this is the empty array {@code []}.
+   * <p>
+   * The copy happens while holding the monitor of {@code reading}, the same
+   * monitor used when a new buffer is published.
+   * 
+   * @param writer
+   *          the destination; it is neither flushed nor closed here.
+   * @throws IOException
+   *           if writing to {@code writer} fails.
+   */
+  public static void writeJSONData(Writer writer) throws IOException {
+    synchronized (reading) {
+      ResetableCharArrayWriter reader = reading.get();
+      // Only the first size() chars of the backing array are valid content.
+      writer.write(reader.getBuffer(), 0, reader.size());
+    }
+  }
+
+  /**
+   * @param registry
+   *          the registry whose metrics are reported.
+   * @param name
+   *          the reporter name handed to the superclass.
+   * @param numberOfElements
+   *          the number of samples retained per metric value.
+   */
+  protected JSONReporter(MetricsRegistry registry, String name, int numberOfElements) {
+    super(registry, name);
+    this.context = new Context(numberOfElements);
+  }
+
+  /**
+   * State kept between reporting passes: the accumulated {@link MetricInfo}
+   * per metric, the type label recorded for each metric, the sample capacity
+   * and the timestamp of the current pass.
+   */
+  static class Context {
+
+    private final Map<MetricName, MetricInfo> metricInfoMap = new HashMap<MetricName, MetricInfo>();
+    private final Map<MetricName, String> typeTable = new HashMap<MetricName, String>();
+    private final int numberOfElements;
+    private long time;
+
+    Context(int numberOfElements) {
+      this.numberOfElements = numberOfElements;
+    }
+
+    long getTime() {
+      return time;
+    }
+
+    void setTime(long time) {
+      this.time = time;
+    }
+
+    /**
+     * Returns the {@link MetricInfo} tracked for {@code name}, creating and
+     * registering it on first use with whatever type label is currently
+     * recorded for that name.
+     */
+    public MetricInfo getMetricInfo(MetricName name) {
+      MetricInfo existing = metricInfoMap.get(name);
+      if (existing != null) {
+        return existing;
+      }
+      MetricInfo created = new MetricInfo(getName(name), typeTable.get(name), numberOfElements);
+      metricInfoMap.put(name, created);
+      return created;
+    }
+
+    /**
+     * The display name is simply the string form of the metric name.
+     */
+    private String getName(MetricName metricName) {
+      return metricName.toString();
+    }
+  }
+
+  /**
+   * Runs one reporting pass: stamps the context with the current time, records
+   * a type label for every metric and hands it to this processor, then renders
+   * all accumulated metric information as a JSON array into the buffer that is
+   * not currently published and swaps it in. Any {@link Throwable} is logged
+   * and not rethrown; in that case the published buffer is left unchanged.
+   */
+  @Override
+  public void run() {
+    try {
+      context.setTime(System.currentTimeMillis());
+      for (Entry<String, SortedMap<MetricName, Metric>> entry : getMetricsRegistry().groupedMetrics().entrySet()) {
+        for (Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) {
+          MetricName name = subEntry.getKey();
+          Metric metric = subEntry.getValue();
+          // The label must be recorded before processWith, because the MetricInfo
+          // created on first use reads it from the type table.
+          if (metric instanceof Counter) {
+            context.typeTable.put(name, "counter");
+          } else if (metric instanceof Gauge) {
+            context.typeTable.put(name, "gauge");
+          } else if (metric instanceof Histogram) {
+            context.typeTable.put(name, "histogram");
+          } else if (metric instanceof Meter) {
+            context.typeTable.put(name, "meter");
+          } else if (metric instanceof Timer) {
+            context.typeTable.put(name, "timer");
+          }
+          metric.processWith(this, name, context);
+        }
+      }
+      ResetableCharArrayWriter writer = getWriter();
+      writer.reset();
+      Set<Entry<MetricName, MetricInfo>> entrySet = context.metricInfoMap.entrySet();
+      writer.append('[');
+      // flag becomes true after the first element so later ones get a leading comma.
+      boolean flag = false;
+      for (Entry<MetricName, MetricInfo> entry : entrySet) {
+        if (flag) {
+          writer.append(',');
+        }
+        entry.getValue().write(writer);
+        flag = true;
+      }
+      writer.append(']');
+      swapWriter();
+    } catch (Throwable t) {
+      LOG.error("Unknown error during the processing of metrics.", t);
+    }
+  }
+
+  /**
+   * Exchanges the two buffers and publishes the one that was just written
+   * through the static {@code reading} reference, while holding the monitor
+   * that {@code writeJSONData} also takes.
+   */
+  private void swapWriter() {
+    synchronized (reading) {
+      ResetableCharArrayWriter tmp1 = writerWriting;
+      writerWriting = writerInUse;
+      writerInUse = tmp1;
+      reading.set(writerInUse);
+    }
+  }
+
+  /**
+   * @return the buffer that the current pass renders into, i.e. the one that
+   *         is not published.
+   */
+  private ResetableCharArrayWriter getWriter() {
+    return writerWriting;
+  }
+
+  /**
+   * Appends the meter's timestamp, count and rates for this pass to the
+   * metric's accumulated information.
+   */
+  @Override
+  public void processMeter(MetricName name, Metered meter, Context context) throws Exception {
+    MetricInfo target = context.getMetricInfo(name);
+    addMeterInfo(context.getTime(), meter, target);
+  }
+
+  /**
+   * Appends the timestamp of this pass and the counter's current count to the
+   * metric's accumulated information.
+   */
+  @Override
+  public void processCounter(MetricName name, Counter counter, Context context) throws Exception {
+    MetricInfo target = context.getMetricInfo(name);
+    target.addNumber("timestamp", context.getTime());
+    target.addNumber("value", counter.count());
+  }
+
+  /**
+   * Appends the timestamp of this pass, the histogram's summary statistics
+   * and the percentiles of its current snapshot to the metric's accumulated
+   * information.
+   */
+  @Override
+  public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {
+    MetricInfo target = context.getMetricInfo(name);
+    target.addNumber("timestamp", context.getTime());
+
+    // Summary statistics.
+    target.addNumber("min", histogram.min());
+    target.addNumber("max", histogram.max());
+    target.addNumber("mean", histogram.mean());
+    target.addNumber("stdDev", histogram.stdDev());
+
+    // Distribution of the current snapshot.
+    Snapshot distribution = histogram.getSnapshot();
+    target.addNumber("median", distribution.getMedian());
+    target.addNumber("75%", distribution.get75thPercentile());
+    target.addNumber("95%", distribution.get95thPercentile());
+    target.addNumber("98%", distribution.get98thPercentile());
+    target.addNumber("99%", distribution.get99thPercentile());
+    target.addNumber("99.9%", distribution.get999thPercentile());
+  }
+
+  /**
+   * Appends the timer's meter information, duration statistics and the
+   * percentiles of its current snapshot to the metric's accumulated
+   * information.
+   */
+  @Override
+  public void processTimer(MetricName name, Timer timer, Context context) throws Exception {
+    MetricInfo target = context.getMetricInfo(name);
+
+    // Timestamp, count and rates.
+    addMeterInfo(context.getTime(), timer, target);
+
+    // Duration statistics.
+    target.setMetaData("unit", timer.durationUnit().toString());
+    target.addNumber("min", timer.min());
+    target.addNumber("max", timer.max());
+    target.addNumber("mean", timer.mean());
+    target.addNumber("stdDev", timer.stdDev());
+
+    // Distribution of the current snapshot.
+    Snapshot distribution = timer.getSnapshot();
+    target.addNumber("median", distribution.getMedian());
+    target.addNumber("75%", distribution.get75thPercentile());
+    target.addNumber("95%", distribution.get95thPercentile());
+    target.addNumber("98%", distribution.get98thPercentile());
+    target.addNumber("99%", distribution.get99thPercentile());
+    target.addNumber("99.9%", distribution.get999thPercentile());
+  }
+
+  /**
+   * Appends the timestamp of this pass and the gauge's current value,
+   * converted to a double, to the metric's accumulated information.
+   */
+  @Override
+  public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {
+    MetricInfo target = context.getMetricInfo(name);
+    target.addNumber("timestamp", context.getTime());
+    target.addNumber("value", getDouble(gauge.value()));
+  }
+
+  /**
+   * Converts a gauge value to a double.
+   * <p>
+   * Every {@link Number} is converted through {@link Number#doubleValue()},
+   * which yields the same result as before for Integer, Long, Double and
+   * Float and additionally covers the remaining numeric types (Short, Byte,
+   * AtomicLong, BigDecimal, ...) that used to be reported as 0.
+   * 
+   * @param value
+   *          the gauge value, may be null.
+   * @return the numeric value, or 0 when {@code value} is null or not a
+   *         number.
+   */
+  private double getDouble(Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    }
+    return 0;
+  }
+
+  /**
+   * Appends the timestamp, count and rates of {@code meter} to {@code info}
+   * and passes its rate unit and event type on as metadata.
+   * 
+   * @param time
+   *          the timestamp of the current pass; taken as a primitive so the
+   *          value is not boxed on every call.
+   * @param meter
+   *          the source of the values.
+   * @param info
+   *          the accumulated information of the metric; it is mutated.
+   */
+  private void addMeterInfo(long time, Metered meter, MetricInfo info) {
+    info.addNumber("timestamp", time);
+    info.setMetaData("rateUnit", meter.rateUnit().toString());
+    info.setMetaData("eventType", meter.eventType());
+    info.addNumber("count", meter.count());
+    info.addNumber("meanRate", meter.meanRate());
+    info.addNumber("oneMinuteRate", meter.oneMinuteRate());
+    info.addNumber("fiveMinuteRate", meter.fiveMinuteRate());
+    info.addNumber("fifteenMinuteRate", meter.fifteenMinuteRate());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/metrics/JSONReporterServlet.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/JSONReporterServlet.java b/blur-util/src/main/java/org/apache/blur/metrics/JSONReporterServlet.java
new file mode 100644
index 0000000..0f4c4b8
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/metrics/JSONReporterServlet.java
@@ -0,0 +1,22 @@
+package org.apache.blur.metrics;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Servlet that answers GET requests with the JSON document most recently
+ * published by {@link JSONReporter}.
+ */
+public class JSONReporterServlet extends HttpServlet {
+  private static final long serialVersionUID = -3086441832701983642L;
+
+  /**
+   * Sets the content type to {@code application/json}, writes the current
+   * metrics document to the response and flushes it.
+   */
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse response) throws ServletException, IOException {
+    response.setContentType("application/json");
+    PrintWriter writer = response.getWriter();
+    JSONReporter.writeJSONData(writer);
+    writer.flush();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/metrics/MetricInfo.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/MetricInfo.java b/blur-util/src/main/java/org/apache/blur/metrics/MetricInfo.java
new file mode 100644
index 0000000..ddb92e3
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/metrics/MetricInfo.java
@@ -0,0 +1,177 @@
+package org.apache.blur.metrics;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class MetricInfo {
+
+  /**
+   * Ad-hoc demonstration: adds eleven samples to a series with a capacity of
+   * ten, so the oldest sample is overwritten, and prints the resulting JSON to
+   * standard output.
+   */
+  public static void main(String[] args) throws IOException {
+    MetricInfo info = new MetricInfo("name", "type", 10);
+    info.addNumber("test1", 0.1);
+    info.addNumber("test1", 0.2);
+    info.addNumber("test1", 0.3);
+    info.addNumber("test1", 0.4);
+    info.addNumber("test1", 0.5);
+    info.addNumber("test1", 0.6);
+    info.addNumber("test1", 0.7);
+    info.addNumber("test1", 0.9);
+    info.addNumber("test1", 1.0);
+    info.addNumber("test1", 1.1);
+    info.addNumber("test1", 1.2);
+
+    StringWriter writer = new StringWriter();
+    info.write(writer);
+    System.out.println(writer);
+  }
+
+  /**
+   * A series of samples that can render itself as a JSON array.
+   */
+  static interface ArrayWriter {
+    /**
+     * Writes the samples as a JSON array to {@code writer}.
+     */
+    void writeArray(Writer writer) throws IOException;
+  }
+
+  /**
+   * Fixed-capacity ring buffer of longs. Once full, each new sample overwrites
+   * the oldest one. Samples are rendered newest first.
+   */
+  static class PrimitiveLongQueue implements ArrayWriter {
+    // Index of the most recently written slot; -1 until the first add.
+    int head = -1;
+    // Number of valid samples, never more than numberOfElements.
+    int length = 0;
+    final long[] data;
+    final int numberOfElements;
+
+    PrimitiveLongQueue(int numberOfElements) {
+      this.data = new long[numberOfElements];
+      this.numberOfElements = numberOfElements;
+    }
+
+    /**
+     * Stores {@code d} in the slot after the current head, wrapping to slot 0
+     * at the end of the array.
+     */
+    void add(long d) {
+      int next = head + 1;
+      head = next >= numberOfElements ? 0 : next;
+      data[head] = d;
+      if (length < numberOfElements) {
+        length++;
+      }
+    }
+
+    /**
+     * Writes the stored samples as a JSON array, walking backwards from the
+     * newest sample.
+     */
+    public void writeArray(Writer writer) throws IOException {
+      writer.append("[");
+      for (int age = 0; age < length; age++) {
+        if (age > 0) {
+          writer.append(',');
+        }
+        // age is smaller than numberOfElements here, so the sum stays positive.
+        int slot = (head - age + numberOfElements) % numberOfElements;
+        writer.append(Long.toString(data[slot]));
+      }
+      writer.append("]");
+    }
+  }
+
+  /**
+   * Fixed-capacity ring buffer of doubles. Once full, each new sample
+   * overwrites the oldest one. Samples are rendered newest first.
+   */
+  static class PrimitiveDoubleQueue implements ArrayWriter {
+    // Index of the most recently written slot; -1 until the first add.
+    int head = -1;
+    // Number of valid samples, never more than numberOfElements.
+    int length = 0;
+    final double[] data;
+    final int numberOfElements;
+
+    PrimitiveDoubleQueue(int numberOfElements) {
+      this.data = new double[numberOfElements];
+      this.numberOfElements = numberOfElements;
+    }
+
+    /**
+     * Stores {@code d} in the slot after the current head, wrapping to slot 0
+     * at the end of the array.
+     */
+    void add(double d) {
+      head++;
+      if (head >= numberOfElements) {
+        head = 0;
+      }
+      data[head] = d;
+      if (length < numberOfElements) {
+        length++;
+      }
+    }
+
+    /**
+     * Writes the stored samples as a JSON array, walking backwards from the
+     * newest sample. NaN and infinite samples, which JSON cannot represent,
+     * are written as {@code null} so that a single such sample does not make
+     * the whole document fail to render.
+     * 
+     * @throws IOException
+     *           if writing fails or a sample cannot be converted.
+     */
+    public void writeArray(Writer writer) throws IOException {
+      writer.append("[");
+      int position = head;
+      for (int i = 0; i < length; i++) {
+        if (i != 0) {
+          writer.append(',');
+        }
+        double d = data[position];
+        if (Double.isNaN(d) || Double.isInfinite(d)) {
+          writer.append("null");
+        } else {
+          try {
+            writer.append(JSONObject.numberToString(d));
+          } catch (JSONException e) {
+            throw new IOException(e);
+          }
+        }
+        position--;
+        if (position < 0) {
+          position = numberOfElements - 1;
+        }
+      }
+      writer.append("]");
+    }
+  }
+
+  private final String name;
+  private final String type;
+  private final Map<String, ArrayWriter> metricMap = new ConcurrentHashMap<String, ArrayWriter>();
+  private int numberOfEntries;
+
+  /**
+   * @param name
+   *          the value written as {@code "name"} in the JSON output.
+   * @param type
+   *          the value written as {@code "type"} in the JSON output.
+   * @param numberOfEntries
+   *          the capacity of every series created by {@code addNumber}.
+   */
+  public MetricInfo(String name, String type, int numberOfEntries) {
+    this.name = name;
+    this.type = type;
+    this.numberOfEntries = numberOfEntries;
+  }
+
+  /**
+   * Appends a long sample to the series called {@code name}, creating the
+   * series on first use.
+   * 
+   * @throws ClassCastException
+   *           if the series was created by the double overload.
+   */
+  public void addNumber(String name, long l) {
+    PrimitiveLongQueue queue = (PrimitiveLongQueue) metricMap.get(name);
+    if (queue == null) {
+      queue = new PrimitiveLongQueue(numberOfEntries);
+      metricMap.put(name, queue);
+    }
+    queue.add(l);
+  }
+
+  /**
+   * Appends a double sample to the series called {@code name}, creating the
+   * series on first use.
+   * 
+   * @throws ClassCastException
+   *           if the series was created by the long overload.
+   */
+  public void addNumber(String name, double d) {
+    PrimitiveDoubleQueue queue = (PrimitiveDoubleQueue) metricMap.get(name);
+    if (queue == null) {
+      queue = new PrimitiveDoubleQueue(numberOfEntries);
+      metricMap.put(name, queue);
+    }
+    queue.add(d);
+  }
+
+  /**
+   * Writes this metric as a JSON object of the form
+   * {@code {"name":...,"type":...,"data":{...}}}.
+   * <p>
+   * The name and type are emitted through {@link JSONObject#quote(String)} so
+   * that quotes, backslashes and control characters in a metric name cannot
+   * break the document. A null name or type is still written as the string
+   * {@code "null"}, as before.
+   * 
+   * @param writer
+   *          the destination.
+   * @throws IOException
+   *           if writing fails.
+   */
+  public void write(Writer writer) throws IOException {
+    writer.append("{\"name\":");
+    writer.append(JSONObject.quote(String.valueOf(name)));
+    writer.append(",\"type\":");
+    writer.append(JSONObject.quote(String.valueOf(type)));
+    writer.append(",\"data\":{");
+    writeMetricMap(writer);
+    writer.append("}}");
+  }
+
+  /**
+   * Writes every series as a {@code "key":[...]} member, separated by commas,
+   * without the surrounding braces.
+   */
+  private void writeMetricMap(Writer writer) throws IOException {
+    boolean first = true;
+    for (Entry<String, ArrayWriter> series : metricMap.entrySet()) {
+      if (!first) {
+        writer.append(',');
+      }
+      first = false;
+      writer.append("\"");
+      writer.append(series.getKey());
+      writer.append("\":");
+      series.getValue().writeArray(writer);
+    }
+  }
+
+  /**
+   * Currently a no-op: the key and value are discarded and never appear in
+   * the JSON output.
+   */
+  public void setMetaData(String key, String value) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java b/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
new file mode 100644
index 0000000..76981ba
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/metrics/MetricsConstants.java
@@ -0,0 +1,34 @@
+package org.apache.blur.metrics;
+
+/**
+ * String constants shared by the metrics code; presumably used as group, type
+ * and name labels when metrics are registered (the usages are not part of this
+ * file).
+ */
+public class MetricsConstants {
+  public static final String LUCENE = "Lucene";
+  public static final String BLUR = "Blur";
+  public static final String ORG_APACHE_BLUR = "org.apache.blur";
+  public static final String INTERNAL_BUFFERS = "Internal Buffers";
+  public static final String OTHER_SIZES_ALLOCATED = "Other Sizes Allocated";
+  public static final String _8K_SIZE_ALLOCATED = "8K Size Allocated";
+  public static final String _1K_SIZE_ALLOCATED = "1K Size Allocated";
+  public static final String LOST = "Lost";
+  // The escape below is U+00B5 (micro sign), so the label reads "Thrift Calls in µs".
+  public static final String THRIFT_CALLS = "Thrift Calls in \u00B5s";
+  public static final String REQUESTS = "Requests";
+  public static final String DELETE = "delete";
+  public static final String UPDATE = "update";
+  public static final String ADD = "add";
+  public static final String SEARCH = "search";
+  public static final String REMOTE = "remote";
+  public static final String HDFS = "HDFS";
+  public static final String LOCAL = "local";
+  public static final String HIT = "Hit";
+  public static final String MISS = "Miss";
+  public static final String CACHE = "Cache";
+  public static final String EVICTION = "Eviction";
+  public static final String TABLE_COUNT = "Table Count";
+  public static final String FILES_IN_QUEUE_TO_BE_DELETED = "Files in Queue to be Deleted";
+  public static final String INDEX_MEMORY_USAGE = "Index Memory Usage";
+  public static final String SEGMENT_COUNT = "Segment Count";
+  public static final String INDEX_COUNT = "Index Count";
+  public static final String INTERNAL_SEARCH = "internal search";
+  public static final String SEARCH_TIMER = "search-timer";
+  public static final String ENTRIES = "Entries";
+  public static final String SIZE = "Size";
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/metrics/ResetableCharArrayWriter.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/metrics/ResetableCharArrayWriter.java b/blur-util/src/main/java/org/apache/blur/metrics/ResetableCharArrayWriter.java
new file mode 100644
index 0000000..66cb384
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/metrics/ResetableCharArrayWriter.java
@@ -0,0 +1,23 @@
+package org.apache.blur.metrics;
+
+import java.io.CharArrayWriter;
+
+/**
+ * A {@link CharArrayWriter} that exposes its backing array so the content can
+ * be read without copying.
+ */
+public class ResetableCharArrayWriter extends CharArrayWriter {
+
+  public ResetableCharArrayWriter() {
+    super();
+  }
+
+  /**
+   * @param initialSize
+   *          the initial capacity of the backing array.
+   */
+  public ResetableCharArrayWriter(int initialSize) {
+    super(initialSize);
+  }
+
+  /**
+   * Returns the backing array itself, not a copy. Only the first
+   * {@code size()} chars are valid content, and later writes may modify or
+   * replace the array.
+   */
+  public char[] getBuffer() {
+    return buf;
+  }
+
+  /**
+   * Discards the content by setting the char count to zero; the backing array
+   * is kept.
+   */
+  public void reset() {
+    this.count = 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
new file mode 100644
index 0000000..e9c3b72
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
@@ -0,0 +1,192 @@
+package org.apache.blur.zookeeper;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class WatchChildren implements Closeable {
+
+  private final static Log LOG = LogFactory.getLog(WatchChildren.class);
+  private final ZooKeeper _zooKeeper;
+  private final String _path;
+  private final Object _lock = new Object();
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+  private long _delay = TimeUnit.SECONDS.toMillis(3);
+  private List<String> _children;
+  private final String instance = UUID.randomUUID().toString();
+  private Thread _doubleCheckThread;
+  private Thread _watchThread;
+  private final boolean _debug = true;
+  private String _debugStackTrace;
+
+  public static abstract class OnChange {
+    public abstract void action(List<String> children);
+  }
+
+  public WatchChildren(ZooKeeper zooKeeper, String path) {
+    _zooKeeper = zooKeeper;
+    _path = path;
+    LOG.debug("Creating watch [{0}]", instance);
+  }
+
+  public WatchChildren watch(final OnChange onChange) {
+    if (_debug) {
+      StringWriter writer = new StringWriter();
+      PrintWriter printWriter = new PrintWriter(writer);
+      new Throwable().printStackTrace(printWriter);
+      printWriter.close();
+      _debugStackTrace = writer.toString();
+    }
+    _watchThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        Watcher watcher = new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+            synchronized (_lock) {
+              _lock.notify();
+            }
+          }
+        };
+        startDoubleCheckThread();
+        while (_running.get()) {
+          synchronized (_lock) {
+            try {
+              _children = _zooKeeper.getChildren(_path, watcher);
+              try {
+                onChange.action(_children);
+              } catch (Throwable t) {
+                LOG.error("Unknown error during onchange action [" + this + "].", t);
+              }
+              _lock.wait();
+            } catch (KeeperException e) {
+              LOG.error("Error in instance [{0}]", e, instance);
+              if (!_running.get()) {
+                LOG.info("Error [{0}]", e.getMessage());
+                return;
+              }
+              if (e.code() == Code.NONODE) {
+                if (_debug) {
+                  LOG.debug("Path for watching not found [{0}], no longer watching, debug [{1}].", _path, _debugStackTrace);
+                } else {
+                  LOG.debug("Path for watching not found [{0}], no longer watching.", _path);
+                }
+                close();
+                return;
+              }
+              if (_debug) {
+                LOG.error("Unknown error [{0}]", e, _debugStackTrace);
+              } else {
+                LOG.error("Unknown error", e);
+              }
+              throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+              return;
+            }
+          }
+        }
+        _running.set(false);
+      }
+    });
+    _watchThread.setName("Watch Children [" + _path + "][" + instance + "]");
+    _watchThread.setDaemon(true);
+    _watchThread.start();
+    return this;
+  }
+
+  private void startDoubleCheckThread() {
+    _doubleCheckThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (_running.get()) {
+          try {
+            synchronized (_running) {
+              _running.wait(_delay);
+            }
+            if (!_running.get()) {
+              return;
+            }
+            if (_zooKeeper.exists(_path, false) == null) {
+              LOG.debug("Path for watching not found [{0}], no longer double checking.", _path);
+              return;
+            }
+            List<String> children = _zooKeeper.getChildren(_path, false);
+            if (!isCorrect(children)) {
+              LOG.error("Double check triggered for [" + _path + "] [" + instance + "]");
+              synchronized (_lock) {
+                _lock.notify();
+              }
+            }
+          } catch (KeeperException e) {
+            if (!_running.get()) {
+              LOG.info("Error [{0}]", e.getMessage());
+              return;
+            }
+            if (e.code() == Code.SESSIONEXPIRED) {
+              LOG.warn("Session expired for [" + _path + "] [" + instance + "]");
+              return;
+            }
+            LOG.error("Unknown error", e);
+            throw new RuntimeException(e);
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+      }
+    });
+    _doubleCheckThread.setName("Poll Watch Children [" + _path + "][" + instance + "]");
+    _doubleCheckThread.setDaemon(true);
+    _doubleCheckThread.start();
+  }
+
+  protected boolean isCorrect(List<String> children) {
+    if (children == null && _children == null) {
+      return true;
+    }
+    if (children == null || _children == null) {
+      return false;
+    }
+    return children.equals(_children);
+  }
+
+  public void close() {
+    if (_running.get()) {
+      LOG.warn("Closing [{0}]", instance);
+      _running.set(false);
+      if (_doubleCheckThread != null) {
+        _doubleCheckThread.interrupt();
+      }
+      if (_watchThread != null) {
+        _watchThread.interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeData.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeData.java b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeData.java
new file mode 100644
index 0000000..9a3c1b6
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeData.java
@@ -0,0 +1,176 @@
+package org.apache.blur.zookeeper;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Watches the data of a single ZooKeeper node and hands every freshly read
+ * value to an {@link OnChange} callback.
+ * <p>
+ * Two daemon threads cooperate: a watch thread that reads the data with a
+ * ZooKeeper {@link Watcher} registered and then sleeps until it is notified,
+ * and a polling ("double check") thread that re-reads the data every
+ * {@code _delay} milliseconds and wakes the watch thread when the bytes it
+ * sees differ from the last value delivered. {@link #close()} stops both.
+ */
+public class WatchNodeData implements Closeable {
+
+  private final static Log LOG = LogFactory.getLog(WatchNodeData.class);
+  private final ZooKeeper _zooKeeper;
+  private final String _path;
+  // Monitor the watch thread sleeps on between reads of the node data.
+  private final Object _lock = new Object();
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+  private long _delay = TimeUnit.SECONDS.toMillis(3);
+  // Written by the watch thread, read by the polling thread without holding
+  // _lock, hence volatile.
+  private volatile byte[] _data;
+  private final String instance = UUID.randomUUID().toString();
+  // The thread references are assigned on one thread and read by close() on
+  // another, hence volatile.
+  private volatile Thread _doubleCheckThread;
+  private volatile Thread _watchThread;
+
+  /**
+   * Callback invoked by the watch thread with each value it reads.
+   */
+  public static abstract class OnChange {
+    public abstract void action(byte[] data);
+  }
+
+  /**
+   * Creates the watcher; no thread is started until
+   * {@link #watch(OnChange)} is called.
+   *
+   * @param zooKeeper
+   *          the ZooKeeper client used for all reads.
+   * @param path
+   *          the path of the node whose data is watched.
+   */
+  public WatchNodeData(ZooKeeper zooKeeper, String path) {
+    _zooKeeper = zooKeeper;
+    _path = path;
+    LOG.debug("Creating watch [{0}]", instance);
+  }
+
+  /**
+   * Starts the watch thread (which in turn starts the polling thread) and
+   * returns immediately. The watch thread ends when the node does not exist,
+   * when the callback throws, on a ZooKeeper error or when it is interrupted.
+   *
+   * @param onChange
+   *          the callback that receives each value read.
+   * @return this instance.
+   */
+  public WatchNodeData watch(final OnChange onChange) {
+    _watchThread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        Watcher watcher = new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+            synchronized (_lock) {
+              _lock.notify();
+            }
+          }
+        };
+        startDoubleCheckThread();
+        try {
+          while (_running.get()) {
+            synchronized (_lock) {
+              try {
+                Stat stat = _zooKeeper.exists(_path, false);
+                if (stat == null) {
+                  LOG.debug("Path [{0}] not found.", _path);
+                  return;
+                }
+                // Each pass re-reads the data and registers the watcher
+                // again before going back to sleep.
+                _data = _zooKeeper.getData(_path, watcher, stat);
+                onChange.action(_data);
+                _lock.wait();
+              } catch (KeeperException e) {
+                if (!_running.get()) {
+                  LOG.info("Error [{0}]", e.getMessage());
+                  return;
+                }
+                LOG.error("Unknown error", e);
+                throw new RuntimeException(e);
+              } catch (InterruptedException e) {
+                // Interrupted by close(); the thread is terminating.
+                return;
+              }
+            }
+          }
+        } finally {
+          // Whatever ended the watch thread, mark the watch as stopped so
+          // the polling thread does not keep running on its own.
+          _running.set(false);
+        }
+      }
+    });
+    _watchThread.setName("Watch Data [" + _path + "][" + instance + "]");
+    _watchThread.setDaemon(true);
+    _watchThread.start();
+    return this;
+  }
+
+  /**
+   * Starts the polling thread. Every {@code _delay} milliseconds it reads the
+   * node data without a watcher and notifies the watch thread when the bytes
+   * differ from the last delivered value. If the node is gone it notifies the
+   * watch thread once and ends; it also ends when the watch is stopped, the
+   * session has expired or it is interrupted.
+   */
+  private void startDoubleCheckThread() {
+    _doubleCheckThread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        while (_running.get()) {
+          try {
+            // Nothing in this class notifies _running, so this is a timed
+            // sleep that close() cuts short via interrupt.
+            synchronized (_running) {
+              _running.wait(_delay);
+            }
+            if (!_running.get()) {
+              return;
+            }
+            Stat stat = _zooKeeper.exists(_path, false);
+            if (stat == null) {
+              LOG.debug("Path [{0}] not found.", _path);
+              synchronized (_lock) {
+                _lock.notify();
+              }
+              return;
+            }
+
+            byte[] data = _zooKeeper.getData(_path, false, stat);
+            if (!isCorrect(data)) {
+              LOG.debug("Double check triggered for [" + _path + "]");
+              synchronized (_lock) {
+                _lock.notify();
+              }
+            }
+          } catch (KeeperException e) {
+            if (!_running.get()) {
+              LOG.info("Error [{0}]", e.getMessage());
+              return;
+            }
+            if (e.code() == Code.SESSIONEXPIRED) {
+              LOG.warn("Session expired for [" + _path + "] [" + instance + "]");
+              return;
+            }
+            LOG.error("Unknown error", e);
+            throw new RuntimeException(e);
+          } catch (InterruptedException e) {
+            // Interrupted by close(); the thread is terminating.
+            return;
+          }
+        }
+      }
+    });
+    _doubleCheckThread.setName("Poll Watch Data [" + _path + "][" + instance + "]");
+    _doubleCheckThread.setDaemon(true);
+    _doubleCheckThread.start();
+  }
+
+  /**
+   * Compares freshly read data with the last value delivered to the callback.
+   *
+   * @param data
+   *          the bytes just read, may be null.
+   * @return true if both arrays are null or have identical contents.
+   */
+  protected boolean isCorrect(byte[] data) {
+    if (data == null && _data == null) {
+      return true;
+    }
+    if (data == null || _data == null) {
+      return false;
+    }
+    return Arrays.equals(data, _data);
+  }
+
+  /**
+   * Stops the watch and interrupts both threads. Only the first call has any
+   * effect, even when several threads call it concurrently.
+   */
+  @Override
+  public void close() {
+    // compareAndSet makes the running-check and the flip one atomic step.
+    if (_running.compareAndSet(true, false)) {
+      LOG.debug("Closing [{0}]", instance);
+      Thread doubleCheckThread = _doubleCheckThread;
+      if (doubleCheckThread != null) {
+        doubleCheckThread.interrupt();
+      }
+      Thread watchThread = _watchThread;
+      if (watchThread != null) {
+        watchThread.interrupt();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeExistance.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeExistance.java b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeExistance.java
new file mode 100644
index 0000000..d23a4c3
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchNodeExistance.java
@@ -0,0 +1,159 @@
+package org.apache.blur.zookeeper;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.Closeable;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Watches whether a ZooKeeper node exists and hands every freshly read
+ * {@link Stat} (null when the node is absent) to an {@link OnChange}
+ * callback.
+ * <p>
+ * Two daemon threads cooperate: a watch thread that calls {@code exists} with
+ * a ZooKeeper {@link Watcher} registered and then sleeps until it is
+ * notified, and a polling ("double check") thread that re-checks the node
+ * every {@code _delay} milliseconds and wakes the watch thread when the stat
+ * it sees differs from the last one delivered. {@link #close()} stops both.
+ */
+public class WatchNodeExistance implements Closeable {
+
+  private final static Log LOG = LogFactory.getLog(WatchNodeExistance.class);
+  private final ZooKeeper _zooKeeper;
+  private final String _path;
+  // Monitor the watch thread sleeps on between existence checks.
+  private final Object _lock = new Object();
+  private final AtomicBoolean _running = new AtomicBoolean(true);
+  private long _delay = TimeUnit.SECONDS.toMillis(3);
+  // Written by the watch thread, read by the polling thread without holding
+  // _lock, hence volatile.
+  private volatile Stat _stat;
+  private final String instance = UUID.randomUUID().toString();
+  // The thread references are assigned on one thread and read by close() on
+  // another, hence volatile.
+  private volatile Thread _doubleCheckThread;
+  private volatile Thread _watchThread;
+
+  /**
+   * Callback invoked by the watch thread with each stat it reads; the stat is
+   * null when the node does not exist.
+   */
+  public static abstract class OnChange {
+    public abstract void action(Stat stat);
+  }
+
+  /**
+   * Creates the watcher; no thread is started until
+   * {@link #watch(OnChange)} is called.
+   *
+   * @param zooKeeper
+   *          the ZooKeeper client used for all reads.
+   * @param path
+   *          the path of the node whose existence is watched.
+   */
+  public WatchNodeExistance(ZooKeeper zooKeeper, String path) {
+    _zooKeeper = zooKeeper;
+    _path = path;
+  }
+
+  /**
+   * Starts the watch thread (which in turn starts the polling thread) and
+   * returns immediately. The watch thread ends when the callback throws, on
+   * a ZooKeeper error or when it is interrupted.
+   *
+   * @param onChange
+   *          the callback that receives each stat read.
+   * @return this instance.
+   */
+  public WatchNodeExistance watch(final OnChange onChange) {
+    _watchThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        Watcher watcher = new Watcher() {
+          @Override
+          public void process(WatchedEvent event) {
+            synchronized (_lock) {
+              _lock.notify();
+            }
+          }
+        };
+        startDoubleCheckThread();
+        try {
+          while (_running.get()) {
+            synchronized (_lock) {
+              try {
+                // Each pass re-checks the node and registers the watcher
+                // again before going back to sleep.
+                _stat = _zooKeeper.exists(_path, watcher);
+                onChange.action(_stat);
+                _lock.wait();
+              } catch (KeeperException e) {
+                if (!_running.get()) {
+                  LOG.info("Error [{0}]", e.getMessage());
+                  return;
+                }
+                LOG.error("Unknown error", e);
+                throw new RuntimeException(e);
+              } catch (InterruptedException e) {
+                // Interrupted by close(); the thread is terminating.
+                return;
+              }
+            }
+          }
+        } finally {
+          // Whatever ended the watch thread, mark the watch as stopped so
+          // the polling thread does not keep running on its own.
+          _running.set(false);
+        }
+      }
+    });
+    _watchThread.setName("Watch Existance [" + _path + "][" + instance + "]");
+    _watchThread.setDaemon(true);
+    _watchThread.start();
+    return this;
+  }
+
+  /**
+   * Starts the polling thread. Every {@code _delay} milliseconds it calls
+   * {@code exists} without a watcher and notifies the watch thread when the
+   * result differs from the last delivered stat. The thread ends when the
+   * watch is stopped, the session has expired or it is interrupted.
+   */
+  private void startDoubleCheckThread() {
+    _doubleCheckThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (_running.get()) {
+          try {
+            // Nothing in this class notifies _running, so this is a timed
+            // sleep that close() cuts short via interrupt.
+            synchronized (_running) {
+              _running.wait(_delay);
+            }
+            if (!_running.get()) {
+              return;
+            }
+            Stat stat = _zooKeeper.exists(_path, false);
+            if (!isCorrect(stat)) {
+              LOG.debug("Double check triggered for [" + _path + "]");
+              synchronized (_lock) {
+                _lock.notify();
+              }
+            }
+          } catch (KeeperException e) {
+            if (!_running.get()) {
+              LOG.info("Error [{0}]", e.getMessage());
+              return;
+            }
+            if (e.code() == Code.SESSIONEXPIRED) {
+              LOG.warn("Session expired for [" + _path + "] [" + instance + "]");
+              return;
+            }
+            LOG.error("Unknown error", e);
+            throw new RuntimeException(e);
+          } catch (InterruptedException e) {
+            // Interrupted by close(); the thread is terminating.
+            return;
+          }
+        }
+      }
+    });
+    _doubleCheckThread.setName("Poll Watch Existance [" + _path + "][" + instance + "]");
+    _doubleCheckThread.setDaemon(true);
+    _doubleCheckThread.start();
+  }
+
+  /**
+   * Compares a freshly read stat with the last one delivered to the callback.
+   *
+   * @param stat
+   *          the stat just read, null when the node does not exist.
+   * @return true if both stats are null or equal according to
+   *         {@code Stat.equals}.
+   */
+  protected boolean isCorrect(Stat stat) {
+    if (stat == null && _stat == null) {
+      return true;
+    }
+    if (stat == null || _stat == null) {
+      return false;
+    }
+    return stat.equals(_stat);
+  }
+
+  /**
+   * Stops the watch and interrupts both threads. Only the first call has any
+   * effect, even when several threads call it concurrently.
+   */
+  @Override
+  public void close() {
+    // compareAndSet makes the running-check and the flip one atomic step.
+    if (_running.compareAndSet(true, false)) {
+      LOG.warn("Closing [{0}]", instance);
+      Thread doubleCheckThread = _doubleCheckThread;
+      if (doubleCheckThread != null) {
+        doubleCheckThread.interrupt();
+      }
+      Thread watchThread = _watchThread;
+      if (watchThread != null) {
+        watchThread.interrupt();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b0e26648/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java b/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java
new file mode 100644
index 0000000..22eb9e6
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/zookeeper/ZkCachedMap.java
@@ -0,0 +1,225 @@
+package org.apache.blur.zookeeper;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.CachedMap;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This is a simple implementation of a set-once map of string-to-string that
+ * is backed by ZooKeeper. Meaning that once the value is set a single time it
+ * cannot be set to a different value. The clear cache method is called when the
+ * internal cache is to be cleared and re-read from ZooKeeper. <br>
+ * <br>
+ * Each put creates a persistent sequential node named
+ * <code>key-&lt;sequence&gt;</code> under the base path; the node that sorts
+ * first for a key holds the value for that key. Values are stored as UTF-8
+ * bytes. <br>
+ * <br>
+ * Usage:<br>
+ * <br>
+ * ZkCachedMap map = new ZkCachedMap(zooKeeper, path);<br>
+ * String key = "key";<br>
+ * String newValue = "value";<br>
+ * String value = map.get(key);<br>
+ * if (value == null) {<br>
+ * &nbsp;&nbsp;if (map.putIfMissing(key, newValue)) {<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;System.out.println("Yay! My value was taken.");<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;value = newValue;<br>
+ * &nbsp;&nbsp;} else {<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;System.out.println("Boo! Someone beat me to it.");<br>
+ * &nbsp;&nbsp;&nbsp;&nbsp;value = map.get(key);<br>
+ * &nbsp;&nbsp;}<br>
+ * }<br>
+ * System.out.println("key [" + key + "] value [" + value + "]");<br>
+ * 
+ */
+public class ZkCachedMap extends CachedMap {
+
+  private static final String SEP = "-";
+
+  // Values are shared between hosts through ZooKeeper, so they are encoded
+  // with a fixed charset instead of each JVM's platform default.
+  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");
+
+  private final Map<String, String> cache = new ConcurrentHashMap<String, String>();
+  private final ZooKeeper zooKeeper;
+  private final String basePath;
+
+  /**
+   * @param zooKeeper
+   *          the ZooKeeper client used for all reads and writes.
+   * @param basePath
+   *          the node under which the entries are stored.
+   */
+  public ZkCachedMap(ZooKeeper zooKeeper, String basePath) {
+    this.zooKeeper = zooKeeper;
+    this.basePath = basePath;
+  }
+
+  /**
+   * Drops every entry from the in memory map; later lookups go back to
+   * ZooKeeper.
+   */
+  @Override
+  public void clearCache() {
+    cache.clear();
+  }
+
+  /**
+   * Checks the in memory map first, then fetches from ZooKeeper.
+   * 
+   * @param key
+   *          the key.
+   * @return the value, null if it does not exist.
+   * @exception IOException
+   *              if there is an io error.
+   */
+  @Override
+  public String get(String key) throws IOException {
+    String value = cache.get(key);
+    if (value != null) {
+      return value;
+    }
+    return getFromZooKeeper(key);
+  }
+
+  /**
+   * Checks the in memory map first, if the key is already cached then return
+   * false without contacting ZooKeeper. Otherwise attempt the put in
+   * ZooKeeper.
+   * 
+   * @param key
+   *          the key.
+   * @param value
+   *          the value.
+   * @return boolean, true if the put was successful, false if a value already
+   *         exists.
+   * @exception IOException
+   *              if there is an io error.
+   */
+  @Override
+  public boolean putIfMissing(String key, String value) throws IOException {
+    String existingValue = cache.get(key);
+    if (existingValue != null) {
+      return false;
+    }
+    return putIfMissingFromZooKeeper(key, value);
+  }
+
+  /**
+   * Looks for the first node (in sorted order) that belongs to the key, reads
+   * its value and caches it.
+   * 
+   * @return the value, null if no node for the key exists or the node has no
+   *         data.
+   */
+  private String getFromZooKeeper(String key) throws IOException {
+    try {
+      for (String k : getSortedChildren()) {
+        String realKey = getRealKey(k);
+        if (realKey.equals(key)) {
+          byte[] data = getValue(getPath(k));
+          if (data == null) {
+            return null;
+          }
+          String value = new String(data, UTF_8);
+          cache.put(key, value);
+          return value;
+        }
+      }
+      return null;
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller after wrapping it.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Returns the children of the base path in sorted order, so that for any
+   * key the node with the lowest sequence suffix comes first.
+   */
+  private List<String> getSortedChildren() throws KeeperException, InterruptedException {
+    List<String> keys = new ArrayList<String>(zooKeeper.getChildren(basePath, false));
+    Collections.sort(keys);
+    return keys;
+  }
+
+  /**
+   * Reads the raw data of a node.
+   * 
+   * @return the data, null if the node does not exist or holds no data.
+   */
+  private byte[] getValue(String path) throws KeeperException, InterruptedException {
+    Stat stat = zooKeeper.exists(path, false);
+    if (stat == null) {
+      return null;
+    }
+    return zooKeeper.getData(path, false, stat);
+  }
+
+  /**
+   * Creates a sequential node for the key and then checks whether it is the
+   * first node for that key. If it is, the put wins and the value is cached;
+   * otherwise the node just created is deleted again.
+   * 
+   * @return true if this call set the value, false if another node for the
+   *         key came first.
+   */
+  private boolean putIfMissingFromZooKeeper(String key, String value) throws IOException {
+    try {
+      String path = getPath(key);
+      String newPath = zooKeeper.create(path + SEP, value.getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT_SEQUENTIAL);
+      String keyWithSeq = getKeyWithSeq(newPath);
+      for (String k : getSortedChildren()) {
+        String realKey = getRealKey(k);
+        if (realKey.equals(key)) {
+          if (keyWithSeq.equals(k)) {
+            // got the lock
+            cache.put(key, value);
+            return true;
+          } else {
+            // remove duplicate key
+            zooKeeper.delete(newPath, -1);
+            return false;
+          }
+        }
+      }
+      return false;
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller after wrapping it.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Returns the last path element, i.e. the node name including its sequence
+   * suffix.
+   */
+  private String getKeyWithSeq(String newPath) {
+    int lastIndexOf = newPath.lastIndexOf('/');
+    if (lastIndexOf < 0) {
+      throw new RuntimeException("Path [" + newPath + "] does not contain [/]");
+    }
+    return newPath.substring(lastIndexOf + 1);
+  }
+
+  /**
+   * Strips the sequence suffix: everything from the last separator on is
+   * removed, so keys that themselves contain the separator stay intact.
+   */
+  private String getRealKey(String keyWithSeq) {
+    int lastIndexOf = keyWithSeq.lastIndexOf(SEP);
+    if (lastIndexOf < 0) {
+      throw new RuntimeException("Key [" + keyWithSeq + "] does not contain [" + SEP + "]");
+    }
+    return keyWithSeq.substring(0, lastIndexOf);
+  }
+
+  private String getPath(String key) {
+    return basePath + "/" + key;
+  }
+
+  /**
+   * Reads every entry straight from ZooKeeper without touching the in memory
+   * map. When several nodes exist for one key, the value of the node that
+   * sorts last ends up in the result.
+   * 
+   * @return a new map of key to value.
+   * @exception IOException
+   *              if there is an io error.
+   */
+  @Override
+  public Map<String, String> fetchAllFromSource() throws IOException {
+    try {
+      Map<String, String> result = new HashMap<String, String>();
+      for (String k : getSortedChildren()) {
+        String realKey = getRealKey(k);
+        byte[] value = getValue(getPath(k));
+        if (value != null) {
+          result.put(realKey, new String(value, UTF_8));
+        }
+      }
+      return result;
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller after wrapping it.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+
+  }
+
+}


Mime
View raw message