tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject git commit: TEZ-1324. OnFileSortedOutput: send host/port/pathComponent details only when one of the partitions has data
Date Tue, 29 Jul 2014 22:51:40 GMT
Repository: tez
Updated Branches:
  refs/heads/master cd0fc63e1 -> bec90035b


TEZ-1324. OnFileSortedOutput: send host/port/pathComponent details only when one of the partitions
has data


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bec90035
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bec90035
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bec90035

Branch: refs/heads/master
Commit: bec90035b7597a48607299e7ae2cc881086f50e7
Parents: cd0fc63
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Jul 30 04:20:38 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Jul 30 04:20:38 2014 +0530

----------------------------------------------------------------------
 .../library/output/OnFileSortedOutput.java      |  11 +-
 .../library/output/TestOnFileSortedOutput.java  | 263 +++++++++++++++++++
 2 files changed, 271 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bec90035/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 240d2d6..f10cb20 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -154,6 +154,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
 
+    boolean outputGenerated = true;
     if (sendEmptyPartitionDetails) {
       Path indexFile = sorter.getMapOutput().getOutputIndexFile();
       TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
@@ -166,6 +167,7 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
           emptyPartitions++;
         }
       }
+      outputGenerated = (spillRecord.size() != emptyPartitions);
       if (emptyPartitions > 0) {
         ByteString emptyPartitionsBytesString =
             TezCommonUtils.compressByteArrayToByteString(TezUtils.toByteArray(emptyPartitionDetails));
@@ -175,9 +177,12 @@ public class OnFileSortedOutput extends AbstractLogicalOutput {
               + ", compressedSize=" + emptyPartitionsBytesString.size());
       }
     }
-    payloadBuilder.setHost(host);
-    payloadBuilder.setPort(shufflePort);
-    payloadBuilder.setPathComponent(getContext().getUniqueIdentifier());
+    if (!sendEmptyPartitionDetails || outputGenerated) {
+      payloadBuilder.setHost(host);
+      payloadBuilder.setPort(shufflePort);
+      payloadBuilder.setPathComponent(getContext().getUniqueIdentifier());
+    }
+
     payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
     byte[] payloadBytes = payloadProto.toByteArray();

