tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject git commit: TAJO-488: Data fetcher doesn't close small file in shuffle. (jinho)
Date Tue, 14 Jan 2014 01:40:56 GMT
Updated Branches:
  refs/heads/master 2e246cc64 -> c4b81569a


TAJO-488: Data fetcher doesn't close small file in shuffle. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c4b81569
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c4b81569
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c4b81569

Branch: refs/heads/master
Commit: c4b81569ad2a1ee5fa1695016a7aded6c9f7e512
Parents: 2e246cc
Author: jinossy <jinossy@gmail.com>
Authored: Tue Jan 14 10:40:19 2014 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Tue Jan 14 10:40:19 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/tajo/worker/Fetcher.java    | 52 ++++++++++++++------
 .../tajo/pullserver/TajoPullServerService.java  | 12 +++++
 3 files changed, 51 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c4b81569/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 494b60a..1be92c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -207,6 +207,8 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-488: Data fetcher doesn't close small file in shuffle. (jinho)
+
     TAJO-496: java.lang.NoSuchFieldError: IS_SECURITY_ENABLED when debugging
     tajo. (Min Zhou via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c4b81569/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
index 360bebd..76f3049 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -18,8 +18,10 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IOUtils;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.*;
@@ -34,6 +36,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import static org.jboss.netty.channel.Channels.pipeline;
 
@@ -54,6 +57,8 @@ public class Fetcher {
   private long finishTime;
   private long fileLen;
   private int messageReceiveCount;
+  private ChannelFactory factory;
+  private ClientBootstrap bootstrap;
 
   public Fetcher(URI uri, File file) {
     this.uri = uri;
@@ -69,6 +74,25 @@ public class Fetcher {
         this.port = 443;
       }
     }
+
+    ThreadFactory bossFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Fetcher Netty Boss #%d")
+        .build();
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+        .setNameFormat("Fetcher Netty Worker #%d")
+        .build();
+
+    factory = new NioClientSocketChannelFactory(
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory));
+
+    bootstrap = new ClientBootstrap(factory);
+    bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
+    bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
+    bootstrap.setOption("tcpNoDelay", true);
+
+    ChannelPipelineFactory factory = new HttpClientPipelineFactory(file);
+    bootstrap.setPipelineFactory(factory);
   }
 
   public long getStartTime() {
@@ -101,13 +125,6 @@ public class Fetcher {
 
   public File get() throws IOException {
     startTime = System.currentTimeMillis();
-    ClientBootstrap bootstrap = new ClientBootstrap(
-        new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-            Executors.newCachedThreadPool()));
-    bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
-    bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
-    ChannelPipelineFactory factory = new HttpClientPipelineFactory(file);
-    bootstrap.setPipelineFactory(factory);
 
     ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
 
@@ -127,12 +144,15 @@ public class Fetcher {
     request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
     request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
 
+
     // Send the HTTP request.
-    channel.write(request);
+    ChannelFuture channelFuture = channel.write(request);
 
     // Wait for the server to close the connection.
     channel.getCloseFuture().awaitUninterruptibly();
 
+    channelFuture.addListener(ChannelFutureListener.CLOSE);
+
     // Shut down executor threads to exit.
     bootstrap.releaseExternalResources();
     finishTime = System.currentTimeMillis();
@@ -152,8 +172,6 @@ public class Fetcher {
 
     public HttpClientHandler(File file) throws FileNotFoundException {
       this.file = file;
-      this.raf = new RandomAccessFile(file, "rw");
-      this.fc = raf.getChannel();
     }
 
     @Override
@@ -188,11 +206,12 @@ public class Fetcher {
 
           if (response.getStatus() == HttpResponseStatus.NO_CONTENT) {
             LOG.info("There are no data corresponding to the request");
-            fc.close();
-            raf.close();
             return;
           }
 
+          this.raf = new RandomAccessFile(file, "rw");
+          this.fc = raf.getChannel();
+
           if (response.isChunked()) {
             readingChunks = true;
           } else {
@@ -205,9 +224,7 @@ public class Fetcher {
           HttpChunk chunk = (HttpChunk) e.getMessage();
           if (chunk.isLast()) {
             readingChunks = false;
-            long fileLength = fc.position();
-            fc.close();
-            raf.close();
+            long fileLength = file.length();
             if (fileLength == length) {
               LOG.info("Data fetch is done (total received bytes: " + fileLength
                   + ")");
@@ -223,6 +240,11 @@ public class Fetcher {
         if(raf != null) {
           fileLen = file.length();
         }
+
+        if(fileLen >= length){
+          IOUtils.cleanup(LOG, fc, raf);
+
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c4b81569/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 837cb71..1b0cadc 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -229,6 +229,8 @@ public class TajoPullServerService extends AbstractService {
   public synchronized void start() {
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    bootstrap.setOption("tcpNoDelay", true);
+
     try {
       pipelineFact = new HttpPipelineFactory(conf);
     } catch (Exception ex) {
@@ -348,6 +350,16 @@ public class TajoPullServerService extends AbstractService {
     }
 
     @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+        throws Exception {
+
+      accepted.add(evt.getChannel());
+      LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size()));
+      super.channelOpen(ctx, evt);
+
+    }
+
+    @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
         throws Exception {
 


Mime
View raw message