tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gop...@apache.org
Subject git commit: TEZ-1525. BroadcastLoadGen testcase (gopalv and sseth)
Date Tue, 21 Oct 2014 18:53:02 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 fbb25a9a0 -> 2edbebaac


TEZ-1525. BroadcastLoadGen testcase (gopalv and sseth)

(cherry picked from commit 00c5a93f3b250facb957231eb2cee01854bb2369)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2edbebaa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2edbebaa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2edbebaa

Branch: refs/heads/branch-0.5
Commit: 2edbebaace14a6a2d0e236ecb55c3ba8053f3ef8
Parents: fbb25a9
Author: Gopal V <gopalv@apache.org>
Authored: Mon Oct 20 16:15:26 2014 -0700
Committer: Gopal V <gopalv@apache.org>
Committed: Tue Oct 21 11:52:07 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../mapreduce/examples/BroadcastLoadGen.java    | 148 +++++++++++++++++++
 .../tez/mapreduce/examples/ExampleDriver.java   |   1 +
 3 files changed, 150 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2edbebaa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca2768b..44ed877 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -36,6 +36,7 @@ ALL CHANGES:
   TEZ-1682. Tez AM hangs at times when there are task failures.
   TEZ-1683. Do ugi::getGroups only when necessary when checking ACLs.
   TEZ-1584. Restore counters from DAGFinishedEvent when DAG is completed.
+  TEZ-1525. BroadcastLoadGen testcase.
   TEZ-1686. TestRecoveryParser.testGetLastCompletedDAG fails sometimes
 
 Release 0.5.1: 2014-10-02

http://git-wip-us.apache.org/repos/asf/tez/blob/2edbebaa/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
new file mode 100644
index 0000000..bdea1ac
--- /dev/null
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastLoadGen.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.output.UnorderedKVOutput;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+
+public class BroadcastLoadGen extends TezExampleBase {
+
+  private static final Log LOG = LogFactory.getLog(RPCLoadGen.class);
+
+  public static class InputGenProcessor extends SimpleProcessor {
+
+    final int bytesToGenerate;
+
+    public InputGenProcessor(ProcessorContext context) {
+      super(context);
+      bytesToGenerate = context.getUserPayload().getPayload().getInt(0);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Random random = new Random();
+      Preconditions.checkArgument(getOutputs().size() == 1);
+      LogicalOutput out = getOutputs().values().iterator().next();
+      if (out instanceof UnorderedKVOutput) {
+        UnorderedKVOutput output = (UnorderedKVOutput) out;
+        KeyValueWriter kvWriter = output.getWriter();
+        int approxNumInts = bytesToGenerate / 6;
+        for (int i = 0 ; i < approxNumInts ; i++) {
+          kvWriter.write(NullWritable.get(), new IntWritable(random.nextInt()));
+        }
+      }
+    }
+  }
+
+  public static class InputFetchProcessor extends SimpleProcessor {
+    public InputFetchProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkArgument(inputs.size() == 1);
+      KeyValueReader broadcastKvReader = (KeyValueReader) getInputs().values().iterator().next().getReader();
+      long sum = 0;
+      int count = 0;
+      while (broadcastKvReader.next()) {
+        count++;
+        sum += ((IntWritable) broadcastKvReader.getCurrentValue()).get();
+      }
+      System.err.println("Count = " + getContext().getTaskIndex() + " * " + count + ", Sum="
+ sum);
+    }
+  }
+
+  private DAG createDAG(int numGenTasks, int totalSourceDataSize, int numFetcherTasks) {
+    int bytesPerSource = totalSourceDataSize / numGenTasks;
+    LOG.info("DataPerSourceTask(bytes)=" + bytesPerSource);
+    ByteBuffer payload = ByteBuffer.allocate(4);
+    payload.putInt(0, bytesPerSource);
+
+    Vertex broadcastVertex = Vertex.create("DataGen",
+        ProcessorDescriptor.create(InputGenProcessor.class.getName())
+            .setUserPayload(UserPayload.create(payload)), numGenTasks);
+    Vertex fetchVertex = Vertex.create("FetchVertex",
+        ProcessorDescriptor.create(InputFetchProcessor.class.getName()), numFetcherTasks);
+    UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig.newBuilder(NullWritable.class
+    .getName(), IntWritable.class.getName()).setCompression(false, null, null).build();
+
+    DAG dag = DAG.create("BroadcastLoadGen");
+    dag.addVertex(broadcastVertex).addVertex(fetchVertex).addEdge(
+        Edge.create(broadcastVertex, fetchVertex, edgeConf.createDefaultBroadcastEdgeProperty()));
+    return dag;
+  }
+
+  @Override
+  protected final int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient)
throws
+      TezException, InterruptedException, IOException {
+    LOG.info("Running: " + this.getClass().getSimpleName() + StringUtils.join(args, " "));
+
+    int numSourceTasks = Integer.parseInt(args[0]);
+    int totalSourceData = Integer.parseInt(args[1]);
+    int numFetcherTasks = Integer.parseInt(args[2]);
+    LOG.info("Parameters: numSourceTasks=" + numSourceTasks + ", totalSourceDataSize(bytes)="
+ totalSourceData +
+        ", numFetcherTasks=" + numFetcherTasks);
+
+    DAG dag = createDAG(numSourceTasks, totalSourceData, numFetcherTasks);
+    return runDag(dag, false, LOG);
+  }
+
+  @Override
+  protected void printUsage() {
+    System.err.println(
+        "Usage: " + "BroadcastLoadGen <num_source_tasks>  <total_source_data>
<num_destination_tasks>");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  protected final int validateArgs(String[] otherArgs) {
+    return otherArgs.length != 3 ? 2 : 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new BroadcastLoadGen(), args);
+    System.exit(res);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/2edbebaa/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 3824607..d413e8b 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -44,6 +44,7 @@ public class ExampleDriver {
     int exitCode = -1;
     ProgramDriver pgd = new ProgramDriver();
     try {
+      pgd.addClass("broadcastloadgen", BroadcastLoadGen.class, "Run a DAG to generate load
for Broadcast Shuffle");
       pgd.addClass("rpcloadgen", RPCLoadGen.class, "Run a DAG to generate load for the task
to AM RPC");
       pgd.addClass("wordcount", MapredWordCount.class,
           "A map/reduce program that counts the words in the input files.");


Mime
View raw message