tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1126. Add a data generator and validator for the intersect example. (sseth)
Date Tue, 20 May 2014 17:38:11 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 62bace35c -> 60905cd25


TEZ-1126. Add a data generator and validator for the intersect example.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/60905cd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/60905cd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/60905cd2

Branch: refs/heads/master
Commit: 60905cd2599eb1c56e95b99d8723ef6b94cb4f93
Parents: 62bace3
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue May 20 10:37:34 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue May 20 10:37:34 2014 -0700

----------------------------------------------------------------------
 .../tez/mapreduce/examples/ExampleDriver.java   |   4 +
 .../mapreduce/examples/IntersectDataGen.java    | 324 +++++++++++++++++++
 .../mapreduce/examples/IntersectExample.java    |   4 +-
 .../mapreduce/examples/IntersectValidate.java   | 283 ++++++++++++++++
 4 files changed, 612 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/60905cd2/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index ec24563..2f9cde2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -87,6 +87,10 @@ public class ExampleDriver {
           "Filters lines by the specified word using OneToOne edge");
       pgd.addClass("intersect", IntersectExample.class,
           "Identify all occurences of lines in file1 which also occur in file2");
+      pgd.addClass("intersectdatagen", IntersectDataGen.class,
+          "Generate data to run the intersect example");
+      pgd.addClass("intersectvalidate", IntersectValidate.class,
+          "Validate data generated by intersect and intersectdatagen");
       exitCode = pgd.run(argv);
     }
     catch(Throwable e){

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/60905cd2/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
new file mode 100644
index 0000000..318c708
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -0,0 +1,324 @@
+package org.apache.tez.mapreduce.examples;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.committer.MROutputCommitter;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import com.google.common.base.Preconditions;
+
+public class IntersectDataGen extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(IntersectDataGen.class);
+
+  private static final String STREAM_OUTPUT_NAME = "streamoutput";
+  private static final String HASH_OUTPUT_NAME = "hashoutput";
+  private static final String EXPECTED_OUTPUT_NAME = "expectedoutput";
+
+  public static void main(String[] args) throws Exception {
+    IntersectDataGen dataGen = new IntersectDataGen();
+    int status = ToolRunner.run(new Configuration(), dataGen, args);
+    System.exit(status);
+  }
+
+  private static void printUsage() {
+    System.err
+        .println("Usage: "
+            + "intersectdatagen <outPath1> <path1Size> <outPath2> <path2Size> <expectedResultPath> <numTasks>");
+    ;
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    if (otherArgs.length != 6) {
+      printUsage();
+      return 2;
+    }
+    return execute(otherArgs);
+  }
+
+  private int execute(String[] args) throws IOException, TezException, InterruptedException {
+    LOG.info("Running IntersectDataGen");
+
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    UserGroupInformation.setConfiguration(tezConf);
+
+    String outDir1 = args[0];
+    long outDir1Size = Long.parseLong(args[1]);
+    String outDir2 = args[2];
+    long outDir2Size = Long.parseLong(args[3]);
+    String expectedOutputDir = args[4];
+    int numTasks = Integer.parseInt(args[5]);
+
+    Path largeOutPath = null;
+    Path smallOutPath = null;
+    long largeOutSize = 0;
+    long smallOutSize = 0;
+
+    if (outDir1Size >= outDir2Size) {
+      largeOutPath = new Path(outDir1);
+      largeOutSize = outDir1Size;
+      smallOutPath = new Path(outDir2);
+      smallOutSize = outDir2Size;
+    } else {
+      largeOutPath = new Path(outDir2);
+      largeOutSize = outDir2Size;
+      smallOutPath = new Path(outDir1);
+      smallOutSize = outDir1Size;
+    }
+
+    Path expectedOutputPath = new Path(expectedOutputDir);
+
+    // Verify output path existence
+    FileSystem fs = FileSystem.get(tezConf);
+    int res = 0;
+    res = checkOutputDirectory(fs, largeOutPath) + checkOutputDirectory(fs, smallOutPath)
+        + checkOutputDirectory(fs, expectedOutputPath);
+    if (res != 0) {
+      return 2;
+    }
+
+    if (numTasks <= 0) {
+      System.err.println("NumTasks must be > 0");
+      return 2;
+    }
+
+    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+        tezConf);
+    TezSession tezSession = new TezSession("IntersectDataGenSession", sessionConfiguration);
+    try {
+      tezSession.start();
+
+      DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
+          largeOutSize, smallOutSize);
+      setupURIsForCredentials(dag, largeOutPath, smallOutPath, expectedOutputPath);
+
+      tezSession.waitTillReady();
+      DAGClient dagClient = tezSession.submitDAG(dag);
+      DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+        LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+        return -1;
+      }
+      return 0;
+    } finally {
+      tezSession.stop();
+    }
+  }
+
+  private DAG createDag(TezConfiguration tezConf, Path largeOutPath, Path smallOutPath,
+      Path expectedOutputPath, int numTasks, long largeOutSize, long smallOutSize)
+      throws IOException {
+
+    long largeOutSizePerTask = largeOutSize / numTasks;
+    long smallOutSizePerTask = smallOutSize / numTasks;
+
+    DAG dag = new DAG("IntersectExample");
+
+    byte[] streamOutputPayload = createPayloadForOutput(largeOutPath, tezConf);
+    byte[] hashOutputPayload = createPayloadForOutput(smallOutPath, tezConf);
+    byte[] expectedOutputPayload = createPayloadForOutput(expectedOutputPath, tezConf);
+
+    Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
+        GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
+        largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf))
+        .setJavaOpts(MRHelpers.getMapJavaOpts(tezConf));
+    genDataVertex.addOutput(STREAM_OUTPUT_NAME,
+        new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload),
+        MROutputCommitter.class);
+    genDataVertex.addOutput(HASH_OUTPUT_NAME,
+        new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload),
+        MROutputCommitter.class);
+    genDataVertex.addOutput(EXPECTED_OUTPUT_NAME,
+        new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload),
+        MROutputCommitter.class);
+
+    dag.addVertex(genDataVertex);
+
+    return dag;
+  }
+
+  public static class GenDataProcessor extends SimpleMRProcessor {
+
+    private static final Log LOG = LogFactory.getLog(GenDataProcessor.class);
+
+    long streamOutputFileSize;
+    long hashOutputFileSize;
+    float overlapApprox = 0.2f;
+
+    public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize)
+        throws IOException {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(bos);
+      dos.writeLong(streamOutputFileSize);
+      dos.writeLong(hashOutputFileSize);
+      dos.close();
+      bos.close();
+      return bos.toByteArray();
+    }
+
+    @Override
+    public void initialize() throws Exception {
+      byte[] payload = getContext().getUserPayload();
+      ByteArrayInputStream bis = new ByteArrayInputStream(payload);
+      DataInputStream dis = new DataInputStream(bis);
+      streamOutputFileSize = dis.readLong();
+      hashOutputFileSize = dis.readLong();
+      LOG.info("Initialized with largeFileTargetSize=" + streamOutputFileSize
+          + ", smallFileTragetSize=" + hashOutputFileSize);
+      dis.close();
+      bis.close();
+    }
+
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 0);
+      Preconditions.checkState(getOutputs().size() == 3);
+
+      KeyValueWriter streamOutputWriter = (KeyValueWriter) getOutputs().get(STREAM_OUTPUT_NAME)
+          .getWriter();
+      KeyValueWriter hashOutputWriter = (KeyValueWriter) getOutputs().get(HASH_OUTPUT_NAME)
+          .getWriter();
+      KeyValueWriter expectedOutputWriter = (KeyValueWriter) getOutputs().get(EXPECTED_OUTPUT_NAME)
+          .getWriter();
+
+      float fileSizeFraction = hashOutputFileSize / (float) streamOutputFileSize;
+      Preconditions.checkState(fileSizeFraction > 0.0f && fileSizeFraction <= 1.0f);
+      int mod = 1;
+      int extraKeysMod = 0;
+      if (fileSizeFraction > overlapApprox) {
+        // Common keys capped by overlap. Additional ones required in the hashFile.
+        mod = (int) (1 / overlapApprox);
+        extraKeysMod = (int) (1 / (fileSizeFraction - overlapApprox));
+      } else {
+        // All keys in hashFile must exist in stream file.
+        mod = (int) (1 / fileSizeFraction);
+      }
+      LOG.info("Using mod=" + mod + ", extraKeysMod=" + extraKeysMod);
+
+      long count = 0;
+      long sizeLarge = 0;
+      long sizeSmall = 0;
+      long numLargeFileKeys = 0;
+      long numSmallFileKeys = 0;
+      long numExpectedKeys = 0;
+      while (sizeLarge < streamOutputFileSize) {
+        String str = createOverlapString(13, count);
+        Text text = new Text(str);
+        int size = text.getLength();
+        streamOutputWriter.write(text, NullWritable.get());
+        sizeLarge += size;
+        numLargeFileKeys++;
+        if (count % mod == 0) {
+          hashOutputWriter.write(text, NullWritable.get());
+          sizeSmall += size;
+          numSmallFileKeys++;
+          expectedOutputWriter.write(text, NullWritable.get());
+          numExpectedKeys++;
+        }
+        if (extraKeysMod != 0 && count % extraKeysMod == 0) {
+          String nStr = createNonOverlaptring(13, count);
+          Text nText = new Text(nStr);
+          hashOutputWriter.write(nText, NullWritable.get());
+          sizeSmall += nText.getLength();
+          numSmallFileKeys++;
+        }
+        count++;
+      }
+      LOG.info("OutputStats: " + "largeFileNumKeys=" + numLargeFileKeys + ", smallFileNumKeys="
+          + numSmallFileKeys + ", expFileNumKeys=" + numExpectedKeys + ", largeFileSize="
+          + sizeLarge + ", smallFileSize=" + sizeSmall);
+    }
+
+    private String createOverlapString(int size, long count) {
+      StringBuilder sb = new StringBuilder();
+      Random random = new Random();
+      for (int i = 0; i < size; i++) {
+        int r = Math.abs(random.nextInt()) % 26;
+        // Random a-z followed by the count
+        sb.append((char) (97 + r));
+      }
+      sb.append("_").append(getContext().getTaskIndex()).append("_").append(count);
+      return sb.toString();
+    }
+
+    private String createNonOverlaptring(int size, long count) {
+      StringBuilder sb = new StringBuilder();
+      Random random = new Random();
+      for (int i = 0; i < size; i++) {
+        int r = Math.abs(random.nextInt()) % 26;
+        // Random A-Z followed by the count
+        sb.append((char) (65 + r));
+      }
+      sb.append("_").append(getContext().getTaskIndex()).append("_").append(count);
+      return sb.toString();
+    }
+
+  }
+
+  private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
+    List<URI> uris = new LinkedList<URI>();
+    for (Path path : paths) {
+      FileSystem fs = path.getFileSystem(getConf());
+      Path qPath = fs.makeQualified(path);
+      uris.add(qPath.toUri());
+    }
+    dag.addURIsForCredentials(uris);
+  }
+
+  private int checkOutputDirectory(FileSystem fs, Path path) throws IOException {
+    if (fs.exists(path)) {
+      System.err.println("Output directory: " + path + " already exists");
+      return 2;
+    }
+    return 0;
+  }
+
+  private byte[] createPayloadForOutput(Path outputPath, Configuration srcConf) throws IOException {
+    Configuration conf = new Configuration(srcConf);
+    conf.set(FileOutputFormat.OUTDIR, outputPath.toUri().toString());
+    byte[] payload = MROutput.createUserPayload(conf, TextOutputFormat.class.getName(), true);
+    return payload;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/60905cd2/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index 43565be..aa9b9a2 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -70,7 +70,6 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -255,10 +254,9 @@ public class IntersectExample extends Configured implements Tool {
       Reader rawReader = input.getReader();
       Preconditions.checkState(rawReader instanceof KeyValueReader);
       LogicalOutput output = getOutputs().values().iterator().next();
-      Preconditions.checkState(output instanceof OnFileUnorderedPartitionedKVOutput);
 
       KeyValueReader reader = (KeyValueReader) rawReader;
-      KeyValueWriter writer = (UnorderedPartitionedKVWriter) output.getWriter();
+      KeyValueWriter writer = (KeyValueWriter) output.getWriter();
 
       while (reader.next()) {
         Object val = reader.getCurrentValue();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/60905cd2/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
new file mode 100644
index 0000000..537b5cd
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.IntersectExample.ForwardingProcessor;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+public class IntersectValidate extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(IntersectExample.class);
+
+  private static final String LHS_INPUT_NAME = "lhsfile";
+  private static final String RHS_INPUT_NAME = "rhsfile";
+
+  private static final String COUNTER_GROUP_NAME = "INTERSECT_VALIDATE";
+  private static final String MISSING_KEY_COUNTER_NAME = "MISSING_KEY_EXISTS";
+
+  public static void main(String[] args) throws Exception {
+    IntersectValidate validate = new IntersectValidate();
+    int status = ToolRunner.run(new Configuration(), validate, args);
+    System.exit(status);
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: " + "intersectvalidate <path1> <path2>");
+    ToolRunner.printGenericCommandUsage(System.err);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    if (otherArgs.length != 3 && otherArgs.length != 2) {
+      printUsage();
+      return 2;
+    }
+    return execute(otherArgs);
+  }
+
+  private int execute(String[] args) throws IOException, TezException, InterruptedException {
+    LOG.info("Running IntersectValidate");
+    TezConfiguration tezConf = new TezConfiguration(getConf());
+    UserGroupInformation.setConfiguration(tezConf);
+
+    String lhsDir = args[0];
+    String rhsDir = args[1];
+    int numPartitions = 1;
+    if (args.length == 3) {
+      numPartitions = Integer.parseInt(args[2]);
+    }
+
+    if (numPartitions <= 0) {
+      System.err.println("NumPartitions must be > 0");
+      return 2;
+    }
+
+    Path lhsPath = new Path(lhsDir);
+    Path rhsPath = new Path(rhsDir);
+
+    AMConfiguration amConfiguration = new AMConfiguration(null, null, tezConf, null);
+    TezSessionConfiguration sessionConfiguration = new TezSessionConfiguration(amConfiguration,
+        tezConf);
+    TezSession tezSession = new TezSession("IntersectExampleSession", sessionConfiguration);
+    try {
+      tezSession.start();
+
+      DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
+      setupURIsForCredentials(dag, lhsPath, rhsPath);
+
+      tezSession.waitTillReady();
+      DAGClient dagClient = tezSession.submitDAG(dag);
+      DAGStatus dagStatus = dagClient.waitForCompletionWithAllStatusUpdates(null);
+      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+        LOG.info("DAG diagnostics: " + dagStatus.getDiagnostics());
+        return -1;
+      } else {
+        dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+        TezCounter counter = dagStatus.getDAGCounters().findCounter(COUNTER_GROUP_NAME,
+            MISSING_KEY_COUNTER_NAME);
+        if (counter == null) {
+          LOG.info("Unable to determing equality");
+          return -1;
+        } else {
+          if (counter.getValue() != 0) {
+            LOG.info("Validate failed. The two sides are not equivalent");
+            return -1;
+          } else {
+            LOG.info("Vlidation successful. The two sides are equivalent");
+            return 0;
+          }
+        }
+      }
+    } finally {
+      tezSession.stop();
+    }
+  }
+
+  private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
+      throws IOException {
+    DAG dag = new DAG("IntersectValidate");
+
+    // Configuration for src1
+    Configuration lhsInputConf = new Configuration(tezConf);
+    lhsInputConf.set(FileInputFormat.INPUT_DIR, lhs.toUri().toString());
+    byte[] streamInputPayload = MRInput.createUserPayload(lhsInputConf,
+        TextInputFormat.class.getName(), true, false);
+
+    // Configuration for src2
+    Configuration rhsInputConf = new Configuration(tezConf);
+    rhsInputConf.set(FileInputFormat.INPUT_DIR, rhs.toUri().toString());
+    byte[] hashInputPayload = MRInput.createUserPayload(rhsInputConf,
+        TextInputFormat.class.getName(), true, false);
+
+    // Configuration for intermediate output - shared by Vertex1 and Vertex2
+    // This should only be setting selective keys from the underlying conf. Fix after there's a
+    // better mechanism to configure the IOs.
+    Configuration intermediateOutputConf = new Configuration(tezConf);
+    intermediateOutputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+        Text.class.getName());
+    intermediateOutputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+        NullWritable.class.getName());
+    intermediateOutputConf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
+        HashPartitioner.class.getName());
+    byte[] intermediateOutputPayload = TezUtils.createUserPayloadFromConf(intermediateOutputConf);
+
+    Configuration intermediateInputConf = new Configuration(tezConf);
+    intermediateInputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+        Text.class.getName());
+    intermediateInputConf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+        NullWritable.class.getName());
+    byte[] intermediateInputPayload = TezUtils.createUserPayloadFromConf(intermediateInputConf);
+
+    // Change the way resources are setup - no MRHelpers
+    Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
+        ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).setJavaOpts(
+        MRHelpers.getMapJavaOpts(tezConf)).addInput("lhs",
+        new InputDescriptor(MRInput.class.getName()).setUserPayload(streamInputPayload),
+        MRInputAMSplitGenerator.class);
+
+    Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
+        ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).setJavaOpts(
+        MRHelpers.getMapJavaOpts(tezConf)).addInput("rhs",
+        new InputDescriptor(MRInput.class.getName()).setUserPayload(hashInputPayload),
+        MRInputAMSplitGenerator.class);
+
+    Vertex intersectValidateVertex = new Vertex("intersectvalidate", new ProcessorDescriptor(
+        IntersectValidateProcessor.class.getName()), numPartitions,
+        MRHelpers.getReduceResource(tezConf)).setJavaOpts(MRHelpers.getReduceJavaOpts(tezConf));
+
+    Edge e1 = new Edge(lhsVertex, intersectValidateVertex, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(OnFileSortedOutput.class.getName())
+            .setUserPayload(intermediateOutputPayload), new InputDescriptor(
+            ShuffledMergedInput.class.getName()).setUserPayload(intermediateInputPayload)));
+
+    Edge e2 = new Edge(rhsVertex, intersectValidateVertex, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        new OutputDescriptor(OnFileSortedOutput.class.getName())
+            .setUserPayload(intermediateOutputPayload), new InputDescriptor(
+            ShuffledMergedInput.class.getName()).setUserPayload(intermediateInputPayload)));
+
+    dag.addVertex(lhsVertex).addVertex(rhsVertex).addVertex(intersectValidateVertex).addEdge(e1)
+        .addEdge(e2);
+    return dag;
+  }
+
+  public static class IntersectValidateProcessor extends SimpleProcessor {
+
+    private static final Log LOG = LogFactory.getLog(IntersectValidateProcessor.class);
+    
+    @Override
+    public void run() throws Exception {
+      Preconditions.checkState(getInputs().size() == 2);
+      Preconditions.checkState(getOutputs().size() == 0);
+      LogicalInput lhsInput = getInputs().get(LHS_INPUT_NAME);
+      LogicalInput rhsInput = getInputs().get(RHS_INPUT_NAME);
+      Reader lhsReaderRaw = lhsInput.getReader();
+      Reader rhsReaderRaw = rhsInput.getReader();
+      Preconditions.checkState(lhsReaderRaw instanceof KeyValuesReader);
+      Preconditions.checkState(rhsReaderRaw instanceof KeyValuesReader);
+      KeyValuesReader lhsReader = (KeyValuesReader) lhsReaderRaw;
+      KeyValuesReader rhsReader = (KeyValuesReader) rhsReaderRaw;
+
+      TezCounter lhsMissingKeyCounter = getContext().getCounters().findCounter(COUNTER_GROUP_NAME,
+          MISSING_KEY_COUNTER_NAME);
+
+      while (lhsReader.next()) {
+        if (rhsReader.next()) {
+          if (!lhsReader.getCurrentKey().equals(rhsReader.getCurrentKey())) {
+            LOG.info("MismatchedKeys: " + "lhs=" + lhsReader.getCurrentKey() + ", rhs=" + rhsReader.getCurrentKey());
+            lhsMissingKeyCounter.increment(1);
+          }
+        } else {
+          lhsMissingKeyCounter.increment(1);
+          LOG.info("ExtraKey in lhs: " + lhsReader.getClass());
+          break;
+        }
+      }
+      if (rhsReader.next()) {
+        lhsMissingKeyCounter.increment(1);
+        LOG.info("ExtraKey in rhs: " + lhsReader.getClass());
+      }
+    }
+  }
+
+  private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
+    List<URI> uris = new LinkedList<URI>();
+    for (Path path : paths) {
+      FileSystem fs = path.getFileSystem(getConf());
+      Path qPath = fs.makeQualified(path);
+      uris.add(qPath.toUri());
+    }
+    dag.addURIsForCredentials(uris);
+  }
+}


Mime
View raw message