http://git-wip-us.apache.org/repos/asf/tez/blob/bec90035/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
new file mode 100644
index 0000000..40447aa
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java
@@ -0,0 +1,263 @@
+package org.apache.tez.runtime.library.output;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.utils.EnvironmentUpdateUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.api.KeyValuesWriter;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.TestCase.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@RunWith(Parameterized.class)
+public class TestOnFileSortedOutput {
+  private static final Random rnd = new Random();
+  private static final String UniqueID = "UUID";
+  private static final String HOST = "localhost";
+  private static final int PORT = 80;
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path workingDir;
+  // number of physical outputs (partitions)
+  private int partitions;
+  // number of sort threads (getParameters uses 1 = default sorter, 2 = pipelined sorter)
+  private int sorterThreads;
+
+  private KeyValuesWriter writer;
+  private OnFileSortedOutput sortedOutput;
+  private boolean sendEmptyPartitionViaEvent;
+  // partition index to skip when writing data (-1 = skip none)
+  private int emptyPartitionIdx;
+
+  /**
+   * Creates one parameterized instance of this test.
+   *
+   * @param sendEmptyPartitionViaEvent whether empty partition details are sent via events
+   * @param threads number of threads needed for sorter (pipelinedsorter or default sorter)
+   * @param emptyPartitionIdx partition index to skip when writing data, or -1 for none
+   */
+  public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, int threads,
+      int emptyPartitionIdx) throws IOException {
+    this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent;
+    this.emptyPartitionIdx = emptyPartitionIdx;
+    this.sorterThreads = threads;
+
+    conf = new Configuration();
+
+    workingDir = new Path(".", this.getClass().getName());
+    String localDirs = workingDir.toString();
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+    fs = FileSystem.getLocal(conf);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_THREADS, sorterThreads);
+
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
+        sendEmptyPartitionViaEvent);
+
+    EnvironmentUpdateUtils.put(ApplicationConstants.Environment.NM_HOST.toString(), HOST);
+    fs.mkdirs(workingDir);
+    this.partitions = Math.max(1, rnd.nextInt(10));
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    fs.delete(workingDir, true);
+  }
+
+  @Parameterized.Parameters(name = "test[{0}, {1}, {2}]")
+  public static Collection<Object[]> getParameters() {
+    Collection<Object[]> parameters = new ArrayList<Object[]>();
+    //empty_partition_via_events_enabled, noOfSortThreads, partitionToBeEmpty
+    parameters.add(new Object[] { false, 1, -1 });
+    parameters.add(new Object[] { false, 1, 0 });
+    parameters.add(new Object[] { true, 1, -1 });
+    parameters.add(new Object[] { true, 1, 0 });
+
+    //Pipelined sorter
+    parameters.add(new Object[] { false, 2, -1 });
+    parameters.add(new Object[] { false, 2, 0 });
+    parameters.add(new Object[] { true, 2, -1 });
+    parameters.add(new Object[] { true, 2, 0 });
+
+    return parameters;
+  }
+
+  private void startSortedOutput(int partitions) throws Exception {
+    sortedOutput = new OnFileSortedOutput();
+    sortedOutput.setNumPhysicalOutputs(partitions);
+    TezOutputContext context = createTezOutputContext();
+    sortedOutput.initialize(context);
+    sortedOutput.start();
+    writer = sortedOutput.getWriter();
+  }
+
+  @Test
+  public void baseTest() throws Exception {
+    startSortedOutput(partitions);
+
+    // Write a random number of keys; loop bounds are drawn once, not on every iteration
+    for (int i = 0, numKeys = Math.max(1, rnd.nextInt(50)); i < numKeys; i++) {
+      Text key = new Text(new BigInteger(256, rnd).toString());
+      List<Object> values = new LinkedList<Object>();
+      for (int j = 0, numValues = Math.max(2, rnd.nextInt(10)); j < numValues; j++) {
+        values.add(new Text(new BigInteger(256, rnd).toString()));
+      }
+      writer.write(key, values);
+    }
+
+    List<Event> eventList = sortedOutput.close();
+    assertTrue(eventList != null && eventList.size() == 2);
+
+    ShuffleUserPayloads.DataMovementEventPayloadProto
+        payload = ShuffleUserPayloads.DataMovementEventPayloadProto
+        .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload());
+
+    assertEquals(HOST, payload.getHost());
+    assertEquals(PORT, payload.getPort());
+    assertEquals(UniqueID, payload.getPathComponent());
+  }
+
+  @Test
+  public void testWithSomeEmptyPartition() throws Exception {
+    // ensure at least 2 partitions are available
+    partitions = Math.max(2, partitions);
+    startSortedOutput(partitions);
+
+    //write random data
+    for (int i = 0; i < 2 * partitions; i++) {
+      Text key = new Text(new BigInteger(256, rnd).toString());
+      Text value = new Text(new BigInteger(256, rnd).toString());
+      // NOTE(review): HashPartitioner, not i % partitions, picks the partition; verify one ends up empty
+      if (i % partitions != emptyPartitionIdx) {
+        writer.write(key, value);
+      }
+    }
+
+    List<Event> eventList = sortedOutput.close();
+    assertTrue(eventList != null && eventList.size() == 2);
+
+    ShuffleUserPayloads.DataMovementEventPayloadProto
+        payload = ShuffleUserPayloads.DataMovementEventPayloadProto
+        .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload());
+
+    assertEquals(HOST, payload.getHost());
+    assertEquals(PORT, payload.getPort());
+    assertEquals(UniqueID, payload.getPathComponent());
+  }
+
+  @Test
+  public void testAllEmptyPartition() throws Exception {
+    startSortedOutput(partitions);
+
+    //Close output without writing any data to it.
+    List<Event> eventList = sortedOutput.close();
+    assertTrue(eventList != null && eventList.size() == 2);
+
+    ShuffleUserPayloads.DataMovementEventPayloadProto
+        payload = ShuffleUserPayloads.DataMovementEventPayloadProto
+        .parseFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload());
+    if (sendEmptyPartitionViaEvent) {
+      assertEquals("", payload.getHost());
+      assertEquals(0, payload.getPort());
+      assertEquals("", payload.getPathComponent());
+    } else {
+      assertEquals(HOST, payload.getHost());
+      assertEquals(PORT, payload.getPort());
+      assertEquals(UniqueID, payload.getPathComponent());
+    }
+  }
+
+  private TezOutputContext createTezOutputContext() throws IOException {
+    String[] workingDirs = { workingDir.toString() };
+    byte[] payLoad = TezUtils.createUserPayloadFromConf(conf);
+    DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
+    serviceProviderMetaData.writeInt(PORT);
+
+    TezCounters counters = new TezCounters();
+
+    TezOutputContext context = mock(TezOutputContext.class);
+    doReturn(counters).when(context).getCounters();
+    doReturn(workingDirs).when(context).getWorkDirs();
+    doReturn(payLoad).when(context).getUserPayload();
+    doReturn(100 * 1024 * 1024L).when(context).getTotalMemoryAvailableToTask();
+    doReturn(UniqueID).when(context).getUniqueIdentifier();
+    doReturn("v1").when(context).getDestinationVertexName();
+    doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
+        .getServiceProviderMetaData
+            (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    doAnswer(new Answer<Void>() {
+      @Override public Void answer(InvocationOnMock invocation) throws Throwable {
+        long requestedSize = (Long) invocation.getArguments()[0];
+        MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation
+            .getArguments()[1];
+        callback.memoryAssigned(requestedSize);
+        return null;
+      }
+    }).when(context).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
+    return context;
+  }
+
+}


Mime
View raw message