Repository: tajo
Updated Branches:
refs/heads/block_iteration c969a6151 -> 64ba68c76
TAJO-1040: Misuse netty HashedWheelTimer. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1a8a67ce
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1a8a67ce
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1a8a67ce
Branch: refs/heads/block_iteration
Commit: 1a8a67ceb4224f3f0f3adab7d3e8956d3d12137c
Parents: 15450e8
Author: jhkim <jhkim@apache.org>
Authored: Wed Sep 17 15:23:10 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Wed Sep 17 15:23:10 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 ++
.../apache/tajo/worker/ExecutionBlockContext.java | 5 +++++
.../main/java/org/apache/tajo/worker/Fetcher.java | 9 ++-------
.../src/main/java/org/apache/tajo/worker/Task.java | 4 +++-
.../org/apache/tajo/worker/TaskRunnerManager.java | 13 +++++++++++++
.../java/org/apache/tajo/worker/TestFetcher.java | 15 ++++++++++-----
6 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 61fcec0..d1002b3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,8 @@ Release 0.9.0 - unreleased
IMPROVEMENT
+ TAJO-1040: Misuse netty HashedWheelTimer. (jinho)
+
TAJO-1034: Reduce Explicit Use of JVM Internal Class. (Jihun Kang via hyunsik)
TAJO-1027: Upgrade Hive to 0.13.0 and 0.13.1. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 306ab66..d4b9861 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -42,6 +42,7 @@ import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.Timer;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -272,6 +273,10 @@ public class ExecutionBlockContext {
return channelFactory;
}
+ public Timer getRPCTimer() {
+ return manager.getRPCTimer();
+ }
+
protected void releaseShuffleChannelFactory(){
if(channelFactory != null) {
channelFactory.shutdown();
http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 64475fe..4867fe4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -30,7 +30,6 @@ import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;
import java.io.File;
@@ -67,11 +66,12 @@ public class Fetcher {
private ClientBootstrap bootstrap;
- public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory) {
+ public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory, Timer
timer) {
this.uri = uri;
this.file = file;
this.state = TajoProtos.FetcherState.FETCH_INIT;
this.conf = conf;
+ this.timer = timer;
String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -154,9 +154,6 @@ public class Fetcher {
this.finishTime = System.currentTimeMillis();
LOG.info("Fetcher finished:" + (finishTime - startTime) + " ms, " + getState() + ",
URI:" + uri);
- if (timer != null) {
- timer.stop();
- }
}
}
@@ -302,8 +299,6 @@ public class Fetcher {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
- timer = new HashedWheelTimer();
-
int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 7b4cbe1..c9c83d1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -53,6 +53,7 @@ import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.Timer;
import java.io.File;
import java.io.IOException;
@@ -672,6 +673,7 @@ public class Task {
if (fetches.size() > 0) {
ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory();
+ Timer timer = executionBlockContext.getRPCTimer();
Path inputDir = executionBlockContext.getLocalDirAllocator().
getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
File storeDir;
@@ -686,7 +688,7 @@ public class Task {
storeDir.mkdirs();
}
storeFile = new File(storeDir, "in_" + i);
- Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory);
+ Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory, timer);
runnerList.add(fetcher);
i++;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 5eb66b8..c3713d1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -33,6 +33,8 @@ import org.apache.tajo.engine.utils.TupleCache;
import org.apache.tajo.worker.event.TaskRunnerEvent;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import org.apache.tajo.worker.event.TaskRunnerStopEvent;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -51,6 +53,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
private AtomicBoolean stop = new AtomicBoolean(false);
private FinishedTaskCleanThread finishedTaskCleanThread;
private Dispatcher dispatcher;
+ private HashedWheelTimer rpcTimer;
public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher)
{
super(TaskRunnerManager.class.getName());
@@ -75,6 +78,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
public void start() {
finishedTaskCleanThread = new FinishedTaskCleanThread();
finishedTaskCleanThread.start();
+ rpcTimer = new HashedWheelTimer();
super.start();
}
@@ -98,6 +102,11 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
if(finishedTaskCleanThread != null) {
finishedTaskCleanThread.interrupted();
}
+
+ if(rpcTimer != null){
+ rpcTimer.stop();
+ }
+
super.stop();
if(workerContext.isYarnContainerMode()) {
workerContext.stopWorker(true);
@@ -206,6 +215,10 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
return tajoConf;
}
+ public Timer getRPCTimer(){
+ return rpcTimer;
+ }
+
class FinishedTaskCleanThread extends Thread {
//TODO if history size is large, the historyMap should remove immediately
public void run() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/1a8a67ce/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index b755e02..b15d523 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -29,6 +29,8 @@ import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -47,6 +49,7 @@ public class TestFetcher {
private TajoConf conf = new TajoConf();
private TajoPullServerService pullServerService;
private ClientSocketChannelFactory channelFactory;
+ private Timer timer;
@Before
public void setUp() throws Exception {
@@ -62,12 +65,14 @@ public class TestFetcher {
pullServerService.start();
channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+ timer = new HashedWheelTimer();
}
@After
public void tearDown(){
pullServerService.stop();
channelFactory.releaseExternalResources();
+ timer.stop();
}
@Test
@@ -93,7 +98,7 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory,
timer);
assertNotNull(fetcher.get());
FileSystem fs = FileSystem.getLocal(new TajoConf());
@@ -135,7 +140,7 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory,
timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -163,7 +168,7 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory,
timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -195,7 +200,7 @@ public class TestFetcher {
stream.close();
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory,
timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -213,7 +218,7 @@ public class TestFetcher {
String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId,
sid, partId, "h", ta);
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
- final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory,
timer);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
pullServerService.stop();
|