tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [28/43] git commit: TAJO-1103: Insert clause of partitioned table loses some FetchImpls. (jinho)
Date Fri, 10 Oct 2014 04:33:34 GMT
TAJO-1103: Insert clause of partitioned table loses some FetchImpls. (jinho)

Closes #186


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/de28c829
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/de28c829
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/de28c829

Branch: refs/heads/index_support
Commit: de28c82942ae0fd2d06b45bcf629d660c30a9888
Parents: 2eba8aa
Author: jhkim <jhkim@apache.org>
Authored: Wed Oct 8 11:04:24 2014 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Wed Oct 8 11:04:24 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  5 +-
 .../java/org/apache/tajo/worker/FetchImpl.java  | 16 ++--
 .../apache/tajo/master/TestRepartitioner.java   | 85 ++++++++++++++++++++
 .../org/apache/tajo/worker/TestHistory.java     |  9 ++-
 4 files changed, 103 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index de21976..bff9583 100644
--- a/CHANGES
+++ b/CHANGES
@@ -161,7 +161,10 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
-    TAJO-1101: Broadcast join with a zero-length file table returns wrong result data.(Hyoungjun Kim)
+    TAJO-1103: Insert clause of partitioned table loses some FetchImpls. (jinho)
+
+    TAJO-1101: Broadcast join with a zero-length file table returns wrong result data.
+    (Hyoungjun Kim)
 
     TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 964da5d..f411793 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -36,8 +36,6 @@ import java.util.List;
  * <code>FetchImpl</code> information to indicate the locations of intermediate data.
  */
 public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cloneable {
-  private TajoWorkerProtocol.FetchProto.Builder builder = null;
-
   private QueryUnit.PullHost host;             // The pull server host information
   private TajoWorkerProtocol.ShuffleType type; // hash or range partition method.
   private ExecutionBlockId executionBlockId;   // The executionBlock id
@@ -53,7 +51,6 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
   private long length = -1;
 
   public FetchImpl() {
-    builder = TajoWorkerProtocol.FetchProto.newBuilder();
     taskIds = new ArrayList<Integer>();
     attemptIds = new ArrayList<Integer>();
   }
@@ -108,14 +105,14 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, hasNext, taskIds, attemptIds);
+    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams,
+        hasNext, taskIds, attemptIds, offset, length);
   }
 
   @Override
   public TajoWorkerProtocol.FetchProto getProto() {
-    if (builder == null) {
-      builder = TajoWorkerProtocol.FetchProto.newBuilder();
-    }
+    TajoWorkerProtocol.FetchProto.Builder builder = TajoWorkerProtocol.FetchProto.newBuilder();
+
     builder.setHost(host.getHost());
     builder.setPort(host.getPort());
     builder.setType(type);
@@ -235,7 +232,6 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
   public FetchImpl clone() throws CloneNotSupportedException {
     FetchImpl newFetchImpl = (FetchImpl) super.clone();
 
-    newFetchImpl.builder = TajoWorkerProtocol.FetchProto.newBuilder();
     newFetchImpl.host = host.clone();
     newFetchImpl.type = type;
     newFetchImpl.executionBlockId = executionBlockId;
@@ -273,6 +269,8 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
         TUtil.checkEquals(name, fetch.name) &&
         TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
         TUtil.checkEquals(taskIds, fetch.taskIds) &&
-        TUtil.checkEquals(type, fetch.type);
+        TUtil.checkEquals(type, fetch.type) &&
+        TUtil.checkEquals(offset, fetch.offset) &&
+        TUtil.checkEquals(length, fetch.length);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index f969a08..afa330e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -29,6 +29,7 @@ import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.master.querymaster.Repartitioner;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.junit.Test;
@@ -39,6 +40,7 @@ import java.util.*;
 import static junit.framework.Assert.assertEquals;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
 import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestRepartitioner {
@@ -403,6 +405,89 @@ public class TestRepartitioner {
     }
   }
 
+  @Test
+  public void testSplitIntermediatesWithUniqueHost() {
+    List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>();
+
+    int[] pageLengths = new int[20];  //195MB
+    for (int i = 0 ; i < pageLengths.length; i++) {
+      if (i < pageLengths.length - 1) {
+        pageLengths[i] =  10 * 1024 * 1024;
+      } else {
+        pageLengths[i] =  5 * 1024 * 1024;
+      }
+    }
+
+    long expectedTotalLength = 0;
+    QueryUnit.PullHost pullHost = new QueryUnit.PullHost("host", 0);
+
+    for (int i = 0; i < 20; i++) {
+      List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+      long offset = 0;
+      for (int j = 0; j < pageLengths.length; j++) {
+        pages.add(new Pair(offset, pageLengths[j]));
+        offset += pageLengths[j];
+        expectedTotalLength += pageLengths[j];
+      }
+      IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost);
+      interm.setPages(pages);
+      interm.setVolume(offset);
+      intermediateEntries.add(interm);
+    }
+
+    long splitVolume = 128 * 1024 * 1024;
+    List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries,
+        splitVolume, 10 * 1024 * 1024);
+    assertEquals(32, fetches.size());
+
+    int expectedSize = 0;
+    Set<FetchImpl> fetchSet = TUtil.newHashSet();
+    for(List<FetchImpl> list : fetches){
+      expectedSize += list.size();
+      fetchSet.addAll(list);
+    }
+    assertEquals(expectedSize, fetchSet.size());
+
+
+    int index = 0;
+    int numZeroPosFetcher = 0;
+    long totalLength = 0;
+    Set<String> uniqPullHost = new HashSet<String>();
+
+    for (List<FetchImpl> eachFetchList: fetches) {
+      long length = 0;
+      for (FetchImpl eachFetch: eachFetchList) {
+        if (eachFetch.getOffset() == 0) {
+          numZeroPosFetcher++;
+        }
+        totalLength += eachFetch.getLength();
+        length += eachFetch.getLength();
+        uniqPullHost.add(eachFetch.getPullHost().toString());
+      }
+      assertTrue(length + " should be smaller than splitVolume", length < splitVolume);
+      if (index < fetches.size() - 1) {
+        assertTrue(length + " should be great than 100MB" + fetches.size() + "," + index, length >= 100 * 1024 * 1024);
+      }
+      index++;
+    }
+    assertEquals(20, numZeroPosFetcher);
+    assertEquals(1, uniqPullHost.size());
+    assertEquals(expectedTotalLength, totalLength);
+  }
+
+  @Test
+  public void testFetchImpl() {
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    QueryUnit.PullHost pullHost = new QueryUnit.PullHost("localhost", 0);
+
+    FetchImpl expected = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1);
+    FetchImpl fetch2 = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1);
+    assertEquals(expected, fetch2);
+    fetch2.setOffset(5);
+    fetch2.setLength(10);
+    assertNotEquals(expected, fetch2);
+  }
+
   private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
     Set<FetchImpl> expectedURLs = Sets.newHashSet();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
index 15ead84..3a85c14 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
@@ -76,8 +76,13 @@ public class TestHistory {
 
     TaskRunnerHistory history = histories.iterator().next();
     assertEquals(Service.STATE.STOPPED, history.getState());
-
-    assertEquals(history, new TaskRunnerHistory(history.getProto()));
+    TaskRunnerHistory fromProto = new TaskRunnerHistory(history.getProto());
+    assertEquals(history.getExecutionBlockId(), fromProto.getExecutionBlockId());
+    assertEquals(history.getFinishTime(), fromProto.getFinishTime());
+    assertEquals(history.getStartTime(), fromProto.getStartTime());
+    assertEquals(history.getState(), fromProto.getState());
+    assertEquals(history.getContainerId(), fromProto.getContainerId());
+    assertEquals(history.getProto().getTaskHistoriesCount(), fromProto.getProto().getTaskHistoriesCount());
   }
 
   @Test


Mime
View raw message