crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject crunch git commit: CRUNCH-600: pass job credentials when building multiple outputs
Date Mon, 11 Apr 2016 15:47:18 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 65f39198e -> 02828f8f0


CRUNCH-600: pass job credentials when building multiple outputs

Signed-off-by: Micah Whitacre <mkwhit@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/02828f8f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/02828f8f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/02828f8f

Branch: refs/heads/master
Commit: 02828f8f08d7e6b427f3f03a9118bdd7ac6c3342
Parents: 65f3919
Author: Igor Bernstein <igorbernstein@spotify.com>
Authored: Sun Apr 10 15:42:10 2016 -0400
Committer: Micah Whitacre <mkwhit@gmail.com>
Committed: Mon Apr 11 09:47:54 2016 -0500

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/MRPipelineIT.java | 52 ++++++++++++++++++++
 .../org/apache/crunch/io/CrunchOutputs.java     |  3 +-
 2 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/02828f8f/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
index 6af3f84..8cda55b 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -25,6 +25,7 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URLEncoder;
+import java.util.Map;
 
 import com.google.common.io.Files;
 import org.apache.commons.io.filefilter.SuffixFileFilter;
@@ -33,10 +34,20 @@ import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.To;
+import org.apache.crunch.io.text.TextFileTarget;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -109,4 +120,45 @@ public class MRPipelineIT implements Serializable {
     String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot";
     assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex));
   }
+
+  @Test
+  public void testJobCredentials() throws IOException {
+    Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+
+    pipeline.write(lines, new SecretTextFileTarget(tmpDir.getFile("output").getAbsolutePath()));
+
+    PipelineResult pipelineResult = pipeline.done();
+    assertTrue(pipelineResult.succeeded());
+  }
+
+  private static class SecretTextFileTarget extends TextFileTarget {
+    public SecretTextFileTarget(String path) {
+      super(path);
+    }
+
+    @Override
+    public Target outputConf(String key, String value) {
+      return super.outputConf(key, value);
+    }
+
+    @Override
+    public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+      Converter converter = ptype.getConverter();
+      Class keyClass = converter.getKeyClass();
+      Class valueClass = converter.getValueClass();
+      FormatBundle fb = FormatBundle.forOutput(SecretTextOutputFormat.class);
+      configureForMapReduce(job, keyClass, valueClass, fb, outputPath, name);
+
+      job.getCredentials().addSecretKey(new Text("secret"), "myPassword".getBytes());
+    }
+  }
+  private static class SecretTextOutputFormat extends TextOutputFormat {
+    @Override
+    public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+      byte[] secret = job.getCredentials().getSecretKey(new Text("secret"));
+      assertEquals("job credentials did not match", "myPassword", new String(secret));
+      return super.getRecordWriter(job);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/02828f8f/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index 653a401..a9621ba 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Sets;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -203,7 +204,7 @@ public class CrunchOutputs<K, V> {
 
   private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf)
       throws IOException {
-    Job job = new Job(new Configuration(baseConf));
+    Job job = new Job(new JobConf(baseConf));
     job.getConfiguration().set("crunch.namedoutput", namedOutput);
     setJobID(job, jobID, namedOutput);
     return job;


Mime
View raw message