incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/9] git commit: Adding a way to report on thread progress when the thread watcher is triggered.
Date Thu, 29 Jan 2015 15:15:38 GMT
Adding a way to report on thread progress when the thread watcher is triggered.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2dacedea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2dacedea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2dacedea

Branch: refs/heads/master
Commit: 2dacedea39610060e9f3e11dae29309ca5342653
Parents: 3496463
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jan 29 09:41:29 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jan 29 09:41:29 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/blur/utils/BlurUtil.java    |  8 ++--
 .../apache/blur/concurrent/ThreadWatcher.java   | 46 +++++++++++++++++---
 2 files changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2dacedea/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
index f6f5597..9d5fff0 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/BlurUtil.java
@@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
 import org.apache.blur.BlurConfiguration;
+import org.apache.blur.concurrent.ThreadWatcher;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.search.PrimeDocCache;
@@ -698,17 +699,14 @@ public class BlurUtil {
   }
 
   public static void setupFileSystem(String uri, int shardCount, Configuration configuration) throws IOException {
+    ThreadWatcher.status("creating shard dirs", 0.0f);
     Path tablePath = new Path(uri);
     FileSystem fileSystem = tablePath.getFileSystem(configuration);
     if (createPath(fileSystem, tablePath)) {
       LOG.info("Table uri existed.");
       validateShardCount(shardCount, fileSystem, tablePath);
     }
-    for (int i = 0; i < shardCount; i++) {
-      String shardName = ShardUtil.getShardName(SHARD_PREFIX, i);
-      Path shardPath = new Path(tablePath, shardName);
-      createPath(fileSystem, shardPath);
-    }
+    ThreadWatcher.resetStatus();
   }
 
   public static void validateShardCount(int shardCount, FileSystem fileSystem, Path tablePath) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2dacedea/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java b/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java
index 2a2ff7e..df491b7 100644
--- a/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java
+++ b/blur-util/src/main/java/org/apache/blur/concurrent/ThreadWatcher.java
@@ -34,7 +34,6 @@ import java.util.concurrent.TimeoutException;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 
-
 public class ThreadWatcher {
 
   private static final Log LOG = LogFactory.getLog(ThreadWatcher.class);
@@ -47,9 +46,21 @@ public class ThreadWatcher {
 
     Thread _thread;
     final long _start = System.currentTimeMillis();
+    String _task;
+    float _complete;
+
+    public void status(String task, float complete) {
+      _task = task;
+      _complete = complete;
+    }
+
+    public void resetStatus() {
+      _complete = 0;
+      _task = null;
+    }
   }
 
-  private ConcurrentMap<Thread, Watch> _threads = new ConcurrentHashMap<Thread, Watch>();
+  private final ConcurrentMap<Thread, Watch> _threads = new ConcurrentHashMap<Thread, Watch>();
   private Timer _timer;
 
   private ThreadWatcher() {
@@ -90,7 +101,14 @@ public class ThreadWatcher {
   private void processWatch(Watch watch) {
     if (hasBeenExecutingLongerThan(TimeUnit.SECONDS.toMillis(5), watch)) {
       long now = System.currentTimeMillis();
-      LOG.info("Thread [{0}] has been executing for [{1} ms]", watch._thread, now - watch._start);
+      String task = watch._task;
+      float complete = watch._complete;
+      if (task == null) {
+        LOG.info("Thread [{0}] has been executing for [{1} ms]", watch._thread, now - watch._start);
+      } else {
+        LOG.info("Thread [{0}] has been executing task [{1}] is [{2}] complete for [{4} ms]", watch._thread, task,
+            complete, now - watch._start);
+      }
     }
   }
 
@@ -163,7 +181,8 @@ public class ThreadWatcher {
       _executorService.execute(wrap(command));
     }
 
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException {
       return _executorService.invokeAll(wrapCallableCollection(tasks), timeout, unit);
     }
 
@@ -171,7 +190,8 @@ public class ThreadWatcher {
       return _executorService.invokeAll(wrapCallableCollection(tasks));
     }
 
-    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
       return _executorService.invokeAny(wrapCallableCollection(tasks), timeout, unit);
     }
 
@@ -216,4 +236,20 @@ public class ThreadWatcher {
     return _instance;
   }
 
+  public static void status(String task, float complete) {
+    Watch watch = _instance._threads.get(Thread.currentThread());
+    if (watch == null) {
+      return;
+    }
+    watch.status(task, complete);
+  }
+
+  public static void resetStatus() {
+    Watch watch = _instance._threads.get(Thread.currentThread());
+    if (watch == null) {
+      return;
+    }
+    watch.resetStatus();
+  }
+
 }


Mime
View raw message