tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1089. Change CompositeDataMovementEvent endIndex to a count of number of events. Contributed by Chen He.
Date Wed, 16 Jul 2014 19:28:10 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 55524f0ad -> 171da568e


TEZ-1089. Change CompositeDataMovementEvent endIndex to a count of
number of events. Contributed by Chen He.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/171da568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/171da568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/171da568

Branch: refs/heads/master
Commit: 171da568e9d46089e5722b43d94f115629dc90e6
Parents: 55524f0
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jul 16 12:27:17 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jul 16 12:28:02 2014 -0700

----------------------------------------------------------------------
 .../api/events/CompositeDataMovementEvent.java  | 18 +++----
 tez-api/src/main/proto/Events.proto             |  2 +-
 .../event/TestCompositeDataMovementEvent.java   | 50 ++++++++++++++++++++
 .../org/apache/tez/common/ProtoConverters.java  |  4 +-
 .../TestUnorderedPartitionedKVWriter.java       |  4 +-
 5 files changed, 64 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/171da568/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
index 5abfc06..267dfff 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CompositeDataMovementEvent.java
@@ -39,7 +39,7 @@ import org.apache.tez.runtime.api.Event;
 public class CompositeDataMovementEvent extends Event {
 
   protected final int sourceIndexStart;
-  protected final int sourceIndexEnd;
+  protected final int count;
   protected int version;
 
   protected final TezUserPayload userPayload;
@@ -48,15 +48,15 @@ public class CompositeDataMovementEvent extends Event {
    * @param srcIndexStart
    *          the startIndex of the physical source which generated the event
    *          (inclusive)
-   * @param srcIndexEnd
-   *          the endIndex of the physical source which generated the event
-   *          (non-inclusive)
+   * @param count
+   *          the number of physical sources represented by this event,
+   *          starting from srcIndexStart (inclusive)
    * @param userPayload
    *          the common payload between all the events.
    */
-  public CompositeDataMovementEvent(int srcIndexStart, int srcIndexEnd, byte[] userPayload) {
+  public CompositeDataMovementEvent(int srcIndexStart, int count, byte[] userPayload) {
     this.sourceIndexStart = srcIndexStart;
-    this.sourceIndexEnd = srcIndexEnd;
+    this.count = count;
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
   }
 
@@ -64,8 +64,8 @@ public class CompositeDataMovementEvent extends Event {
     return sourceIndexStart;
   }
 
-  public int getSourceIndexEnd() {
-    return sourceIndexEnd;
+  public int getCount() {
+    return count;
   }
 
   public byte[] getUserPayload() {
@@ -92,7 +92,7 @@ public class CompositeDataMovementEvent extends Event {
 
           @Override
           public boolean hasNext() {
-            return currentPos < sourceIndexEnd;
+            return currentPos < (count + sourceIndexStart);
           }
 
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/171da568/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index 9f12508..94838af 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -51,7 +51,7 @@ message RootInputDataInformationEventProto {
 
 message CompositeEventProto {
   optional int32 start_index = 1;
-  optional int32 end_index = 2;
+  optional int32 count = 2;
   optional bytes user_payload = 3;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/171da568/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java
new file mode 100644
index 0000000..595fb63
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java
@@ -0,0 +1,50 @@
+package org.apache.tez.runtime.api.event;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCompositeDataMovementEvent {
+  byte[] userPayload = "Dummy userPayLoad".getBytes();
+
+  @Test
+  public void testGetCount(){
+    int numPartitions = 2;
+    int startIndex = 2;
+    CompositeDataMovementEvent cdme1 =
+        new CompositeDataMovementEvent(startIndex, numPartitions, userPayload);
+    Assert.assertEquals(numPartitions, cdme1.getCount());
+    Assert.assertEquals(startIndex, cdme1.getSourceIndexStart());
+  }
+
+  @Test
+  public void testGetEvents(){
+    int numOutputs = 0;
+    int startIndex = 1;
+    CompositeDataMovementEvent cdme2 =
+        new CompositeDataMovementEvent(startIndex, numOutputs, userPayload);
+    for(DataMovementEvent dme: cdme2.getEvents()){
+      numOutputs++;
+    }
+    Assert.assertEquals(numOutputs, cdme2.getCount());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/171da568/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index 2213f79..33dd1ee 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -53,7 +53,7 @@ public class ProtoConverters {
     EventProtos.CompositeEventProto.Builder builder =
         EventProtos.CompositeEventProto.newBuilder();
     builder.setStartIndex(event.getSourceIndexStart());
-    builder.setEndIndex(event.getSourceIndexEnd());
+    builder.setCount(event.getCount());
     if (event.getUserPayload() != null) {
       builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
     }
@@ -63,7 +63,7 @@ public class ProtoConverters {
   public static CompositeDataMovementEvent convertCompositeDataMovementEventFromProto(
       EventProtos.CompositeEventProto proto) {
     return new CompositeDataMovementEvent(proto.getStartIndex(),
-        proto.getEndIndex(),
+        proto.getCount(),
         proto.hasUserPayload() ?
             proto.getUserPayload().toByteArray() : null);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/171da568/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 2bf45d1..f5fd276 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -309,7 +309,7 @@ public class TestUnorderedPartitionedKVWriter {
     assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
     CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
     assertEquals(0, cdme.getSourceIndexStart());
-    assertEquals(numPartitions, cdme.getSourceIndexEnd());
+    assertEquals(numPartitions, cdme.getCount());
     DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
         .getUserPayload());
     assertFalse(eventProto.hasData());
@@ -496,7 +496,7 @@ public class TestUnorderedPartitionedKVWriter {
     assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
     CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
     assertEquals(0, cdme.getSourceIndexStart());
-    assertEquals(numOutputs, cdme.getSourceIndexEnd());
+    assertEquals(numOutputs, cdme.getCount());
     DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
         .getUserPayload());
     assertFalse(eventProto.hasData());


Mime
View raw message