tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/4] git commit: TAJO-1040: Misuse netty HashedWheelTimer. (jinho)
Date Fri, 19 Sep 2014 08:24:19 GMT
Repository: tajo
Updated Branches:
  refs/heads/block_iteration c969a6151 -> 64ba68c76


TAJO-1040: Misuse netty HashedWheelTimer. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1a8a67ce
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1a8a67ce
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1a8a67ce

Branch: refs/heads/block_iteration
Commit: 1a8a67ceb4224f3f0f3adab7d3e8956d3d12137c
Parents: 15450e8
Author: jhkim <jhkim@apache.org>
Authored: Wed Sep 17 15:23:10 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Wed Sep 17 15:23:10 2014 +0900

----------------------------------------------------------------------
 CHANGES                                              |  2 ++
 .../apache/tajo/worker/ExecutionBlockContext.java    |  5 +++++
 .../main/java/org/apache/tajo/worker/Fetcher.java    |  9 ++-------
 .../src/main/java/org/apache/tajo/worker/Task.java   |  4 +++-
 .../org/apache/tajo/worker/TaskRunnerManager.java    | 13 +++++++++++++
 .../java/org/apache/tajo/worker/TestFetcher.java     | 15 ++++++++++-----
 6 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 61fcec0..d1002b3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1040: Misuse netty HashedWheelTimer. (jinho)
+
     TAJO-1034: Reduce Explicit Use of JVM Internal Class. (Jihun Kang via hyunsik)
 
     TAJO-1027: Upgrade Hive to 0.13.0 and 0.13.1. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 306ab66..d4b9861 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -42,6 +42,7 @@ import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.Timer;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -272,6 +273,10 @@ public class ExecutionBlockContext {
     return channelFactory;
   }
 
+  public Timer getRPCTimer() {
+    return manager.getRPCTimer();
+  }
+
   protected void releaseShuffleChannelFactory(){
     if(channelFactory != null) {
       channelFactory.shutdown();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 64475fe..4867fe4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -30,7 +30,6 @@ import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.*;
 import org.jboss.netty.handler.timeout.ReadTimeoutException;
 import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timer;
 
 import java.io.File;
@@ -67,11 +66,12 @@ public class Fetcher {
 
   private ClientBootstrap bootstrap;
 
-  public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory) {
+  public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory, Timer timer) {
     this.uri = uri;
     this.file = file;
     this.state = TajoProtos.FetcherState.FETCH_INIT;
     this.conf = conf;
+    this.timer = timer;
 
     String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
     this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -154,9 +154,6 @@ public class Fetcher {
 
       this.finishTime = System.currentTimeMillis();
       LOG.info("Fetcher finished:" + (finishTime - startTime) + " ms, " + getState() + ", URI:" + uri);
-      if (timer != null) {
-        timer.stop();
-      }
     }
   }
 
@@ -302,8 +299,6 @@ public class Fetcher {
     public ChannelPipeline getPipeline() throws Exception {
       ChannelPipeline pipeline = pipeline();
 
-      timer = new HashedWheelTimer();
-
       int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
       int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 7b4cbe1..c9c83d1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -53,6 +53,7 @@ import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.Timer;
 
 import java.io.File;
 import java.io.IOException;
@@ -672,6 +673,7 @@ public class Task {
 
     if (fetches.size() > 0) {
       ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory();
+      Timer timer = executionBlockContext.getRPCTimer();
       Path inputDir = executionBlockContext.getLocalDirAllocator().
           getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
       File storeDir;
@@ -686,7 +688,7 @@ public class Task {
             storeDir.mkdirs();
           }
           storeFile = new File(storeDir, "in_" + i);
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory);
+          Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory, timer);
           runnerList.add(fetcher);
           i++;
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 5eb66b8..c3713d1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -33,6 +33,8 @@ import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.worker.event.TaskRunnerEvent;
 import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 import org.apache.tajo.worker.event.TaskRunnerStopEvent;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -51,6 +53,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
   private AtomicBoolean stop = new AtomicBoolean(false);
   private FinishedTaskCleanThread finishedTaskCleanThread;
   private Dispatcher dispatcher;
+  private HashedWheelTimer rpcTimer;
 
   public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) {
     super(TaskRunnerManager.class.getName());
@@ -75,6 +78,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
   public void start() {
     finishedTaskCleanThread = new FinishedTaskCleanThread();
     finishedTaskCleanThread.start();
+    rpcTimer = new HashedWheelTimer();
     super.start();
   }
 
@@ -98,6 +102,11 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
     if(finishedTaskCleanThread != null) {
       finishedTaskCleanThread.interrupted();
     }
+
+    if(rpcTimer != null){
+      rpcTimer.stop();
+    }
+
     super.stop();
     if(workerContext.isYarnContainerMode()) {
       workerContext.stopWorker(true);
@@ -206,6 +215,10 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
     return tajoConf;
   }
 
+  public Timer getRPCTimer(){
+    return rpcTimer;
+  }
+
   class FinishedTaskCleanThread extends Thread {
     //TODO if history size is large, the historyMap should remove immediately
     public void run() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index b755e02..b15d523 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -29,6 +29,8 @@ import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,6 +49,7 @@ public class TestFetcher {
   private TajoConf conf = new TajoConf();
   private TajoPullServerService pullServerService;
   private ClientSocketChannelFactory channelFactory;
+  private Timer timer;
 
   @Before
   public void setUp() throws Exception {
@@ -62,12 +65,14 @@ public class TestFetcher {
     pullServerService.start();
 
     channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+    timer = new HashedWheelTimer();
   }
 
   @After
   public void tearDown(){
     pullServerService.stop();
     channelFactory.releaseExternalResources();
+    timer.stop();
   }
 
   @Test
@@ -93,7 +98,7 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
     assertNotNull(fetcher.get());
 
     FileSystem fs = FileSystem.getLocal(new TajoConf());
@@ -135,7 +140,7 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -163,7 +168,7 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -195,7 +200,7 @@ public class TestFetcher {
     stream.close();
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
@@ -213,7 +218,7 @@ public class TestFetcher {
     String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta);
 
     URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
-    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory, timer);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     pullServerService.stop();


Mime
View raw message