Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 91C15101C2 for ; Thu, 18 Apr 2013 23:56:47 +0000 (UTC) Received: (qmail 86872 invoked by uid 500); 18 Apr 2013 23:56:47 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 86849 invoked by uid 500); 18 Apr 2013 23:56:47 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 86842 invoked by uid 99); 18 Apr 2013 23:56:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Apr 2013 23:56:47 +0000 X-ASF-Spam-Status: No, hits=-1996.5 required=5.0 tests=ALL_TRUSTED,URIBL_BLACK X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Apr 2013 23:56:16 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6B1222388CAD; Thu, 18 Apr 2013 23:54:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1469642 [34/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t... Date: Thu, 18 Apr 2013 23:54:28 -0000 To: commits@tez.incubator.apache.org From: hitesh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130418235458.6B1222388CAD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,111 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.mapreduce.processor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; +import org.apache.tez.common.TezTaskReporter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.mapreduce.hadoop.mapred.MRCounters; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class MRTaskReporter + extends org.apache.hadoop.mapreduce.StatusReporter + implements Reporter { + + private final TezTaskReporterImpl reporter; + + private InputSplit split = null; + + public MRTaskReporter(TezTaskReporter reporter) { + this.reporter = (TezTaskReporterImpl)reporter; + } + + // getters and setters for flag + void setProgressFlag() { + reporter.setProgressFlag(); + } + boolean resetProgressFlag() { + return reporter.resetProgressFlag(); + } + public void setStatus(String status) { + reporter.setStatus(status); + } + public void setProgress(float progress) { + reporter.setProgress(progress); + } + + public float getProgress() { + return reporter.getProgress(); + }; + + public void progress() { + reporter.progress(); + } + + public Counters.Counter getCounter(String group, String name) { + TezCounter counter = reporter.getCounter(group, name); + MRCounters.MRCounter mrCounter = null; + if (counter != null) { + mrCounter = new MRCounters.MRCounter(counter); + } + return mrCounter; + } + + public Counters.Counter getCounter(Enum name) { + TezCounter counter = reporter.getCounter(name); + MRCounters.MRCounter mrCounter = null; + if (counter != null) { + mrCounter = new MRCounters.MRCounter(counter); + } + return mrCounter; + } + + public void incrCounter(Enum key, long amount) { + reporter.incrCounter(key, amount); + } + + public void incrCounter(String group, String counter, long amount) { + reporter.incrCounter(group, counter, amount); + } + + public void setInputSplit(InputSplit split) { + this.split = split; + } + + public InputSplit getInputSplit() throws UnsupportedOperationException { + if (split == null) { + throw new UnsupportedOperationException("Input only available on map"); + } else { + return split; + } + } + + public void startCommunicationThread() { + reporter.startCommunicationThread(); + } + + public void stopCommunicationThread() throws InterruptedException { + reporter.stopCommunicationThread(); + } +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,271 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.mapreduce.processor; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.engine.records.TezTaskAttemptID; +import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate; +import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +class TezTaskReporterImpl + implements org.apache.tez.common.TezTaskReporter, Runnable { + + private static final Log LOG = LogFactory.getLog(TezTaskReporterImpl.class); + + private final MRTask mrTask; + private final TezTaskUmbilicalProtocol umbilical; + private final Progress taskProgress; + + private Thread pingThread = null; + private boolean done = true; + private Object lock = new Object(); + + /** + * flag that indicates whether progress update needs to be sent to parent. + * If true, it has been set. If false, it has been reset. + * Using AtomicBoolean since we need an atomic read & reset method. + */ + private AtomicBoolean progressFlag = new AtomicBoolean(false); + + TezTaskReporterImpl(MRTask mrTask, TezTaskUmbilicalProtocol umbilical) { + this.mrTask = mrTask; + this.umbilical = umbilical; + this.taskProgress = mrTask.getProgress(); + } + + // getters and setters for flag + void setProgressFlag() { + progressFlag.set(true); + } + + boolean resetProgressFlag() { + return progressFlag.getAndSet(false); + } + + public void setStatus(String status) { + // FIXME - BADLY + if (true) { + return; + } + taskProgress.setStatus( + MRTask.normalizeStatus(status, this.mrTask.jobConf)); + // indicate that progress update needs to be sent + setProgressFlag(); + } + + public void setProgress(float progress) { + // set current phase progress. + // This method assumes that task has phases. + taskProgress.phase().set(progress); + // indicate that progress update needs to be sent + setProgressFlag(); + } + + public float getProgress() { + return taskProgress.getProgress(); + }; + + public void progress() { + // indicate that progress update needs to be sent + setProgressFlag(); + } + + public TezCounter getCounter(String group, String name) { + return this.mrTask.counters == null ? + null : + this.mrTask.counters.findCounter(group, name); + } + + public TezCounter getCounter(Enum name) { + return this.mrTask.counters == null ? + null : + this.mrTask.counters.findCounter(name); + } + + public void incrCounter(Enum key, long amount) { + if (this.mrTask.counters != null) { + this.mrTask.counters.findCounter(key).increment(amount); + } + setProgressFlag(); + } + + public void incrCounter(String group, String counter, long amount) { + if (this.mrTask.counters != null) { + this.mrTask.counters.findCounter(group, counter).increment(amount); + } + setProgressFlag(); + } + + /** + * The communication thread handles communication with the parent (Task Tracker). + * It sends progress updates if progress has been made or if the task needs to + * let the parent know that it's alive. It also pings the parent to see if it's alive. + */ + public void run() { + final int MAX_RETRIES = 3; + int remainingRetries = MAX_RETRIES; + // get current flag value and reset it as well + boolean sendProgress = resetProgressFlag(); + while (!this.mrTask.taskDone.get()) { + synchronized (lock) { + done = false; + } + try { + boolean taskFound = true; // whether TT knows about this task + // sleep for a bit + synchronized(lock) { + if (this.mrTask.taskDone.get()) { + break; + } + lock.wait(MRTask.PROGRESS_INTERVAL); + } + if (this.mrTask.taskDone.get()) { + break; + } + + if (sendProgress) { + // we need to send progress update + this.mrTask.updateCounters(); + this.mrTask.getStatus().statusUpdate( + taskProgress.get(), + taskProgress.toString(), + this.mrTask.counters); + taskFound = + umbilical.statusUpdate( + this.mrTask.getTaskAttemptId(), this.mrTask.getStatus()); + this.mrTask.getStatus().clearStatus(); + } + else { + // send ping + taskFound = umbilical.ping(this.mrTask.getTaskAttemptId()); + } + + // if Task Tracker is not aware of our task ID (probably because it died and + // came back up), kill ourselves + if (!taskFound) { + MRTask.LOG.warn("Parent died. Exiting " + this.mrTask.getTaskAttemptId()); + resetDoneFlag(); + System.exit(66); + } + + sendProgress = resetProgressFlag(); + remainingRetries = MAX_RETRIES; + } + catch (Throwable t) { + MRTask.LOG.info("Communication exception: " + StringUtils.stringifyException(t)); + remainingRetries -=1; + if (remainingRetries == 0) { + ReflectionUtils.logThreadInfo(MRTask.LOG, "Communication exception", 0); + MRTask.LOG.warn("Last retry, killing " + this.mrTask.getTaskAttemptId()); + resetDoneFlag(); + System.exit(65); + } + } + } + //Notify that we are done with the work + resetDoneFlag(); + } + void resetDoneFlag() { + synchronized (lock) { + done = true; + lock.notify(); + } + } + public void startCommunicationThread() { + if (pingThread == null) { + pingThread = new Thread(this, "communication thread"); + pingThread.setDaemon(true); + pingThread.start(); + } + } + public void stopCommunicationThread() throws InterruptedException { + if (pingThread != null) { + // Intent of the lock is to not send an interupt in the middle of an + // umbilical.ping or umbilical.statusUpdate + synchronized(lock) { + //Interrupt if sleeping. Otherwise wait for the RPC call to return. + lock.notify(); + } + + synchronized (lock) { + while (!done) { + lock.wait(); + } + } + pingThread.interrupt(); + pingThread.join(); + } + } + + @Override + public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents( + int fromEventIdx, int maxEventsToFetch, + TezTaskAttemptID reduce) { + return umbilical.getDependentTasksCompletionEvents( + fromEventIdx, maxEventsToFetch, reduce); + } + + @Override + public void reportFatalError(TezTaskAttemptID taskAttemptId, + Throwable throwable, String logMsg) { + LOG.fatal(logMsg); + Throwable tCause = throwable.getCause(); + String cause = tCause == null + ? StringUtils.stringifyException(throwable) + : StringUtils.stringifyException(tCause); + try { + umbilical.fatalError(mrTask.getTaskAttemptId(), cause); + } catch (IOException ioe) { + LOG.fatal("Failed to contact the tasktracker", ioe); + System.exit(-1); + } + } + + public TezTaskUmbilicalProtocol getUmbilical() { + return umbilical; + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + // TODO TEZAM3 + return 1; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, + clientVersion, clientMethodsHash); + } +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.processor.map; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.MapRunnable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.common.TezTask; +import org.apache.tez.common.TezTaskStatus; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Master; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.common.sort.SortingOutput; +import org.apache.tez.mapreduce.hadoop.IDConverter; +import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.output.SimpleOutput; +import org.apache.tez.mapreduce.processor.MRTask; +import org.apache.tez.mapreduce.processor.MRTaskReporter; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class MapProcessor extends MRTask implements Processor { + + private static final Log LOG = LogFactory.getLog(MapProcessor.class); + + private Progress mapPhase; + + @Inject + public MapProcessor( + @Assisted TezTask context + ) throws IOException { + super(context); + } + + + + @Override + public void initialize(Configuration conf, Master master) throws IOException, + InterruptedException { + super.initialize(conf, master); + TaskSplitMetaInfo[] allMetaInfo = readSplits(); + TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezTaskContext + .getTaskAttemptId().getTaskID().getId()]; + splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(), + thisTaskMetaInfo.getStartOffset()); + } + + @Override + public void process( + final Input in, + final Output out) + throws IOException, InterruptedException { + MRTaskReporter reporter = new MRTaskReporter(getTaskReporter()); + boolean useNewApi = jobConf.getUseNewMapper(); + initTask(jobConf, getDAGID(), reporter, useNewApi); + + if (in instanceof SimpleInput) { + ((SimpleInput)in).setTask(this); + } + + if (out instanceof SimpleOutput) { + ((SimpleOutput)out).setTask(this); + } else if (out instanceof SortingOutput) { + ((SortingOutput)out).setTask(this); + } + + + in.initialize(jobConf, getTaskReporter()); + out.initialize(jobConf, getTaskReporter()); + + // If there are no reducers then there won't be any sort. Hence the map + // phase will govern the entire attempt's progress. + if (jobConf.getNumReduceTasks() == 0) { + mapPhase = getProgress().addPhase("map", 1.0f); + } else { + // If there are reducers then the entire attempt's progress will be + // split between the map phase (67%) and the sort phase (33%). + mapPhase = getProgress().addPhase("map", 0.667f); + } + + // Sanity check + if (!(in instanceof SimpleInput)) { + throw new IOException("Unknown input! - " + in.getClass()); + } + SimpleInput input = (SimpleInput)in; + + if (useNewApi) { + runNewMapper(jobConf, reporter, input, out, getTaskReporter()); + } else { + runOldMapper(jobConf, reporter, input, out, getTaskReporter()); + } + + done(out.getOutputContext(), reporter); + } + + public void close() throws IOException, InterruptedException { + // TODO Auto-generated method stub + + } + + void runOldMapper( + final JobConf job, + final MRTaskReporter reporter, + final SimpleInput input, + final Output output, + final Master master + ) throws IOException, InterruptedException { + + RecordReader in = new OldRecordReader(input); + + int numReduceTasks = job.getNumReduceTasks(); + LOG.info("numReduceTasks: " + numReduceTasks); + + OutputCollector collector = new OldOutputCollector(output); + + MapRunnable runner = + (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job); + + try { + runner.run(in, collector, (Reporter)reporter); + mapPhase.complete(); + // start the sort phase only if there are reducers + if (numReduceTasks > 0) { + setPhase(TezTaskStatus.Phase.SORT); + } + this.statusUpdate(); + } finally { + //close + in.close(); // close input + output.close(); + } + } + + private void runNewMapper(final JobConf job, + MRTaskReporter reporter, + final SimpleInput in, + Output out, + final Master master + ) throws IOException, InterruptedException { + // make a task context so we can get the classes + org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = + new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter); + + // make a mapper + org.apache.hadoop.mapreduce.Mapper mapper; + try { + mapper = (org.apache.hadoop.mapreduce.Mapper) + ReflectionUtils.newInstance(taskContext.getMapperClass(), job); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + + if (!(in instanceof SimpleInput)) { + throw new IOException("Unknown input! - " + in.getClass()); + } + + org.apache.hadoop.mapreduce.RecordReader input = + new NewRecordReader(in); + + org.apache.hadoop.mapreduce.RecordWriter output = + new NewOutputCollector(out); + + org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit(); + + org.apache.hadoop.mapreduce.MapContext + mapContext = + new org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl( + job, IDConverter.toMRTaskAttemptId(getTaskAttemptId()), + input, output, + getCommitter(), + reporter, split); + + org.apache.hadoop.mapreduce.Mapper.Context mapperContext = + new WrappedMapper().getMapContext(mapContext); + + input.initialize(split, mapperContext); + mapper.run(mapperContext); + mapPhase.complete(); + setPhase(TezTaskStatus.Phase.SORT); + this.statusUpdate(); + input.close(); + output.close(mapperContext); + } + + private static class NewRecordReader extends + org.apache.hadoop.mapreduce.RecordReader { + private final SimpleInput in; + + private NewRecordReader(SimpleInput in) { + this.in = in; + } + + @Override + public void initialize(org.apache.hadoop.mapreduce.InputSplit split, + TaskAttemptContext context) throws IOException, + InterruptedException { + in.initializeNewRecordReader(split, context); + } + + @Override + public boolean nextKeyValue() throws IOException, + InterruptedException { + return in.hasNext(); + } + + @Override + public Object getCurrentKey() throws IOException, + InterruptedException { + return in.getNextKey(); + } + + @Override + public Object getCurrentValue() throws IOException, + InterruptedException { + return in.getNextValues().iterator().next(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return in.getProgress(); + } + + @Override + public void close() throws IOException { + in.close(); + } + } + + private static class OldRecordReader implements RecordReader { + private final SimpleInput simpleInput; + + private OldRecordReader(SimpleInput simpleInput) { + this.simpleInput = simpleInput; + } + + @Override + public boolean next(Object key, Object value) throws IOException { + simpleInput.setKey(key); + simpleInput.setValue(value); + try { + return simpleInput.hasNext(); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } + + @Override + public Object createKey() { + return simpleInput.getOldRecordReader().createKey(); + } + + @Override + public Object createValue() { + return simpleInput.getOldRecordReader().createValue(); + } + + @Override + public long getPos() throws IOException { + return simpleInput.getOldRecordReader().getPos(); + } + + @Override + public void close() throws IOException { + simpleInput.close(); + } + + @Override + public float getProgress() throws IOException { + try { + return simpleInput.getProgress(); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } + } + + private static class OldOutputCollector + implements OutputCollector { + private final Output output; + + OldOutputCollector(Output output) { + this.output = output; + } + + public void collect(Object key, Object value) throws IOException { + try { + output.write(key, value); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("interrupt exception", ie); + } + } + } + + private class NewOutputCollector + extends org.apache.hadoop.mapreduce.RecordWriter { + private final Output out; + + NewOutputCollector(Output out) throws IOException { + this.out = out; + } + + @Override + public void write(Object key, Object value) throws IOException, InterruptedException { + out.write(key, value); + } + + @Override + public void close(TaskAttemptContext context + ) throws IOException, InterruptedException { + out.close(); + } + } + + @Override + public void localizeConfiguration(JobConf jobConf) + throws IOException, InterruptedException { + super.localizeConfiguration(jobConf); + jobConf.setBoolean(JobContext.TASK_ISMAP, true); + } + + @Override + public TezCounter getOutputRecordsCounter() { + return reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); + } + + @Override + public TezCounter getInputRecordsCounter() { + return reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS); + + } + + protected TaskSplitMetaInfo[] readSplits() throws IOException { + TaskSplitMetaInfo[] allTaskSplitMetaInfo; + allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(getConf(), + FileSystem.getLocal(getConf())); + return allTaskSplitMetaInfo; + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.processor.reduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.common.TezEngineTask; +import org.apache.tez.common.TezJobConfig; +import org.apache.tez.common.TezTask; +import org.apache.tez.common.TezTaskStatus; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Master; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.common.ConfigUtils; +import org.apache.tez.engine.common.sort.SortingOutput; +import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator; +import org.apache.tez.engine.lib.input.ShuffledMergedInput; +import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol; +import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.output.SimpleOutput; +import org.apache.tez.mapreduce.processor.MRTask; +import org.apache.tez.mapreduce.processor.MRTaskReporter; + +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class ReduceProcessor +extends MRTask +implements Processor { + + private static final Log LOG = LogFactory.getLog(ReduceProcessor.class); + + private Progress sortPhase; + private Progress reducePhase; + + private Counter reduceInputKeyCounter; + private Counter reduceInputValueCounter; + private int numMapTasks; + + @Inject + public ReduceProcessor( + @Assisted TezTask context + ) { + super(context); + TezEngineTask tezEngineContext = (TezEngineTask) context; + Preconditions.checkNotNull(tezEngineContext.getInputSpecList(), + "InputSpecList should not be null"); + Preconditions.checkArgument( + tezEngineContext.getInputSpecList().size() == 1, + "Expected exactly one input, found : " + + tezEngineContext.getInputSpecList().size()); + this.numMapTasks = tezEngineContext.getInputSpecList().get(0) + .getNumInputs(); + } + + @Override + public void initialize(Configuration conf, Master master) throws IOException, + InterruptedException { + super.initialize(conf, master); + + } + + @Override + public void process(Input in, Output out) + throws IOException, InterruptedException { + MRTaskReporter reporter = new MRTaskReporter(getTaskReporter()); + boolean useNewApi = jobConf.getUseNewMapper(); + initTask(jobConf, getDAGID(), reporter, useNewApi); + + if (in instanceof SimpleInput) { + ((SimpleInput)in).setTask(this); + } else if (in instanceof ShuffledMergedInput) { + ((ShuffledMergedInput)in).setTask(this); + } + + if (out instanceof SimpleOutput) { + ((SimpleOutput)out).setTask(this); + } else if (out instanceof SortingOutput) { + ((SortingOutput)out).setTask(this); + } + + in.initialize(jobConf, getTaskReporter()); + out.initialize(jobConf, getTaskReporter()); + + sortPhase = getProgress().addPhase("sort"); + reducePhase = getProgress().addPhase("reduce"); + sortPhase.complete(); // sort is complete + setPhase(TezTaskStatus.Phase.REDUCE); + + this.statusUpdate(); + + Class keyClass = ConfigUtils.getMapOutputKeyClass(jobConf); + Class valueClass = ConfigUtils.getMapOutputValueClass(jobConf); + RawComparator comparator = + ConfigUtils.getOutputValueGroupingComparator(jobConf); + + reduceInputKeyCounter = + reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS); + reduceInputValueCounter = + reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS); + + // Sanity check + if (!(in instanceof ShuffledMergedInput)) { + throw new IOException("Illegal input to reduce: " + in.getClass()); + } + ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in; + + if (useNewApi) { + try { + runNewReducer( + jobConf, + (TezTaskUmbilicalProtocol)getUmbilical(), reporter, + shuffleInput, comparator, keyClass, valueClass, + out); + } catch (ClassNotFoundException cnfe) { + throw new IOException(cnfe); + } + } else { + runOldReducer( + jobConf, (TezTaskUmbilicalProtocol)getUmbilical(), reporter, + shuffleInput, comparator, keyClass, valueClass, out); + } + + done(out.getOutputContext(), reporter); + } + + public void close() throws IOException, InterruptedException { + // TODO Auto-generated method stub + + } + + void runOldReducer(JobConf job, + TezTaskUmbilicalProtocol umbilical, + final MRTaskReporter reporter, + ShuffledMergedInput input, + RawComparator comparator, + Class keyClass, + Class valueClass, + final Output output) throws IOException, InterruptedException { + + Reducer reducer = + ReflectionUtils.newInstance(job.getReducerClass(), job); + + // make output collector + + OutputCollector collector = + new OutputCollector() { + public void collect(Object key, Object value) + throws IOException { + try { + output.write(key, value); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } + }; + + // apply reduce function + try { + ReduceValuesIterator values = + new ReduceValuesIterator( + input, + job.getOutputValueGroupingComparator(), keyClass, valueClass, + job, reporter, reduceInputValueCounter, reducePhase); + + values.informReduceProgress(); + while (values.more()) { + reduceInputKeyCounter.increment(1); + reducer.reduce(values.getKey(), values, collector, reporter); + values.nextKey(); + values.informReduceProgress(); + } + + //Clean up: repeated in catch block below + reducer.close(); + output.close(); + //End of clean up. + } catch (IOException ioe) { + try { + reducer.close(); + } catch (IOException ignored) {} + + try { + output.close(); + } catch (IOException ignored) {} + + throw ioe; + } + } + + private static class ReduceValuesIterator + extends org.apache.tez.engine.common.task.impl.ValuesIterator { + private Counter reduceInputValueCounter; + private Progress reducePhase; + + public ReduceValuesIterator (ShuffledMergedInput in, + RawComparator comparator, + Class keyClass, + Class valClass, + Configuration conf, Progressable reporter, + Counter reduceInputValueCounter, + Progress reducePhase) + throws IOException { + super(in.getIterator(), comparator, keyClass, valClass, conf, reporter); + this.reduceInputValueCounter = reduceInputValueCounter; + this.reducePhase = reducePhase; + } + + @Override + public VALUE next() { + reduceInputValueCounter.increment(1); + return moveToNext(); + } + + protected VALUE moveToNext() { + return super.next(); + } + + public void informReduceProgress() { + reducePhase.set(super.in.getProgress().getProgress()); // update progress + reporter.progress(); + } + } + + void runNewReducer(JobConf job, + final TezTaskUmbilicalProtocol umbilical, + final MRTaskReporter reporter, + ShuffledMergedInput input, + RawComparator comparator, + Class keyClass, + Class valueClass, + final Output out + ) throws IOException,InterruptedException, + ClassNotFoundException { + // wrap value iterator to report progress. + final TezRawKeyValueIterator rawIter = input.getIterator(); + TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() { + public void close() throws IOException { + rawIter.close(); + } + public DataInputBuffer getKey() throws IOException { + return rawIter.getKey(); + } + public Progress getProgress() { + return rawIter.getProgress(); + } + public DataInputBuffer getValue() throws IOException { + return rawIter.getValue(); + } + public boolean next() throws IOException { + boolean ret = rawIter.next(); + reporter.setProgress(rawIter.getProgress().getProgress()); + return ret; + } + }; + + // make a task context so we can get the classes + org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = + new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter); + + // make a reducer + org.apache.hadoop.mapreduce.Reducer reducer = + (org.apache.hadoop.mapreduce.Reducer) + ReflectionUtils.newInstance(taskContext.getReducerClass(), job); + + org.apache.hadoop.mapreduce.RecordWriter trackedRW = + new org.apache.hadoop.mapreduce.RecordWriter() { + + @Override + public void write(Object key, Object value) throws IOException, + InterruptedException { + out.write(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + out.close(); + } + }; + + org.apache.hadoop.mapreduce.Reducer.Context reducerContext = + createReduceContext( + reducer, job, getTaskAttemptId(), + rIter, reduceInputKeyCounter, + reduceInputValueCounter, + trackedRW, + committer, + reporter, comparator, keyClass, + valueClass); + reducer.run(reducerContext); + trackedRW.close(reducerContext); + } + + @Override + public void localizeConfiguration(JobConf jobConf) + throws IOException, InterruptedException { + super.localizeConfiguration(jobConf); + jobConf.setBoolean(JobContext.TASK_ISMAP, false); + jobConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, numMapTasks); + } + + @Override + public TezCounter getOutputRecordsCounter() { + return reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS); + } + + @Override + public TezCounter getInputRecordsCounter() { + return reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS); + } +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.task; + +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.lib.input.ShuffledMergedInput; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.output.SimpleOutput; +import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class FinalTask extends AbstractModule { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Output.class, SimpleOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Input.class, ShuffledMergedInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, ReduceProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, RuntimeTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.task; + +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.lib.output.OnFileSortedOutput; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.processor.map.MapProcessor; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class InitialTask extends AbstractModule { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Input.class, SimpleInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Output.class, OnFileSortedOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, MapProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, RuntimeTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.task; + +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.lib.output.InMemorySortedOutput; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.processor.map.MapProcessor; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class InitialTaskWithInMemSort extends AbstractModule { +// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule. + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Input.class, SimpleInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Output.class, InMemorySortedOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, MapProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, RuntimeTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.task; + +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.processor.map.MapProcessor; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class InitialTaskWithLocalSort extends AbstractModule { +// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule. + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Input.class, SimpleInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Output.class, LocalOnFileSorterOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, MapProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, RuntimeTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.task; + +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.lib.input.ShuffledMergedInput; +import org.apache.tez.engine.lib.output.OnFileSortedOutput; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + + +public class IntermediateTask extends AbstractModule { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Input.class, ShuffledMergedInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Output.class, OnFileSortedOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, ReduceProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, RuntimeTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.task; + +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.lib.input.LocalMergedInput; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.output.SimpleOutput; +import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class LocalFinalTask extends AbstractModule { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Output.class, SimpleOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Input.class, LocalMergedInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, ReduceProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, RuntimeTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.task; + +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.task.RuntimeTask; +import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.output.SimpleOutput; +import org.apache.tez.mapreduce.processor.map.MapProcessor; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class MapOnlyTask extends AbstractModule { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Input.class, SimpleInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Output.class, SimpleOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, MapProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, RuntimeTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,239 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.mapreduce.task.impl; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapOutputFile; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.tez.common.Constants; + +/** + * Manipulate the working area for the transient store for maps and reduces. + * + * This class is used by map and reduce tasks to identify the directories that + * they need to write to/read from for intermediate files. The callers of + * these methods are from child space. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class YarnOutputFiles extends MapOutputFile { + + private JobConf conf; + + private static final String JOB_OUTPUT_DIR = "output"; + private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out"; + private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN + + ".index"; + + public YarnOutputFiles() { + } + + // assume configured to $localdir/usercache/$user/appcache/$appId + private LocalDirAllocator lDirAlloc = + new LocalDirAllocator(MRConfig.LOCAL_DIR); + + private Path getAttemptOutputDir() { + return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID)); + } + + /** + * Return the path to local map output file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputFile() throws IOException { + Path attemptOutput = + new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING); + return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf); + } + + /** + * Create a local map output file name. + * + * @param size the size of the file + * @return path + * @throws IOException + */ + public Path getOutputFileForWrite(long size) throws IOException { + Path attemptOutput = + new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING); + return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf); + } + + /** + * Create a local map output file name on the same volume. + */ + public Path getOutputFileForWriteInVolume(Path existing) { + Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR); + Path attemptOutputDir = new Path(outputDir, + conf.get(JobContext.TASK_ATTEMPT_ID)); + return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING); + } + + /** + * Return the path to a local map output index file created earlier + * + * @return path + * @throws IOException + */ + public Path getOutputIndexFile() throws IOException { + Path attemptIndexOutput = + new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING + + Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING); + return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf); + } + + /** + * Create a local map output index file name. + * + * @param size the size of the file + * @return path + * @throws IOException + */ + public Path getOutputIndexFileForWrite(long size) throws IOException { + Path attemptIndexOutput = + new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING + + Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING); + return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(), + size, conf); + } + + /** + * Create a local map output index file name on the same volume. + */ + public Path getOutputIndexFileForWriteInVolume(Path existing) { + Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR); + Path attemptOutputDir = new Path(outputDir, + conf.get(JobContext.TASK_ATTEMPT_ID)); + return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING + + Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING); + } + + /** + * Return a local map spill file created earlier. + * + * @param spillNumber the number + * @return path + * @throws IOException + */ + public Path getSpillFile(int spillNumber) throws IOException { + return lDirAlloc.getLocalPathToRead( + String.format(SPILL_FILE_PATTERN, + conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf); + } + + /** + * Create a local map spill file name. + * + * @param spillNumber the number + * @param size the size of the file + * @return path + * @throws IOException + */ + public Path getSpillFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite( + String.format(String.format(SPILL_FILE_PATTERN, + conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber)), size, conf); + } + + /** + * Return a local map spill index file created earlier + * + * @param spillNumber the number + * @return path + * @throws IOException + */ + public Path getSpillIndexFile(int spillNumber) throws IOException { + return lDirAlloc.getLocalPathToRead( + String.format(SPILL_INDEX_FILE_PATTERN, + conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf); + } + + /** + * Create a local map spill index file name. + * + * @param spillNumber the number + * @param size the size of the file + * @return path + * @throws IOException + */ + public Path getSpillIndexFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite( + String.format(SPILL_INDEX_FILE_PATTERN, + conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf); + } + + /** + * Return a local reduce input file created earlier + * + * @param mapId a map task id + * @return path + * @throws IOException + */ + public Path getInputFile(int mapId) throws IOException { + throw new UnsupportedOperationException("Incompatible with LocalRunner"); + } + + /** + * Create a local reduce input file name. + * + * @param mapId a map task id + * @param size the size of the file + * @return path + * @throws IOException + */ + public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, + long size) throws IOException { + return lDirAlloc.getLocalPathForWrite(String.format( + Constants.REDUCE_INPUT_FILE_FORMAT_STRING, + getAttemptOutputDir().toString(), mapId.getId()), + size, conf); + } + + /** Removes all of the files related to a task. */ + public void removeAll() throws IOException { + throw new UnsupportedOperationException("Incompatible with LocalRunner"); + } + + @Override + public void setConf(Configuration conf) { + if (conf instanceof JobConf) { + this.conf = (JobConf) conf; + } else { + this.conf = new JobConf(conf); + } + } + + @Override + public Configuration getConf() { + return conf; + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.tez.common.TezEngineTask; +import org.apache.tez.engine.api.Input; +import org.apache.tez.engine.api.Master; +import org.apache.tez.engine.api.Output; +import org.apache.tez.engine.api.Processor; +import org.apache.tez.engine.api.Task; +import org.apache.tez.engine.lib.input.ShuffledMergedInput; +import org.apache.tez.engine.lib.output.OnFileSortedOutput; +import org.apache.tez.engine.runtime.InputFactory; +import org.apache.tez.engine.runtime.OutputFactory; +import org.apache.tez.engine.runtime.ProcessorFactory; +import org.apache.tez.engine.runtime.TaskFactory; +import org.apache.tez.engine.runtime.TezEngineFactory; +import org.apache.tez.engine.runtime.TezEngineFactoryImpl; +import org.apache.tez.mapreduce.hadoop.MRTaskType; +import org.apache.tez.mapreduce.input.SimpleInput; +import org.apache.tez.mapreduce.output.SimpleOutput; +import org.apache.tez.mapreduce.processor.map.MapProcessor; +import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; +import org.apache.tez.mapreduce.task.FinalTask; +import org.apache.tez.mapreduce.task.InitialTask; +import org.apache.tez.mapreduce.task.IntermediateTask; +import org.junit.Before; +import org.junit.Test; + +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.FactoryModuleBuilder; + +public class TestTaskModules { + + private static final Log LOG = LogFactory.getLog(TestTaskModules.class); + + TezEngineTask taskContext; + JobConf job; + + @Before + public void setUp() { + taskContext = new TezEngineTask(TezTestUtils.getMockTaskAttemptId(0, 0, 0, + MRTaskType.REDUCE), "tez", "tez", "TODO_vertexName", + TestInitialModule.class.getName(), null, null); + job = new JobConf(); + } + + @Test + public void testInitialTask() throws Exception { + Injector injector = Guice.createInjector(new TestInitialModule()); + TezEngineFactory factory = injector.getInstance(TezEngineFactory.class); + Task t = factory.createTask(taskContext); + t.initialize(job, null); + } + + @Test + public void testIntermediateTask() throws Exception { + Injector injector = Guice.createInjector(new TestIntermediateModule()); + TezEngineFactory factory = injector.getInstance(TezEngineFactory.class); + Task t = factory.createTask(taskContext); + t.initialize(job, null); + } + + @Test + public void testFinalTask() throws Exception { + Injector injector = Guice.createInjector(new TestFinalModule()); + TezEngineFactory factory = injector.getInstance(TezEngineFactory.class); + Task task = factory.createTask(taskContext); + LOG.info("task = " + task.getClass()); + task.initialize(job, null); + } + + static class TestTask implements Task { + + private final Input in; + private final Output out; + private final Processor processor; + + @Inject + public TestTask( + @Assisted Processor processor, + @Assisted Input in, + @Assisted Output out) { + this.in = in; + this.processor = processor; + this.out = out; + } + + @Override + public void initialize(Configuration conf, Master master) + throws IOException, InterruptedException { + LOG.info("in = " + in.getClass()); + LOG.info("processor = " + processor.getClass()); + LOG.info("out = " + out.getClass()); + } + + @Override + public Input getInput() { + return in; + } + + @Override + public Output getOutput() { + return out; + } + + @Override + public Processor getProcessor() { + return processor; + } + + @Override + public void run() throws IOException, InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public void close() throws IOException, InterruptedException { + // TODO Auto-generated method stub + + } + + } + + static class TestInitialModule extends InitialTask { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Input.class, SimpleInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Output.class, OnFileSortedOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, MapProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, TestTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + + } + + + static class TestIntermediateModule extends IntermediateTask { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Input.class, ShuffledMergedInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Output.class, OnFileSortedOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, ReduceProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, TestTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + + } + + + static class TestFinalModule extends FinalTask { + + @Override + protected void configure() { + install( + new FactoryModuleBuilder().implement( + Output.class, SimpleOutput.class). + build(OutputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Input.class, ShuffledMergedInput.class). + build(InputFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Processor.class, ReduceProcessor.class). + build(ProcessorFactory.class) + ); + install( + new FactoryModuleBuilder().implement( + Task.class, TestTask.class). + build(TaskFactory.class) + ); + + bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class); + } + + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java (added) +++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,148 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.TezTaskStatus; +import org.apache.tez.engine.records.OutputContext; +import org.apache.tez.engine.records.TezTaskAttemptID; +import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate; +import org.apache.tez.mapreduce.hadoop.ContainerContext; +import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol; +import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse; + +public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol { + + private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class); + private ProceedToCompletionResponse proceedToCompletionResponse; + + + public TestUmbilicalProtocol() { + proceedToCompletionResponse = new ProceedToCompletionResponse(false, true); + } + + public TestUmbilicalProtocol(boolean shouldLinger) { + if (shouldLinger) { + proceedToCompletionResponse = new ProceedToCompletionResponse(false, false); + } else { + proceedToCompletionResponse = new ProceedToCompletionResponse(false, true); + } + } + + @Override + public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents( + int fromEventIdx, int maxEventsToFetch, + TezTaskAttemptID reduce) { + // TODO Auto-generated method stub + return null; + } + + @Override + public ContainerTask getTask(ContainerContext containerContext) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus) + throws IOException, InterruptedException { + LOG.info("Got 'status-update' from " + taskId + ": status=" + taskStatus); + return true; + } + + @Override + public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) + throws IOException { + LOG.info("Got 'diagnostic-info' from " + taskid + ": trace=" + trace); + } + + @Override + public boolean ping(TezTaskAttemptID taskid) throws IOException { + LOG.info("Got 'ping' from " + taskid); + return true; + } + + @Override + public void done(TezTaskAttemptID taskid) throws IOException { + LOG.info("Got 'done' from " + taskid); + } + + @Override + public void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus) + throws IOException, InterruptedException { + LOG.info("Got 'commit-pending' from " + taskId + ": status=" + taskStatus); + } + + @Override + public boolean canCommit(TezTaskAttemptID taskid) throws IOException { + LOG.info("Got 'can-commit' from " + taskid); + return false; + } + + @Override + public void shuffleError(TezTaskAttemptID taskId, String message) + throws IOException { + LOG.info("Got 'shuffle-error' from " + taskId + ": message=" + message); + } + + @Override + public void fsError(TezTaskAttemptID taskId, String message) + throws IOException { + LOG.info("Got 'fs-error' from " + taskId + ": message=" + message); + } + + @Override + public void fatalError(TezTaskAttemptID taskId, String message) + throws IOException { + LOG.info("Got 'fatal-error' from " + taskId + ": message=" + message); + } + + @Override + public void outputReady(TezTaskAttemptID taskAttemptId, + OutputContext outputContext) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public ProceedToCompletionResponse proceedToCompletion( + TezTaskAttemptID taskAttemptId) throws IOException { + return proceedToCompletionResponse; + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java ------------------------------------------------------------------------------ svn:eol-style = native