Subject: svn commit: r1148118 [1/3] - in /pig/branches/branch-0.9: ./ shims/ shims/src/ shims/src/hadoop20/ shims/src/hadoop20/org/ shims/src/hadoop20/org/apache/ shims/src/hadoop20/org/apache/pig/ shims/src/hadoop20/org/apache/pig/backend/ shims/src/hadoop20/o... Date: Tue, 19 Jul 2011 01:15:02 -0000 To: commits@pig.apache.org From: daijy@apache.org Message-Id: <20110719011505.4B7C6238890D@eris.apache.org> Author: daijy Date: Tue Jul 19 01:14:58 2011 New Revision: 1148118 URL: http://svn.apache.org/viewvc?rev=1148118&view=rev Log: PIG-2125: Make Pig work with hadoop .NEXT Added: pig/branches/branch-0.9/shims/ pig/branches/branch-0.9/shims/src/ pig/branches/branch-0.9/shims/src/hadoop20/ pig/branches/branch-0.9/shims/src/hadoop20/org/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java pig/branches/branch-0.9/shims/src/hadoop23/ pig/branches/branch-0.9/shims/src/hadoop23/org/ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/ 
pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java pig/branches/branch-0.9/shims/test/ pig/branches/branch-0.9/shims/test/hadoop20/ pig/branches/branch-0.9/shims/test/hadoop20/org/ pig/branches/branch-0.9/shims/test/hadoop20/org/apache/ pig/branches/branch-0.9/shims/test/hadoop20/org/apache/pig/ pig/branches/branch-0.9/shims/test/hadoop20/org/apache/pig/test/ pig/branches/branch-0.9/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java pig/branches/branch-0.9/shims/test/hadoop23/ pig/branches/branch-0.9/shims/test/hadoop23/org/ pig/branches/branch-0.9/shims/test/hadoop23/org/apache/ pig/branches/branch-0.9/shims/test/hadoop23/org/apache/pig/ pig/branches/branch-0.9/shims/test/hadoop23/org/apache/pig/test/ pig/branches/branch-0.9/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java pig/branches/branch-0.9/test/org/apache/pig/test/MiniGenericCluster.java Modified: pig/branches/branch-0.9/build.xml pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java pig/branches/branch-0.9/src/org/apache/pig/impl/PigContext.java pig/branches/branch-0.9/src/org/apache/pig/impl/io/PigFile.java pig/branches/branch-0.9/src/org/apache/pig/impl/io/ReadToEndLoader.java pig/branches/branch-0.9/src/org/apache/pig/parser/QueryParserUtils.java pig/branches/branch-0.9/src/overview.html pig/branches/branch-0.9/test/findbugsExcludeFile.xml pig/branches/branch-0.9/test/org/apache/pig/test/MiniCluster.java pig/branches/branch-0.9/test/org/apache/pig/test/TestGrunt.java pig/branches/branch-0.9/test/org/apache/pig/test/TestTypedMap.java Modified: pig/branches/branch-0.9/build.xml URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.9/build.xml?rev=1148118&r1=1148117&r2=1148118&view=diff ============================================================================== --- pig/branches/branch-0.9/build.xml (original) +++ pig/branches/branch-0.9/build.xml Tue Jul 19 01:14:58 2011 @@ -20,6 +20,8 @@ xmlns:ivy="antlib:org.apache.ivy.ant"> + + @@ -145,6 +147,16 @@ + + + + + + + @@ -194,7 +206,7 @@ - + @@ -349,13 +361,13 @@ *** Else, compile-sources (which only warns about deprecations) target will be executed *** - + - + @@ -369,7 +381,7 @@ *** Else, compile-sources (which only warns about deprecations) target will be executed *** - + @@ -377,7 +389,7 @@ - + @@ -665,7 +677,7 @@ - + Added: pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added) +++ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; + +abstract public class PigMapBase extends PigGenericMapBase { + /** + * + * Get mapper's illustrator context + * + * @param conf Configuration + * @param input Input bag to serve as data source + * @param output Map output buffer + * @param split the split + * @return Illustrator's context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Configuration conf, DataBag input, + List> output, InputSplit split) + throws IOException, InterruptedException { + return new IllustratorContext(conf, input, output, split); + } + + public class IllustratorContext extends Context { + private DataBag input; + List> output; + private Iterator it = null; + private Tuple value = null; + private boolean init = false; + + public IllustratorContext(Configuration conf, DataBag input, + List> output, + InputSplit split) throws IOException, InterruptedException { + super(conf, new TaskAttemptID(), null, null, null, null, split); + if (output == null) + throw new IOException("Null output can not be used"); + this.input = input; this.output = output; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (input == null) { + if (!init) { + init = true; + return true; + } + return false; + } + if (it == null) + it = input.iterator(); + if (!it.hasNext()) + return false; + value = it.next(); + return true; + } + + @Override + public Text getCurrentKey() { + return null; + } + + @Override + public Tuple getCurrentValue() { + return value; + } + + @Override + public void write(PigNullableWritable key, Writable value) + throws IOException, InterruptedException { + output.add(new Pair(key, value)); + } + + @Override + public void progress() { + + } + } +} Added: pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added) +++ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
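The hadoop20 PigMapBase above centers on one pattern: ILLUSTRATE drives the production map pipeline from an in-memory DataBag by handing it a fake Mapper.Context whose nextKeyValue()/getCurrentValue()/write() are backed by plain collections. Below is a standalone sketch of that pattern using hypothetical names (MiniContext, ListContext; none of these are Pig APIs), so the mechanics are visible without the Hadoop dependencies:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FakeContextDemo {
    // Minimal stand-in for the slice of Mapper.Context that the illustrator needs.
    interface MiniContext<V, OUT> {
        boolean nextKeyValue();   // advance to the next input record
        V getCurrentValue();      // record we are positioned on
        void write(OUT out);      // collect one output record
    }

    // In-memory implementation: input comes from a List, output goes to a List,
    // mirroring how IllustratorContext reads a DataBag and appends to an output buffer.
    static class ListContext<V, OUT> implements MiniContext<V, OUT> {
        private final Iterator<V> it;
        private final List<OUT> output;
        private V value;

        ListContext(List<V> input, List<OUT> output) {
            this.it = input.iterator();
            this.output = output;
        }
        public boolean nextKeyValue() {
            if (!it.hasNext()) return false;
            value = it.next();
            return true;
        }
        public V getCurrentValue() { return value; }
        public void write(OUT out) { output.add(out); }
    }

    public static void main(String[] args) {
        List<Integer> output = new ArrayList<>();
        MiniContext<String, Integer> ctx =
                new ListContext<>(Arrays.asList("a", "bb", "ccc"), output);
        // The processing loop only sees the context, so it cannot tell an
        // in-memory source from a real InputSplit.
        while (ctx.nextKeyValue()) {
            ctx.write(ctx.getCurrentValue().length());
        }
        System.out.println(output); // prints [1, 2, 3]
    }
}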
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.jobcontrol.Job; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.Reducer.Context; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; +import org.apache.pig.pen.FakeRawKeyValueIterator; + +public class PigMapReduce extends PigGenericMapReduce { + public static class Reduce extends PigGenericMapReduce.Reduce { + /** + * Get reducer's illustrator context + * + * @param input Input buffer as output by maps + * @param pkg package + * @return reducer's illustrator context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Job job, + List> input, POPackage pkg) throws IOException, InterruptedException { + return new IllustratorContext(job, input, pkg); + } + + @SuppressWarnings("unchecked") + public class IllustratorContext extends Context { + private PigNullableWritable currentKey = null, nextKey = null; + private NullableTuple nextValue = null; + private List currentValues = null; + private Iterator> it; + private final ByteArrayOutputStream bos; + private final DataOutputStream dos; + private final RawComparator sortComparator, groupingComparator; + POPackage pack = null; + + public IllustratorContext(Job job, + List> input, + POPackage pkg + ) throws IOException, InterruptedException { + super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()), + null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class); + bos = new ByteArrayOutputStream(); + dos = new DataOutputStream(bos); + org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf()); + sortComparator = nwJob.getSortComparator(); + groupingComparator = nwJob.getGroupingComparator(); + + Collections.sort(input, new Comparator>() { + @Override + public int compare(Pair o1, + Pair o2) { + try { + o1.first.write(dos); + int l1 = bos.size(); + o2.first.write(dos); + int l2 = bos.size(); + byte[] bytes = bos.toByteArray(); + bos.reset(); + return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1); + } catch (IOException e) { + throw new RuntimeException("Serialization exception in sort:"+e.getMessage()); + } + } + } + ); + currentValues = new ArrayList(); + it = input.iterator(); + if (it.hasNext()) { + Pair entry = it.next(); + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + } + pack = pkg; + } + + @Override + public PigNullableWritable getCurrentKey() { + return currentKey; + } + + @Override + 
public boolean nextKey() { + if (nextKey == null) + return false; + currentKey = nextKey; + currentValues.clear(); + currentValues.add(nextValue); + nextKey = null; + for(; it.hasNext(); ) { + Pair<PigNullableWritable, Writable> entry = it.next(); + /* Why can't raw comparison be used? + byte[] bytes; + int l1, l2; + try { + currentKey.write(dos); + l1 = bos.size(); + entry.first.write(dos); + l2 = bos.size(); + bytes = bos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("nextKey exception : "+e.getMessage()); + } + bos.reset(); + if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0) + */ + if (groupingComparator.compare(currentKey, entry.first) == 0) + { + currentValues.add((NullableTuple)entry.second); + } else { + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + break; + } + } + return true; + } + + @Override + public Iterable<NullableTuple> getValues() { + return currentValues; + } + + @Override + public void write(PigNullableWritable k, Writable t) { + } + + @Override + public void progress() { + } + } + } +} Added: pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (added) +++ pig/branches/branch-0.9/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.shims; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +/** + * We need to make Pig work with both hadoop 20 and hadoop 23 (PIG-2125). However, + * there are API differences between hadoop 20 and 23. Here we use a shims layer to + * hide these API differences. A dynamic shims layer is not possible due to some + * static dependencies. We adopt a static shims approach. For different hadoop versions, + * we need to recompile. + * + * This class wraps all the static methods. PigMapReduce, PigMapBase and MiniCluster wrap the hadoop + * version-dependent implementations of PigGenericMapReduce, PigGenericMapBase and MiniGenericCluster. 
+ **/ +public class HadoopShims { + static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException { + JobContext newContext = new JobContext(original.getConfiguration(), original.getJobID()); + return newContext; + } + + static public TaskAttemptContext createTaskAttemptContext(Configuration conf, + TaskAttemptID taskId) { + TaskAttemptContext newContext = new TaskAttemptContext(conf, + taskId); + return newContext; + } + + static public JobContext createJobContext(Configuration conf, + JobID jobId) { + JobContext newJobContext = new JobContext( + conf, jobId); + return newJobContext; + } + + static public boolean isMap(TaskAttemptID taskAttemptID) { + return taskAttemptID.isMap(); + } +} Added: pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added) +++ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
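The payoff of the hadoop20 HadoopShims above is that call sites elsewhere in Pig stay version-neutral: they ask the shim for a context instead of invoking constructors that exist on only one Hadoop line (on hadoop 23, JobContext and TaskAttemptContext become interfaces backed by *Impl classes, as the hadoop23 shim later in this mail shows). A hypothetical, simplified caller, not code from this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;

public class ShimCaller {
    // Compiles unchanged against either shims/src/hadoop20 or shims/src/hadoop23.
    public static TaskAttemptContext taskContext(Configuration conf, TaskAttemptID id) {
        // Pre-patch code said "new TaskAttemptContext(conf, id)", which no longer
        // compiles once TaskAttemptContext is an interface.
        return HadoopShims.createTaskAttemptContext(conf, id);
    }

    public static JobContext jobContext(Configuration conf) {
        return HadoopShims.createJobContext(conf, new JobID());
    }
}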
+ */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + + +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.security.Credentials; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; + +abstract public class PigMapBase extends PigGenericMapBase { + /** + * + * Get mapper's illustrator context + * + * @param conf Configuration + * @param input Input bag to serve as data source + * @param output Map output buffer + * @param split the split + * @return Illustrator's context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Configuration conf, DataBag input, + List> output, InputSplit split) + throws IOException, InterruptedException { + return new IllustratorContext(conf, input, output, split); + } + + public class IllustratorContext extends Context { + private DataBag input; + List> output; + private Iterator it = null; + private Tuple value = null; + private boolean init = false; + + public IllustratorContext(Configuration conf, DataBag input, + List> output, + InputSplit split) throws IOException, InterruptedException { + if (output == null) + throw new IOException("Null output can not be used"); + this.input = input; this.output = output; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (input == null) { + if (!init) { + init = true; + return true; + } + return false; + } + if (it == null) + it = input.iterator(); + if (!it.hasNext()) + return false; + value = it.next(); + return true; + } + + @Override + public Text getCurrentKey() { + return null; + } + + @Override + public Tuple getCurrentValue() { + return value; + } + + @Override + public void write(PigNullableWritable key, Writable value) + throws IOException, InterruptedException { + output.add(new Pair(key, value)); + } + + @Override + public void progress() { + + } + + @Override + public InputSplit getInputSplit() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Counter getCounter(Enum arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Counter getCounter(String arg0, String arg1) { + // TODO Auto-generated method stub + return null; + } + + @Override + public OutputCommitter getOutputCommitter() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getStatus() { + // TODO Auto-generated method stub + return null; + } + + @Override + public TaskAttemptID getTaskAttemptID() { + // TODO Auto-generated 
method stub + return null; + } + + @Override + public void setStatus(String arg0) { + // TODO Auto-generated method stub + + } + + @Override + public Path[] getArchiveClassPaths() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long[] getArchiveTimestamps() { + // TODO Auto-generated method stub + return null; + } + + @Override + public URI[] getCacheArchives() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public URI[] getCacheFiles() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getCombinerClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Configuration getConfiguration() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Credentials getCredentials() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Path[] getFileClassPaths() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long[] getFileTimestamps() { + // TODO Auto-generated method stub + return null; + } + + @Override + public RawComparator getGroupingComparator() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getInputFormatClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getJar() { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobID getJobID() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getJobName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getJobSetupCleanupNeeded() { + // TODO Auto-generated method stub + return false; + } + + @Override + public Path[] getLocalCacheArchives() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Path[] getLocalCacheFiles() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getMapOutputKeyClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getMapOutputValueClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getMapperClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getMaxMapAttempts() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getMaxReduceAttempts() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getNumReduceTasks() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Class> getOutputFormatClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getOutputKeyClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getOutputValueClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getPartitionerClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getProfileEnabled() { + // TODO Auto-generated method stub + return false; + } + + @Override + public String getProfileParams() { + // TODO Auto-generated method stub + return null; + } + + @Override + public IntegerRanges 
getProfileTaskRange(boolean arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getReducerClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public RawComparator getSortComparator() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getSymlink() { + // TODO Auto-generated method stub + return false; + } + + @Override + public String getUser() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Path getWorkingDirectory() throws IOException { + // TODO Auto-generated method stub + return null; + } + } +} Added: pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added) +++ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
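The reducer-side IllustratorContext that follows (and its hadoop20 counterpart earlier) re-creates the shuffle's grouping by hand: map output is first sorted with the job's sort comparator, then nextKey() pulls consecutive records into one reduce group for as long as the grouping comparator says their keys compare equal. A standalone sketch of that grouping loop, with simplified stand-in types (String keys, Integer values) instead of PigNullableWritable/NullableTuple:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class GroupingDemo {
    public static void main(String[] args) {
        // "Map output": key/value pairs already sorted by the sort comparator.
        List<Map.Entry<String, Integer>> input = Arrays.asList(
                Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));
        Comparator<String> grouping = Comparator.naturalOrder();

        Iterator<Map.Entry<String, Integer>> it = input.iterator();
        Map.Entry<String, Integer> pending = it.hasNext() ? it.next() : null;
        while (pending != null) {                  // one iteration == one reduce call
            String currentKey = pending.getKey();
            List<Integer> currentValues = new ArrayList<>();
            currentValues.add(pending.getValue());
            pending = null;
            while (it.hasNext()) {                 // keep pulling while keys group together
                Map.Entry<String, Integer> e = it.next();
                if (grouping.compare(currentKey, e.getKey()) == 0) {
                    currentValues.add(e.getValue());
                } else {
                    pending = e;                   // first record of the next group
                    break;
                }
            }
            System.out.println(currentKey + " -> " + currentValues);
            // prints: a -> [1, 2]   then   b -> [3]
        }
    }
}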
+ */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.jobcontrol.Job; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.Reducer.Context; +import org.apache.hadoop.security.Credentials; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; +import org.apache.pig.pen.FakeRawKeyValueIterator; + +public class PigMapReduce extends PigGenericMapReduce { + public static class Reduce extends PigGenericMapReduce.Reduce { + /** + * Get reducer's illustrator context + * + * @param input Input buffer as output by maps + * @param pkg package + * @return reducer's illustrator context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Job job, + List> input, POPackage pkg) throws IOException, InterruptedException { + return new IllustratorContext(job, input, pkg); + } + + @SuppressWarnings("unchecked") + public class IllustratorContext extends Context { + private PigNullableWritable currentKey = null, nextKey = null; + private NullableTuple nextValue = null; + private List currentValues = null; + private Iterator> it; + private final ByteArrayOutputStream bos; + private final DataOutputStream dos; + private final RawComparator sortComparator, groupingComparator; + POPackage pack = null; + + public IllustratorContext(Job job, + List> input, + POPackage pkg + ) throws IOException, InterruptedException { + bos = new ByteArrayOutputStream(); + dos = new DataOutputStream(bos); + org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf()); + sortComparator = nwJob.getSortComparator(); + groupingComparator = nwJob.getGroupingComparator(); + + Collections.sort(input, new Comparator>() { + @Override + public int compare(Pair o1, + Pair o2) { + try { + o1.first.write(dos); + int l1 = bos.size(); + o2.first.write(dos); + int l2 = bos.size(); + byte[] bytes = bos.toByteArray(); + bos.reset(); + return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1); + } catch (IOException e) { + throw new RuntimeException("Serialization exception in sort:"+e.getMessage()); + } + } + } + ); + currentValues = new ArrayList(); + it = input.iterator(); + if (it.hasNext()) { + Pair entry = it.next(); + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + } + pack = pkg; + } + + @Override + public PigNullableWritable getCurrentKey() { + return currentKey; + } + + @Override + public boolean 
nextKey() { + if (nextKey == null) + return false; + currentKey = nextKey; + currentValues.clear(); + currentValues.add(nextValue); + nextKey = null; + for(; it.hasNext(); ) { + Pair entry = it.next(); + /* Why can't raw comparison be used? + byte[] bytes; + int l1, l2; + try { + currentKey.write(dos); + l1 = bos.size(); + entry.first.write(dos); + l2 = bos.size(); + bytes = bos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("nextKey exception : "+e.getMessage()); + } + bos.reset(); + if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0) + */ + if (groupingComparator.compare(currentKey, entry.first) == 0) + { + currentValues.add((NullableTuple)entry.second); + } else { + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + break; + } + } + return true; + } + + @Override + public Iterable getValues() { + return currentValues; + } + + @Override + public void write(PigNullableWritable k, Writable t) { + } + + @Override + public void progress() { + } + + @Override + public Counter getCounter(Enum arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Counter getCounter(String arg0, String arg1) { + // TODO Auto-generated method stub + return null; + } + + @Override + public NullableTuple getCurrentValue() throws IOException, + InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public OutputCommitter getOutputCommitter() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean nextKeyValue() throws IOException, + InterruptedException { + // TODO Auto-generated method stub + return false; + } + + @Override + public String getStatus() { + // TODO Auto-generated method stub + return null; + } + + @Override + public TaskAttemptID getTaskAttemptID() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setStatus(String arg0) { + // TODO Auto-generated method stub + + } + + @Override + public Path[] getArchiveClassPaths() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long[] getArchiveTimestamps() { + // TODO Auto-generated method stub + return null; + } + + @Override + public URI[] getCacheArchives() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public URI[] getCacheFiles() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getCombinerClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Configuration getConfiguration() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Credentials getCredentials() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Path[] getFileClassPaths() { + // TODO Auto-generated method stub + return null; + } + + @Override + public long[] getFileTimestamps() { + // TODO Auto-generated method stub + return null; + } + + @Override + public RawComparator getGroupingComparator() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getInputFormatClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getJar() { + // TODO Auto-generated method stub + return null; + } + + @Override + public JobID getJobID() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getJobName() { + // TODO 
Auto-generated method stub + return null; + } + + @Override + public boolean getJobSetupCleanupNeeded() { + // TODO Auto-generated method stub + return false; + } + + @Override + public Path[] getLocalCacheArchives() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Path[] getLocalCacheFiles() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getMapOutputKeyClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getMapOutputValueClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getMapperClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getMaxMapAttempts() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getMaxReduceAttempts() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int getNumReduceTasks() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Class> getOutputFormatClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getOutputKeyClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class getOutputValueClass() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getPartitionerClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getProfileEnabled() { + // TODO Auto-generated method stub + return false; + } + + @Override + public String getProfileParams() { + // TODO Auto-generated method stub + return null; + } + + @Override + public IntegerRanges getProfileTaskRange(boolean arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Class> getReducerClass() + throws ClassNotFoundException { + // TODO Auto-generated method stub + return null; + } + + @Override + public RawComparator getSortComparator() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean getSymlink() { + // TODO Auto-generated method stub + return false; + } + + @Override + public String getUser() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Path getWorkingDirectory() throws IOException { + // TODO Auto-generated method stub + return null; + } + } + } +} Added: pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (added) +++ pig/branches/branch-0.9/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.shims; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.ContextFactory; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +public class HadoopShims { + static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException { + JobContext newContext = ContextFactory.cloneContext(original, original.getConfiguration()); + return newContext; + } + + static public TaskAttemptContext createTaskAttemptContext(Configuration conf, + TaskAttemptID taskId) { + TaskAttemptContext newContext = new TaskAttemptContextImpl(conf, taskId); + return newContext; + } + + static public JobContext createJobContext(Configuration conf, + JobID jobId) { + JobContext newContext = new JobContextImpl(conf, jobId); + return newContext; + } + + static public boolean isMap(TaskAttemptID taskAttemptID) { + TaskType type = taskAttemptID.getTaskType(); + if (type==TaskType.MAP) + return true; + + return false; + } +} Added: pig/branches/branch-0.9/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java (added) +++ pig/branches/branch-0.9/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
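With both shims in place, split computation can also go through one entry point. The MRCompiler hunk later in this mail replaces "new JobContext(job.getConfiguration(), job.getJobID())" with HadoopShims.cloneJobContext(job); a hypothetical, simplified caller showing the same shape (Job is itself a JobContext on both Hadoop lines, so it can be passed straight to the shim):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;

public class SplitHelper {
    // Version-neutral split listing: on hadoop 20 the shim copies the context
    // via the JobContext constructor, on hadoop 23 via ContextFactory.cloneContext.
    static List<InputSplit> splitsFor(InputFormat<?, ?> inf, Job job)
            throws IOException, InterruptedException {
        return inf.getSplits(HadoopShims.cloneJobContext(job));
    }
}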
+ */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.MiniMRCluster; + +public class MiniCluster extends MiniGenericCluster { + private MiniMRCluster m_mr = null; + public MiniCluster() { + super(); + } + + @Override + protected void setupMiniDfsAndMrClusters() { + try { + final int dataNodes = 4; // There will be 4 data nodes + final int taskTrackers = 4; // There will be 4 task tracker nodes + + // Create the configuration hadoop-site.xml file + File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/"); + conf_dir.mkdirs(); + File conf_file = new File(conf_dir, "hadoop-site.xml"); + + conf_file.delete(); + + // Builds and starts the mini dfs and mapreduce clusters + Configuration config = new Configuration(); + m_dfs = new MiniDFSCluster(config, dataNodes, true, null); + m_fileSys = m_dfs.getFileSystem(); + m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1); + + // Write the necessary config info to hadoop-site.xml + m_conf = m_mr.createJobConf(); + m_conf.setInt("mapred.submit.replication", 2); + m_conf.set("dfs.datanode.address", "0.0.0.0:0"); + m_conf.set("dfs.datanode.http.address", "0.0.0.0:0"); + m_conf.set("mapred.map.max.attempts", "2"); + m_conf.set("mapred.reduce.max.attempts", "2"); + m_conf.writeXml(new FileOutputStream(conf_file)); + + // Set the system properties needed by Pig + System.setProperty("cluster", m_conf.get("mapred.job.tracker")); + System.setProperty("namenode", m_conf.get("fs.default.name")); + System.setProperty("junit.hadoop.conf", conf_dir.getPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void shutdownMiniMrClusters() { + if (m_mr != null) { m_mr.shutdown(); } + m_mr = null; + } +} Added: pig/branches/branch-0.9/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1148118&view=auto ============================================================================== --- pig/branches/branch-0.9/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (added) +++ pig/branches/branch-0.9/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Tue Jul 19 01:14:58 2011 @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
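A typical consumer of the hadoop20 MiniCluster above (and the hadoop23 one that follows) is a test that boots the cluster and points a PigServer at it. The sketch below assumes a buildCluster() factory and a getProperties() accessor on the cluster side; MiniGenericCluster.java is added by this commit but its body is in a later part of this mail, so treat those two names as assumptions:

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.test.MiniCluster;

public class MiniClusterSmokeTest {
    public static void main(String[] args) throws Exception {
        MiniCluster cluster = MiniCluster.buildCluster(); // assumed factory method
        PigServer pig = new PigServer(ExecType.MAPREDUCE,
                                      cluster.getProperties()); // assumed accessor
        pig.registerQuery("A = LOAD '/tmp/input' AS (line:chararray);");
        pig.registerQuery("B = FILTER A BY line IS NOT NULL;");
        // pig.openIterator("B") would run the job on the mini MR cluster.
        cluster.shutDown(); // assumed shutdown hook; stops MR, then DFS
    }
}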
+ */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.mapreduce.v2.TestMRJobs; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; + +/** + * This class builds a single instance of itself with the Singleton + * design pattern. While building the single instance, it sets up a + * mini cluster that actually consists of a mini DFS cluster and a + * mini MapReduce cluster on the local machine and also sets up the + * environment for Pig to run on top of the mini cluster. + */ +public class MiniCluster extends MiniGenericCluster { + protected MiniMRYarnCluster m_mr = null; + private Configuration m_dfs_conf = null; + private Configuration m_mr_conf = null; + + public MiniCluster() { + super(); + } + + @Override + protected void setupMiniDfsAndMrClusters() { + Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO); + try { + final int dataNodes = 4; // There will be 4 data nodes + final int taskTrackers = 4; // There will be 4 task tracker nodes + + Logger.getRootLogger().setLevel(Level.TRACE); + // Create the configuration hadoop-site.xml file + File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/"); + conf_dir.mkdirs(); + File conf_file = new File(conf_dir, "hadoop-site.xml"); + + conf_file.delete(); + + // Builds and starts the mini dfs and mapreduce clusters + Configuration config = new Configuration(); + m_dfs = new MiniDFSCluster(config, dataNodes, true, null); + m_fileSys = m_dfs.getFileSystem(); + m_dfs_conf = m_dfs.getConfiguration(0); + + m_mr = new MiniMRYarnCluster("PigMiniCluster"); + m_mr.init(new Configuration()); + //m_mr.init(m_dfs_conf); + m_mr.start(); + + // Write the necessary config info to hadoop-site.xml + //m_mr_conf = m_mr.getConfig(); + m_mr_conf = new Configuration(m_mr.getConfig()); + + m_conf = m_mr_conf; + m_conf.set("fs.default.name", m_dfs_conf.get("fs.default.name")); + + /* + try { + DistributedCache.addCacheFile(new URI("file:///hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"), m_conf); + DistributedCache.addCacheFile(new URI("file:///hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"), m_conf); + DistributedCache.addCacheFile(new URI("file:///pig.jar"), m_conf); + } catch (Exception e) { + e.printStackTrace(); + } + */ + m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"), new Path("/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar")); + m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"), new Path("/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar")); + m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///pig.jar"), new Path("/pig.jar")); + m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///pig-test.jar"), new Path("/pig-test.jar")); + + DistributedCache.addFileToClassPath(new Path("/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"), m_conf); + DistributedCache.addFileToClassPath(new Path("/pig.jar"), m_conf); + DistributedCache.addFileToClassPath(new 
Path("/pig-test.jar"), m_conf); + DistributedCache.addFileToClassPath(new Path("/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"), m_conf); + + //ConfigurationUtil.mergeConf(m_conf, m_dfs_conf); + //ConfigurationUtil.mergeConf(m_conf, m_mr_conf); + + m_conf.setInt("mapred.submit.replication", 2); + m_conf.set("dfs.datanode.address", "0.0.0.0:0"); + m_conf.set("dfs.datanode.http.address", "0.0.0.0:0"); + m_conf.set("mapred.map.max.attempts", "2"); + m_conf.set("mapred.reduce.max.attempts", "2"); + m_conf.writeXml(new FileOutputStream(conf_file)); + +// try { +// Thread.sleep(1000*1000); +// } catch (InterruptedException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } + + System.err.println("XXX: Setting fs.default.name to: " + m_dfs_conf.get("fs.default.name")); + // Set the system properties needed by Pig + System.setProperty("cluster", m_conf.get("mapred.job.tracker")); + //System.setProperty("namenode", m_dfs_conf.get("fs.default.name")); + System.setProperty("namenode", m_conf.get("fs.default.name")); + System.setProperty("junit.hadoop.conf", conf_dir.getPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void shutdownMiniMrClusters() { + if (m_mr != null) { m_mr.stop(); } + m_mr = null; + } +} Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1148118&r1=1148117&r2=1148118&view=diff ============================================================================== --- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original) +++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Tue Jul 19 01:14:58 2011 @@ -25,6 +25,8 @@ import java.util.Properties; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; public class ConfigurationUtil { @@ -65,8 +67,13 @@ public class ConfigurationUtil { } public static Properties getLocalFSProperties() { - Configuration localConf = new Configuration(false); - localConf.addResource("core-default.xml"); + Configuration localConf; + if (PigMapReduce.sJobContext!=null && PigMapReduce.sJobContext.getConfiguration().get("exectype").equals(ExecType.LOCAL.toString())) { + localConf = new Configuration(false); + localConf.addResource("core-default.xml"); + } else { + localConf = new Configuration(true); + } Properties props = ConfigurationUtil.toProperties(localConf); props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); return props; Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1148118&r1=1148117&r2=1148118&view=diff ============================================================================== --- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Jul 19 01:14:58 2011 @@ -64,10 +64,11 @@ import org.apache.pig.pen.POOptimizeDisa public class HExecutionEngine { 
public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker"; - private static final String FILE_SYSTEM_LOCATION = "fs.default.name"; + private static final String FILE_SYSTEM_LOCATION = "fs.defaultFS"; private static final String HADOOP_SITE = "hadoop-site.xml"; private static final String CORE_SITE = "core-site.xml"; + private static final String YARN_SITE = "yarn-site.xml"; private final Log log = LogFactory.getLog(getClass()); public static final String LOCAL = "local"; @@ -155,6 +156,7 @@ public class HExecutionEngine { jc = new JobConf(); jc.addResource("pig-cluster-hadoop-site.xml"); + jc.addResource(YARN_SITE); // Trick to invoke static initializer of DistributedFileSystem to add hdfs-default.xml // into configuration Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1148118&r1=1148117&r2=1148118&view=diff ============================================================================== --- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Jul 19 01:14:58 2011 @@ -83,6 +83,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; @@ -1337,8 +1338,7 @@ public class MRCompiler extends PhyPlanV Job job = new Job(conf); loader.setLocation(location, job); InputFormat inf = loader.getInputFormat(); - List splits = inf.getSplits(new JobContext( - job.getConfiguration(), job.getJobID())); + List splits = inf.getSplits(HadoopShims.cloneJobContext(job)); List> results = MapRedUtil .getCombinePigSplits(splits, fs .getDefaultBlockSize(), conf); Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1148118&r1=1148117&r2=1148118&view=diff ============================================================================== --- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Jul 19 01:14:58 2011 @@ -55,6 +55,8 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.impl.PigContext; import 
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1148118&r1=1148117&r2=1148118&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Jul 19 01:14:58 2011
@@ -55,6 +55,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -563,7 +565,7 @@ public class MapReduceLauncher extends L
      * @throws IOException
      */
     private void storeSchema(Job job, POStore st) throws IOException {
-        JobContext jc = new JobContext(job.getJobConf(),
+        JobContext jc = HadoopShims.createJobContext(job.getJobConf(),
             new org.apache.hadoop.mapreduce.JobID());
         JobContext updatedJc = PigOutputCommitter.setUpContext(jc, st);
         PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1148118&r1=1148117&r2=1148118&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Tue Jul 19 01:14:58 2011
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.TaskI
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
@@ -58,7 +60,8 @@ public class MapReducePOStoreImpl extend
         // task (map or reduce) we could have multiple stores, we should
         // make this copy so that the same context does not get over-written
         // by the different stores.
-        this.context = new TaskAttemptContext(outputConf,
+
+        this.context = HadoopShims.createTaskAttemptContext(outputConf,
             context.getTaskAttemptID());
     }
 
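The hadoop23 side of the shim would serve the same two call sites above (createJobContext in MapReduceLauncher.storeSchema, createTaskAttemptContext in MapReducePOStoreImpl) by instantiating the concrete org.apache.hadoop.mapreduce.task.JobContextImpl and TaskAttemptContextImpl classes that 0.23 provides behind the new interfaces. Again a sketch under that assumption, not the committed shim source:

    // Sketch of the hadoop23 shim flavor; same entry points as hadoop20,
    // different instantiation, since the old constructors are gone.
    package org.apache.pig.backend.hadoop.executionengine.shims;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.task.JobContextImpl;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class HadoopShims {
        // On 0.23, JobContext and TaskAttemptContext are interfaces; the
        // runtime classes live in org.apache.hadoop.mapreduce.task.
        public static JobContext cloneJobContext(Job job) {
            return new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
        }

        public static JobContext createJobContext(Configuration conf, JobID jobId) {
            return new JobContextImpl(conf, jobId);
        }

        public static TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
            return new TaskAttemptContextImpl(conf, taskId);
        }
    }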
Added: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1148118&view=auto
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (added)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Tue Jul 19 01:14:58 2011
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * This class is the base class for PigMapBase, which differs slightly
+ * across Hadoop versions. The version-specific PigMapBase implementations
+ * are located in $PIG_HOME/shims.
+ **/
+public abstract class PigGenericMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
+    private static final Tuple DUMMYTUPLE = null;
+
+    private final Log log = LogFactory.getLog(getClass());
+
+    protected byte keyType;
+
+    //Map Plan
+    protected PhysicalPlan mp = null;
+
+    // Store operators
+    protected List<POStore> stores;
+
+    protected TupleFactory tf = TupleFactory.getInstance();
+
+    boolean inIllustrator = false;
+
+    Context outputCollector;
+
+    // Reporter that will be used by operators
+    // to transmit heartbeat
+    ProgressableReporter pigReporter;
+
+    protected boolean errorInMap = false;
+
+    PhysicalOperator[] roots;
+
+    private PhysicalOperator leaf;
+
+    PigContext pigContext = null;
+    private volatile boolean initialized = false;
+
+    /**
+     * for local map/reduce simulation
+     * @param plan the map plan
+     */
+    public void setMapPlan(PhysicalPlan plan) {
+        mp = plan;
+    }
+
+    /**
+     * Will be called when all the tuples in the input
+     * are done, so the reporter thread should be closed.
+     */
+    @Override
+    public void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+        if(errorInMap) {
+            //error in map - returning
+            return;
+        }
+
+        if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
+            // If there is a stream in the pipeline, or if this map job belongs
+            // to a merge-join, we could potentially have more to process - so
+            // let's set the flag stating that all map input has already been
+            // sent, and then run the pipeline one more time.
+            // This will result in nothing happening in the case where there is
+            // no stream and no merge-join in the pipeline.
+            mp.endOfAllInput = true;
+            runPipeline(leaf);
+        }
+
+        for (POStore store: stores) {
+            if (!initialized) {
+                MapReducePOStoreImpl impl
+                    = new MapReducePOStoreImpl(context);
+                store.setStoreImpl(impl);
+                store.setUp();
+            }
+            store.tearDown();
+        }
+
+        //Calling EvalFunc.finish()
+        UDFFinishVisitor finisher = new UDFFinishVisitor(mp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
+        try {
+            finisher.visit();
+        } catch (VisitorException e) {
+            int errCode = 2121;
+            String msg = "Error while calling finish method on UDFs.";
+            throw new VisitorException(msg, errCode, PigException.BUG, e);
+        }
+
+        mp = null;
+
+        PhysicalOperator.setReporter(null);
+        initialized = false;
+    }
+
+    /**
+     * Configures the mapper with the map plan and the
+     * reporter thread
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+
+        Configuration job = context.getConfiguration();
+        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+        PigMapReduce.sJobContext = context;
+        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
+        PigMapReduce.sJobConf = context.getConfiguration();
+        inIllustrator = (context instanceof PigMapBase.IllustratorContext);
+
+        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
+        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+        if (pigContext.getLog4jProperties()!=null)
+            PropertyConfigurator.configure(pigContext.getLog4jProperties());
+
+        if (mp == null)
+            mp = (PhysicalPlan) ObjectSerializer.deserialize(
+                job.get("pig.mapPlan"));
+        stores = PlanHelper.getStores(mp);
+
+        // To be removed
+        if(mp.isEmpty())
+            log.debug("Map Plan empty!");
+        else{
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            mp.explain(baos);
+            log.debug(baos.toString());
+        }
+        keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
+        // till here
+
+        pigReporter = new ProgressableReporter();
+        // Get the UDF specific context
+        MapRedUtil.setupUDFContext(job);
+
+        if(!(mp.isEmpty())) {
+
+            PigSplit split = (PigSplit)context.getInputSplit();
+            List<OperatorKey> targetOpKeys = split.getTargetOps();
+
+            ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
+            for (OperatorKey targetKey : targetOpKeys) {
+                targetOpsAsList.add(mp.getOperator(targetKey));
+            }
+            roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
+            leaf = mp.getLeaves().get(0);
+        }
+
+        PigStatusReporter.setContext(context);
+
+    }
+
+    /**
+     * The map function that attaches the inpTuple appropriately
+     * and executes the map plan if it's not empty. Collects the
+     * result of execution into oc, or passes the input directly to
+     * oc if the map plan is empty. The collection is left abstract
+     * for the map-only or map-reduce job to implement. Map-only
+     * collects the tuple as-is, whereas map-reduce collects it after
+     * extracting the key and indexed tuple.
+     */
+    @Override
+    protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
+        if(!initialized) {
+            initialized = true;
+            // cache the collector for use in runPipeline() which
+            // can be called from close()
+            this.outputCollector = context;
+            pigReporter.setRep(context);
+            PhysicalOperator.setReporter(pigReporter);
+
+            for (POStore store: stores) {
+                MapReducePOStoreImpl impl
+                    = new MapReducePOStoreImpl(context);
+                store.setStoreImpl(impl);
+                if (!pigContext.inIllustrator)
+                    store.setUp();
+            }
+
+            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+            pigHadoopLogger.setAggregate(aggregateWarning);
+            pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
+            PhysicalOperator.setPigLogger(pigHadoopLogger);
+        }
+
+        if (mp.isEmpty()) {
+            collect(context,inpTuple);
+            return;
+        }
+
+        for (PhysicalOperator root : roots) {
+            if (inIllustrator) {
+                if (root != null) {
+                    root.attachInput(inpTuple);
+                }
+            } else {
+                root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
+            }
+        }
+
+        runPipeline(leaf);
+    }
+
+    protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
+        while(true){
+            Result res = leaf.getNext(DUMMYTUPLE);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                collect(outputCollector,(Tuple)res.result);
+                continue;
+            }
+
+            if(res.returnStatus==POStatus.STATUS_EOP) {
+                return;
+            }
+
+            if(res.returnStatus==POStatus.STATUS_NULL)
+                continue;
+
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                // remember that we had an issue so that in
+                // close() we can do the right thing
+                errorInMap = true;
+                // if there is an errmessage use it
+                String errMsg;
+                if(res.result != null) {
+                    errMsg = "Received Error while " +
+                        "processing the map plan: " + res.result;
+                } else {
+                    errMsg = "Received Error while " +
+                        "processing the map plan.";
+                }
+
+                int errCode = 2055;
+                ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
+                throw ee;
+            }
+        }
+
+    }
+
+    abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
+
+    /**
+     * @return the keyType
+     */
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    /**
+     * @param keyType the keyType to set
+     */
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+
+    abstract public Context getIllustratorContext(Configuration conf, DataBag input,
+        List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+        throws IOException, InterruptedException;
+}
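Two illustrations may help readers new to this pipeline. First, setup() above obtains the map plan by deserializing the "pig.mapPlan" entry of the job configuration; the round trip uses Pig's ObjectSerializer. A sketch of that round trip follows; the front-end serialization actually happens in JobControlCompiler, which is not shown in this part of the mail, and the class name here is ours.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.impl.util.ObjectSerializer;

    public class MapPlanRoundTrip {
        public static void main(String[] args) throws Exception {
            // Front end: the compiled plan is encoded to a String and stored
            // in the job configuration (normally done by the job compiler).
            PhysicalPlan plan = new PhysicalPlan();
            Configuration conf = new Configuration(false);
            conf.set("pig.mapPlan", ObjectSerializer.serialize(plan));

            // Back end: setup() decodes the same entry on the task side.
            PhysicalPlan decoded =
                (PhysicalPlan) ObjectSerializer.deserialize(conf.get("pig.mapPlan"));
            System.out.println("decoded plan is empty: " + decoded.isEmpty());
        }
    }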
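Second, the abstract collect() at the end of PigGenericMapBase is where the map-only and map-reduce flavors diverge, as the map() javadoc describes. The committed implementations (PigMapOnly, PigMapReduce, and the per-version PigMapBase under shims/) appear elsewhere in this commit; the following sketch shows what the two flavors plausibly look like. The class names are hypothetical, and the illustrator plumbing (getIllustratorContext) is omitted, which is why both nested classes stay abstract.

    package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

    import java.io.IOException;

    import org.apache.pig.backend.hadoop.HDataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.io.NullableTuple;
    import org.apache.pig.impl.io.PigNullableWritable;

    public class CollectSketches {

        /** Map-only flavor: the tuple goes to the collector unchanged. */
        public static abstract class MapOnlySketch extends PigGenericMapBase {
            @Override
            public void collect(Context oc, Tuple tuple)
                    throws InterruptedException, IOException {
                // No shuffle follows, so there is no key to extract.
                oc.write(null, tuple);
            }
        }

        /** Map-reduce flavor: tuple is (index, key, value); extract and tag the key. */
        public static abstract class MapReduceSketch extends PigGenericMapBase {
            @Override
            public void collect(Context oc, Tuple tuple)
                    throws InterruptedException, IOException {
                try {
                    Byte index = (Byte) tuple.get(0);
                    PigNullableWritable key =
                        HDataType.getWritableComparableTypes(tuple.get(1), getKeyType());
                    NullableTuple val = new NullableTuple((Tuple) tuple.get(2));
                    // The index ties each record back to its input in joins/cogroups.
                    key.setIndex(index);
                    val.setIndex(index);
                    oc.write(key, val);
                } catch (org.apache.pig.backend.executionengine.ExecException e) {
                    throw new IOException(e);
                }
            }
        }
    }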