Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2C65410914 for ; Tue, 23 Apr 2013 20:41:10 +0000 (UTC) Received: (qmail 44834 invoked by uid 500); 23 Apr 2013 20:41:09 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 44305 invoked by uid 500); 23 Apr 2013 20:41:06 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 43460 invoked by uid 99); 23 Apr 2013 20:41:05 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Apr 2013 20:41:05 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 074ED823208; Tue, 23 Apr 2013 20:41:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Date: Tue, 23 Apr 2013 20:41:36 -0000 Message-Id: In-Reply-To: <5c5792aee18145bb82d063830978ad2a@git.apache.org> References: <5c5792aee18145bb82d063830978ad2a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [34/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java new file mode 100644 index 0000000..93926c1 --- 
/dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Objects; +import com.google.common.collect.Lists; + +/** + * This class encapsulates a MapReduce job and its dependency. It monitors the + * states of the depending jobs and updates the state of this job. A job starts + * in the WAITING state. If it does not have any depending jobs, or all of the + * depending jobs are in SUCCEEDED state, then the job state will become READY. If + * any depending jobs fail, the job will fail too. When in READY state, the job + * can be submitted to Hadoop for execution, with the state changing into + * RUNNING state. 
From RUNNING state, the job can get into SUCCEEDED or FAILED + * state, depending the status of the job execution. + */ +public class CrunchControlledJob { + + // A job will be in one of the following states + public static enum State { + SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED + }; + + public static interface Hook { + public void run() throws IOException; + } + + private static final Log LOG = LogFactory.getLog(CrunchControlledJob.class); + + private final int jobID; + private final Job job; // mapreduce job to be executed. + // the jobs the current job depends on + private final List dependingJobs; + private final Hook prepareHook; + private final Hook completionHook; + private State state; + // some info for human consumption, e.g. the reason why the job failed + private String message; + private String lastKnownProgress; + + /** + * Construct a job. + * + * @param jobID + * an ID used to match with its {@link org.apache.crunch.impl.mr.plan.JobPrototype}. + * @param job + * a mapreduce job to be executed. + * @param prepareHook + * a piece of code that will run before this job is submitted. + * @param completionHook + * a piece of code that will run after this job gets completed. 
+ */ + public CrunchControlledJob(int jobID, Job job, Hook prepareHook, Hook completionHook) { + this.jobID = jobID; + this.job = job; + this.dependingJobs = Lists.newArrayList(); + this.prepareHook = prepareHook; + this.completionHook = completionHook; + this.state = State.WAITING; + this.message = "just initialized"; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("job name:\t").append(this.job.getJobName()).append("\n"); + sb.append("job id:\t").append(this.jobID).append("\n"); + sb.append("job state:\t").append(this.state).append("\n"); + sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n"); + sb.append("job message:\t").append(this.message).append("\n"); + + if (this.dependingJobs == null || this.dependingJobs.size() == 0) { + sb.append("job has no depending job:\t").append("\n"); + } else { + sb.append("job has ").append(this.dependingJobs.size()) + .append(" dependeng jobs:\n"); + for (int i = 0; i < this.dependingJobs.size(); i++) { + sb.append("\t depending job ").append(i).append(":\t"); + sb.append((this.dependingJobs.get(i)).getJobName()).append("\n"); + } + } + return sb.toString(); + } + + /** + * @return the job name of this job + */ + public String getJobName() { + return job.getJobName(); + } + + /** + * Set the job name for this job. + * + * @param jobName + * the job name + */ + public void setJobName(String jobName) { + job.setJobName(jobName); + } + + /** + * @return the job ID of this job + */ + public int getJobID() { + return this.jobID; + } + + /** + * @return the mapred ID of this job as assigned by the mapred framework. + */ + public JobID getMapredJobID() { + return this.job.getJobID(); + } + + /** + * @return the mapreduce job + */ + public synchronized Job getJob() { + return this.job; + } + + /** + * @return the state of this job + */ + public synchronized State getJobState() { + return this.state; + } + + /** + * Set the state for this job. 
+ * + * @param state + * the new state for this job. + */ + protected synchronized void setJobState(State state) { + this.state = state; + } + + /** + * @return the message of this job + */ + public synchronized String getMessage() { + return this.message; + } + + /** + * Set the message for this job. + * + * @param message + * the message for this job. + */ + public synchronized void setMessage(String message) { + this.message = message; + } + + /** + * @return the depending jobs of this job + */ + public List getDependentJobs() { + return this.dependingJobs; + } + + /** + * Add a job to this jobs' dependency list. Dependent jobs can only be added + * while a Job is waiting to run, not during or afterwards. + * + * @param dependingJob + * Job that this Job depends on. + * @return true if the Job was added. + */ + public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) { + if (this.state == State.WAITING) { // only allowed to add jobs when waiting + return this.dependingJobs.add(dependingJob); + } else { + return false; + } + } + + /** + * @return true if this job is in a complete state + */ + public synchronized boolean isCompleted() { + return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED + || this.state == State.SUCCESS; + } + + /** + * @return true if this job is in READY state + */ + public synchronized boolean isReady() { + return this.state == State.READY; + } + + public void killJob() throws IOException, InterruptedException { + job.killJob(); + } + + /** + * Check the state of this running job. The state may remain the same, become + * SUCCEEDED or FAILED. 
+ */ + private void checkRunningState() throws IOException, InterruptedException { + try { + if (job.isComplete()) { + if (job.isSuccessful()) { + this.state = State.SUCCESS; + } else { + this.state = State.FAILED; + this.message = "Job failed!"; + } + } else { + // still running + if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) { + logJobProgress(); + } + } + } catch (IOException ioe) { + this.state = State.FAILED; + this.message = StringUtils.stringifyException(ioe); + try { + if (job != null) { + job.killJob(); + } + } catch (IOException e) { + } + } + if (isCompleted()) { + completionHook.run(); + } + } + + /** + * Check and update the state of this job. The state changes depending on its + * current state and the states of the depending jobs. + */ + synchronized State checkState() throws IOException, InterruptedException { + if (this.state == State.RUNNING) { + checkRunningState(); + } + if (this.state != State.WAITING) { + return this.state; + } + if (this.dependingJobs == null || this.dependingJobs.size() == 0) { + this.state = State.READY; + return this.state; + } + CrunchControlledJob pred = null; + int n = this.dependingJobs.size(); + for (int i = 0; i < n; i++) { + pred = this.dependingJobs.get(i); + State s = pred.checkState(); + if (s == State.WAITING || s == State.READY || s == State.RUNNING) { + break; // a pred is still not completed, continue in WAITING + // state + } + if (s == State.FAILED || s == State.DEPENDENT_FAILED) { + this.state = State.DEPENDENT_FAILED; + this.message = "depending job " + i + " with jobID " + pred.getJobID() + + " failed. " + pred.getMessage(); + break; + } + // pred must be in success state + if (i == n - 1) { + this.state = State.READY; + } + } + + return this.state; + } + + /** + * Submit this job to mapred. The state becomes RUNNING if submission is + * successful, FAILED otherwise. 
+ */ + protected synchronized void submit() { + try { + prepareHook.run(); + job.submit(); + this.state = State.RUNNING; + LOG.info("Running job \"" + getJobName() + "\""); + LOG.info("Job status available at: " + job.getTrackingURL()); + } catch (Exception ioe) { + this.state = State.FAILED; + this.message = StringUtils.stringifyException(ioe); + LOG.info("Error occurred starting job \"" + getJobName() + "\":"); + LOG.info(getMessage()); + } + } + + private void logJobProgress() throws IOException, InterruptedException { + String progress = String.format("map %.0f%% reduce %.0f%%", + 100.0 * job.mapProgress(), 100.0 * job.reduceProgress()); + if (!Objects.equal(lastKnownProgress, progress)) { + LOG.info(job.getJobName() + " progress: " + progress); + lastKnownProgress = progress; + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java new file mode 100644 index 0000000..727ab6f --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State; + +/** + * This class encapsulates a set of MapReduce jobs and its dependency. + * + * It tracks the states of the jobs by placing them into different tables + * according to their states. + * + * This class provides APIs for the client app to add a job to the group and to + * get the jobs in the group in different states. When a job is added, an ID + * unique to the group is assigned to the job. + */ +public class CrunchJobControl { + + private Map waitingJobs; + private Map readyJobs; + private Map runningJobs; + private Map successfulJobs; + private Map failedJobs; + + private Log log = LogFactory.getLog(CrunchJobControl.class); + + private final String groupName; + + /** + * Construct a job control for a group of jobs. 
+ * + * @param groupName + * a name identifying this group + */ + public CrunchJobControl(String groupName) { + this.waitingJobs = new Hashtable(); + this.readyJobs = new Hashtable(); + this.runningJobs = new Hashtable(); + this.successfulJobs = new Hashtable(); + this.failedJobs = new Hashtable(); + this.groupName = groupName; + } + + private static List toList(Map jobs) { + ArrayList retv = new ArrayList(); + synchronized (jobs) { + for (CrunchControlledJob job : jobs.values()) { + retv.add(job); + } + } + return retv; + } + + /** + * @return the jobs in the waiting state + */ + public List getWaitingJobList() { + return toList(this.waitingJobs); + } + + /** + * @return the jobs in the running state + */ + public List getRunningJobList() { + return toList(this.runningJobs); + } + + /** + * @return the jobs in the ready state + */ + public List getReadyJobsList() { + return toList(this.readyJobs); + } + + /** + * @return the jobs in the success state + */ + public List getSuccessfulJobList() { + return toList(this.successfulJobs); + } + + public List getFailedJobList() { + return toList(this.failedJobs); + } + + private static void addToQueue(CrunchControlledJob aJob, + Map queue) { + synchronized (queue) { + queue.put(aJob.getJobID(), aJob); + } + } + + private void addToQueue(CrunchControlledJob aJob) { + Map queue = getQueue(aJob.getJobState()); + addToQueue(aJob, queue); + } + + private Map getQueue(State state) { + Map retv = null; + if (state == State.WAITING) { + retv = this.waitingJobs; + } else if (state == State.READY) { + retv = this.readyJobs; + } else if (state == State.RUNNING) { + retv = this.runningJobs; + } else if (state == State.SUCCESS) { + retv = this.successfulJobs; + } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) { + retv = this.failedJobs; + } + return retv; + } + + /** + * Add a new job. 
+ * + * @param aJob + * the new job + */ + synchronized public void addJob(CrunchControlledJob aJob) { + aJob.setJobState(State.WAITING); + this.addToQueue(aJob); + } + + synchronized private void checkRunningJobs() throws IOException, + InterruptedException { + + Map oldJobs = null; + oldJobs = this.runningJobs; + this.runningJobs = new Hashtable(); + + for (CrunchControlledJob nextJob : oldJobs.values()) { + nextJob.checkState(); + this.addToQueue(nextJob); + } + } + + synchronized private void checkWaitingJobs() throws IOException, + InterruptedException { + Map oldJobs = null; + oldJobs = this.waitingJobs; + this.waitingJobs = new Hashtable(); + + for (CrunchControlledJob nextJob : oldJobs.values()) { + nextJob.checkState(); + this.addToQueue(nextJob); + } + } + + synchronized private void startReadyJobs() { + Map oldJobs = null; + oldJobs = this.readyJobs; + this.readyJobs = new Hashtable(); + + for (CrunchControlledJob nextJob : oldJobs.values()) { + // Submitting Job to Hadoop + nextJob.submit(); + this.addToQueue(nextJob); + } + } + + synchronized public void killAllRunningJobs() { + for (CrunchControlledJob job : runningJobs.values()) { + if (!job.isCompleted()) { + try { + job.killJob(); + } catch (Exception e) { + log.error("Exception killing job: " + job.getJobName(), e); + } + } + } + } + + synchronized public boolean allFinished() { + return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0 + && this.runningJobs.size() == 0; + } + + /** + * Checks the states of the running jobs Update the states of waiting jobs, and submits the jobs in + * ready state (i.e. whose dependencies are all finished in success). 
+ */ + public void pollJobStatusAndStartNewOnes() throws IOException, InterruptedException { + checkRunningJobs(); + checkWaitingJobs(); + startReadyJobs(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java b/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java new file mode 100644 index 0000000..98f982f --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/SingleUseIterable.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl; + +import java.util.Iterator; + +/** + * Wrapper around a Reducer's input Iterable. Ensures that the + * {@link #iterator()} method is not called more than once. + */ +public class SingleUseIterable implements Iterable { + + private boolean used = false; + private Iterable wrappedIterable; + + /** + * Instantiate around an Iterable that may only be used once. 
+ * + * @param toWrap iterable to wrap + */ + public SingleUseIterable(Iterable toWrap) { + this.wrappedIterable = toWrap; + } + + @Override + public Iterator iterator() { + if (used) { + throw new IllegalStateException("iterator() can only be called once on this Iterable"); + } + used = true; + return wrappedIterable.iterator(); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java new file mode 100644 index 0000000..272b2af --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mem; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineExecution; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.Source; +import org.apache.crunch.TableSource; +import org.apache.crunch.Target; +import org.apache.crunch.Target.WriteMode; +import org.apache.crunch.impl.mem.collect.MemCollection; +import org.apache.crunch.impl.mem.collect.MemTable; +import org.apache.crunch.io.At; +import org.apache.crunch.io.PathTarget; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class MemPipeline implements Pipeline { + + private static final Log LOG = LogFactory.getLog(MemPipeline.class); + private static Counters COUNTERS = new Counters(); + private static final MemPipeline INSTANCE = new MemPipeline(); + + private int outputIndex = 0; + + public static Counters getCounters() { + return COUNTERS; + } + + public static void clearCounters() { + COUNTERS = new Counters(); + } + + public static Pipeline getInstance() { + return INSTANCE; + } + + public static PCollection collectionOf(T... 
ts) { + return new MemCollection(ImmutableList.copyOf(ts)); + } + + public static PCollection collectionOf(Iterable collect) { + return new MemCollection(collect); + } + + public static PCollection typedCollectionOf(PType ptype, T... ts) { + return new MemCollection(ImmutableList.copyOf(ts), ptype, null); + } + + public static PCollection typedCollectionOf(PType ptype, Iterable collect) { + return new MemCollection(collect, ptype, null); + } + + public static PTable tableOf(S s, T t, Object... more) { + List> pairs = Lists.newArrayList(); + pairs.add(Pair.of(s, t)); + for (int i = 0; i < more.length; i += 2) { + pairs.add(Pair.of((S) more[i], (T) more[i + 1])); + } + return new MemTable(pairs); + } + + public static PTable typedTableOf(PTableType ptype, S s, T t, Object... more) { + List> pairs = Lists.newArrayList(); + pairs.add(Pair.of(s, t)); + for (int i = 0; i < more.length; i += 2) { + pairs.add(Pair.of((S) more[i], (T) more[i + 1])); + } + return new MemTable(pairs, ptype, null); + } + + public static PTable tableOf(Iterable> pairs) { + return new MemTable(pairs); + } + + public static PTable typedTableOf(PTableType ptype, Iterable> pairs) { + return new MemTable(pairs, ptype, null); + } + + private Configuration conf = new Configuration(); + private Set activeTargets = Sets.newHashSet(); + + private MemPipeline() { + } + + @Override + public void setConfiguration(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public PCollection read(Source source) { + if (source instanceof ReadableSource) { + try { + Iterable iterable = ((ReadableSource) source).read(conf); + return new MemCollection(iterable, source.getType(), source.toString()); + } catch (IOException e) { + LOG.error("Exception reading source: " + source.toString(), e); + throw new IllegalStateException(e); + } + } + LOG.error("Source " + source + " is not readable"); + throw new IllegalStateException("Source " + 
source + " is not readable"); + } + + @Override + public PTable read(TableSource source) { + if (source instanceof ReadableSource) { + try { + Iterable> iterable = ((ReadableSource>) source).read(conf); + return new MemTable(iterable, source.getTableType(), source.toString()); + } catch (IOException e) { + LOG.error("Exception reading source: " + source.toString(), e); + throw new IllegalStateException(e); + } + } + LOG.error("Source " + source + " is not readable"); + throw new IllegalStateException("Source " + source + " is not readable"); + } + + @Override + public void write(PCollection collection, Target target) { + write(collection, target, Target.WriteMode.DEFAULT); + } + + @Override + public void write(PCollection collection, Target target, + Target.WriteMode writeMode) { + target.handleExisting(writeMode, getConfiguration()); + if (writeMode != WriteMode.APPEND && activeTargets.contains(target)) { + throw new CrunchRuntimeException("Target " + target + " is already written in the current run." 
+ + " Use WriteMode.APPEND in order to write additional data to it."); + } + activeTargets.add(target); + if (target instanceof PathTarget) { + Path path = ((PathTarget) target).getPath(); + try { + FileSystem fs = path.getFileSystem(conf); + FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex)); + outputIndex++; + if (collection instanceof PTable) { + for (Object o : collection.materialize()) { + Pair p = (Pair) o; + os.writeBytes(p.first().toString()); + os.writeBytes("\t"); + os.writeBytes(p.second().toString()); + os.writeBytes("\r\n"); + } + } else { + for (Object o : collection.materialize()) { + os.writeBytes(o.toString() + "\r\n"); + } + } + os.close(); + } catch (IOException e) { + LOG.error("Exception writing target: " + target, e); + } + } else { + LOG.error("Target " + target + " is not a PathTarget instance"); + } + } + + @Override + public PCollection readTextFile(String pathName) { + return read(At.textFile(pathName)); + } + + @Override + public void writeTextFile(PCollection collection, String pathName) { + write(collection, At.textFile(pathName)); + } + + @Override + public Iterable materialize(PCollection pcollection) { + return pcollection.materialize(); + } + + @Override + public PipelineExecution runAsync() { + activeTargets.clear(); + return new PipelineExecution() { + @Override + public String getPlanDotFile() { + return ""; + } + + @Override + public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { + // no-po + } + + @Override + public void waitUntilDone() throws InterruptedException { + // no-po + } + + @Override + public Status getStatus() { + return Status.SUCCEEDED; + } + + @Override + public PipelineResult getResult() { + return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS))); + } + + @Override + public void kill() { + } + }; + } + + @Override + public PipelineResult run() { + activeTargets.clear(); + return new 
PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS))); + } + + @Override + public PipelineResult done() { + return run(); + } + + @Override + public void enableDebug() { + LOG.info("Note: in-memory pipelines do not have debug logging"); + } + + @Override + public String getName() { + return "Memory Pipeline"; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java new file mode 100644 index 0000000..c97fac6 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mem.collect; + +import java.lang.reflect.Method; +import java.util.Collection; + +import javassist.util.proxy.MethodFilter; +import javassist.util.proxy.MethodHandler; +import javassist.util.proxy.ProxyFactory; + +import org.apache.crunch.DoFn; +import org.apache.crunch.FilterFn; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PObject; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.ParallelDoOptions; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.fn.ExtractKeyFn; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mem.emit.InMemoryEmitter; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.materialize.pobject.CollectionPObject; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class MemCollection implements PCollection { + + private final Collection collect; + private final PType ptype; + private String name; + + public MemCollection(Iterable collect) { + this(collect, null, null); + } + + public MemCollection(Iterable collect, PType ptype) { + this(collect, ptype, null); + } + + public MemCollection(Iterable collect, PType ptype, String name) { + this.collect = ImmutableList.copyOf(collect); + this.ptype = ptype; + this.name = name; + } + + @Override + public Pipeline getPipeline() { + return MemPipeline.getInstance(); + } + + @Override + public PCollection 
union(PCollection other) { + return union(new PCollection[] { other }); + } + + @Override + public PCollection union(PCollection... collections) { + Collection output = Lists.newArrayList(); + for (PCollection pcollect : collections) { + for (S s : pcollect.materialize()) { + output.add(s); + } + } + output.addAll(collect); + return new MemCollection(output, collections[0].getPType()); + } + + @Override + public PCollection parallelDo(DoFn doFn, PType type) { + return parallelDo(null, doFn, type); + } + + @Override + public PCollection parallelDo(String name, DoFn doFn, PType type) { + return parallelDo(name, doFn, type, ParallelDoOptions.builder().build()); + } + + @Override + public PCollection parallelDo(String name, DoFn doFn, PType type, + ParallelDoOptions options) { + InMemoryEmitter emitter = new InMemoryEmitter(); + doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); + doFn.initialize(); + for (S s : collect) { + doFn.process(s, emitter); + } + doFn.cleanup(emitter); + return new MemCollection(emitter.getOutput(), type, name); + } + + @Override + public PTable parallelDo(DoFn> doFn, PTableType type) { + return parallelDo(null, doFn, type); + } + + @Override + public PTable parallelDo(String name, DoFn> doFn, PTableType type) { + return parallelDo(name, doFn, type, ParallelDoOptions.builder().build()); + } + + @Override + public PTable parallelDo(String name, DoFn> doFn, PTableType type, + ParallelDoOptions options) { + InMemoryEmitter> emitter = new InMemoryEmitter>(); + doFn.setContext(getInMemoryContext(getPipeline().getConfiguration())); + doFn.initialize(); + for (S s : collect) { + doFn.process(s, emitter); + } + doFn.cleanup(emitter); + return new MemTable(emitter.getOutput(), type, name); + } + + @Override + public PCollection write(Target target) { + getPipeline().write(this, target); + return this; + } + + @Override + public PCollection write(Target target, Target.WriteMode writeMode) { + getPipeline().write(this, target, 
writeMode); + return this; + } + + @Override + public Iterable materialize() { + return collect; + } + + /** {@inheritDoc} */ + @Override + public PObject> asCollection() { + return new CollectionPObject(this); + } + + public Collection getCollection() { + return collect; + } + + @Override + public PType getPType() { + return ptype; + } + + @Override + public PTypeFamily getTypeFamily() { + if (ptype != null) { + return ptype.getFamily(); + } + return null; + } + + @Override + public long getSize() { + return collect.isEmpty() ? 0 : 1; // getSize is only used for pipeline optimization in MR + } + + @Override + public String getName() { + return name; + } + + @Override + public String toString() { + return collect.toString(); + } + + @Override + public PTable count() { + return Aggregate.count(this); + } + + @Override + public PObject length() { + return Aggregate.length(this); + } + + @Override + public PObject max() { + return Aggregate.max(this); + } + + @Override + public PObject min() { + return Aggregate.min(this); + } + + @Override + public PCollection filter(FilterFn filterFn) { + return parallelDo(filterFn, getPType()); + } + + @Override + public PCollection filter(String name, FilterFn filterFn) { + return parallelDo(name, filterFn, getPType()); + } + + @Override + public PTable by(MapFn mapFn, PType keyType) { + return parallelDo(new ExtractKeyFn(mapFn), getTypeFamily().tableOf(keyType, getPType())); + } + + @Override + public PTable by(String name, MapFn mapFn, PType keyType) { + return parallelDo(name, new ExtractKeyFn(mapFn), getTypeFamily().tableOf(keyType, getPType())); + } + + /** + * The method creates a {@link TaskInputOutputContext} that will just provide + * {@linkplain Configuration}. The method has been implemented with javaassist + * as there are API changes in versions of Hadoop. In hadoop 1.0.3 the + * {@linkplain TaskInputOutputContext} is abstract class while in version 2 + * the same is an interface. + *

+ * Note: The intention of this is to provide the bare essentials that are + * required to make the {@linkplain MemPipeline} work. It lacks even the basic + * things that can proved some support for unit testing pipeline. + */ + private static TaskInputOutputContext getInMemoryContext(final Configuration conf) { + ProxyFactory factory = new ProxyFactory(); + Class superType = TaskInputOutputContext.class; + Class[] types = new Class[0]; + Object[] args = new Object[0]; + if (superType.isInterface()) { + factory.setInterfaces(new Class[] { superType }); + } else { + types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class, + StatusReporter.class }; + args = new Object[] { conf, new TaskAttemptID(), null, null, null }; + factory.setSuperclass(superType); + } + factory.setFilter(new MethodFilter() { + @Override + public boolean isHandled(Method m) { + String name = m.getName(); + return "getConfiguration".equals(name) || "getCounter".equals(name) || "progress".equals(name); + } + }); + MethodHandler handler = new MethodHandler() { + @Override + public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable { + String name = m.getName(); + if ("getConfiguration".equals(name)) { + return conf; + } else if ("progress".equals(name)) { + // no-op + return null; + } else { // getCounter + if (args.length == 1) { + return MemPipeline.getCounters().findCounter((Enum) args[0]); + } else { + return MemPipeline.getCounters().findCounter((String) args[0], (String) args[1]); + } + } + } + }; + try { + Object newInstance = factory.create(types, args, handler); + return (TaskInputOutputContext) newInstance; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java ---------------------------------------------------------------------- 
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java new file mode 100644 index 0000000..d105bb4 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mem.collect; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.crunch.Aggregator; +import org.apache.crunch.CombineFn; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +class MemGroupedTable extends MemCollection>> implements PGroupedTable { + + private final MemTable parent; + + private static Iterable>> buildMap(MemTable parent, GroupingOptions options) { + PType keyType = parent.getKeyType(); + Shuffler shuffler = Shuffler.create(keyType, options, parent.getPipeline()); + + for (Pair pair : parent.materialize()) { + shuffler.add(pair); + } + + return shuffler; + } + + public MemGroupedTable(MemTable parent, GroupingOptions options) { + super(buildMap(parent, options)); + this.parent = parent; + } + + @Override + public PCollection>> union(PCollection>>... 
collections) { + throw new UnsupportedOperationException(); + } + + @Override + public PCollection>> write(Target target) { + getPipeline().write(this.ungroup(), target); + return this; + } + + @Override + public PType>> getPType() { + PTableType parentType = parent.getPTableType(); + if (parentType != null) { + return parentType.getGroupedTableType(); + } + return null; + } + + @Override + public PTypeFamily getTypeFamily() { + return parent.getTypeFamily(); + } + + @Override + public long getSize() { + return 1; // getSize is only used for pipeline optimization in MR + } + + @Override + public String getName() { + return "MemGrouped(" + parent.getName() + ")"; + } + + @Override + public PTable combineValues(CombineFn combineFn) { + return parallelDo(combineFn, parent.getPTableType()); + } + + @Override + public PTable combineValues(Aggregator agg) { + return combineValues(Aggregators.toCombineFn(agg)); + } + + @Override + public PTable ungroup() { + return parent; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java new file mode 100644 index 0000000..f8a5960 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem.collect; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.crunch.FilterFn; +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PObject; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Target; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.lib.Cogroup; +import org.apache.crunch.lib.Join; +import org.apache.crunch.lib.PTables; +import org.apache.crunch.materialize.MaterializableMap; +import org.apache.crunch.materialize.pobject.MapPObject; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; + +import com.google.common.collect.Lists; + +public class MemTable extends MemCollection> implements PTable { + + private PTableType ptype; + + public MemTable(Iterable> collect) { + this(collect, null, null); + } + + public MemTable(Iterable> collect, PTableType ptype, String name) { + super(collect, ptype, name); + this.ptype = ptype; + } + + @Override + public PTable union(PTable other) { + return union(new PTable[] { other }); + } + + @Override + public PTable union(PTable... 
others) { + List> values = Lists.newArrayList(); + values.addAll(getCollection()); + for (PTable ptable : others) { + for (Pair p : ptable.materialize()) { + values.add(p); + } + } + return new MemTable(values, others[0].getPTableType(), null); + } + + @Override + public PGroupedTable groupByKey() { + return groupByKey(null); + } + + @Override + public PGroupedTable groupByKey(int numPartitions) { + return groupByKey(null); + } + + @Override + public PGroupedTable groupByKey(GroupingOptions options) { + return new MemGroupedTable(this, options); + } + + @Override + public PTable write(Target target) { + super.write(target); + return this; + } + + @Override + public PTable write(Target target, Target.WriteMode writeMode) { + getPipeline().write(this, target, writeMode); + return this; + } + + @Override + public PTableType getPTableType() { + return ptype; + } + + @Override + public PType getKeyType() { + if (ptype != null) { + return ptype.getKeyType(); + } + return null; + } + + @Override + public PType getValueType() { + if (ptype != null) { + return ptype.getValueType(); + } + return null; + } + + @Override + public PTable filter(FilterFn> filterFn) { + return parallelDo(filterFn, getPTableType()); + } + + @Override + public PTable filter(String name, FilterFn> filterFn) { + return parallelDo(name, filterFn, getPTableType()); + } + + @Override + public PTable top(int count) { + return Aggregate.top(this, count, true); + } + + @Override + public PTable bottom(int count) { + return Aggregate.top(this, count, false); + } + + @Override + public PTable> collectValues() { + return Aggregate.collectValues(this); + } + + @Override + public PTable> join(PTable other) { + return Join.join(this, other); + } + + @Override + public PTable, Collection>> cogroup(PTable other) { + return Cogroup.cogroup(this, other); + } + + @Override + public PCollection keys() { + return PTables.keys(this); + } + + @Override + public PCollection values() { + return PTables.values(this); + } + 
+ @Override + public Map materializeToMap() { + return new MaterializableMap(this.materialize()); + } + + /** {@inheritDoc} */ + @Override + public PObject> asMap() { + return new MapPObject(this); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java new file mode 100644 index 0000000..2e8f9eb --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mem.collect; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.crunch.GroupingOptions; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.impl.SingleUseIterable; +import org.apache.crunch.types.PType; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * In-memory versions of common MapReduce patterns for aggregating key-value data. + */ +abstract class Shuffler implements Iterable>> { + + public abstract void add(Pair record); + + private static Map getMapForKeyType(PType ptype) { + if (ptype != null && Comparable.class.isAssignableFrom(ptype.getTypeClass())) { + return new TreeMap(); + } else { + return Maps.newHashMap(); + } + } + + public static Shuffler create(PType keyType, GroupingOptions options, + Pipeline pipeline) { + Map> map = getMapForKeyType(keyType); + + if (options != null) { + if (Pair.class.equals(keyType.getTypeClass()) && options.getGroupingComparatorClass() != null) { + PType pairKey = keyType.getSubTypes().get(0); + return new SecondarySortShuffler(getMapForKeyType(pairKey)); + } else if (options.getSortComparatorClass() != null) { + RawComparator rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), + pipeline.getConfiguration()); + map = new TreeMap>(rc); + } + } + + return new MapShuffler(map); + } + + private static class HFunction implements Function>, Pair>> { + @Override + public Pair> apply(Map.Entry> input) { + return Pair.>of(input.getKey(), new SingleUseIterable(input.getValue())); + } + } + 
+ private static class MapShuffler extends Shuffler { + private final Map> map; + + public MapShuffler(Map> map) { + this.map = map; + } + + @Override + public Iterator>> iterator() { + return Iterators.transform(map.entrySet().iterator(), + new HFunction()); + } + + @Override + public void add(Pair record) { + if (!map.containsKey(record.first())) { + Collection values = Lists.newArrayList(); + map.put(record.first(), values); + } + map.get(record.first()).add(record.second()); + } + } + + private static class SSFunction implements + Function>>, Pair, Iterable>> { + @Override + public Pair, Iterable> apply(Entry>> input) { + List> values = input.getValue(); + Collections.sort(values, new Comparator>() { + @Override + public int compare(Pair o1, Pair o2) { + return ((Comparable) o1.first()).compareTo(o2.first()); + } + }); + Pair key = Pair.of(input.getKey(), values.get(0).first()); + return Pair.of(key, Iterables.transform(values, new Function, V>() { + @Override + public V apply(Pair input) { + return input.second(); + } + })); + } + } + + private static class SecondarySortShuffler extends Shuffler, V> { + + private Map>> map; + + public SecondarySortShuffler(Map>> map) { + this.map = map; + } + + @Override + public Iterator, Iterable>> iterator() { + return Iterators.transform(map.entrySet().iterator(), new SSFunction()); + } + + @Override + public void add(Pair, V> record) { + K primary = record.first().first(); + if (!map.containsKey(primary)) { + map.put(primary, Lists.>newArrayList()); + } + map.get(primary).add(Pair.of(record.first().second(), record.second())); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java new file 
mode 100644 index 0000000..6976615 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem.emit; + +import java.util.List; + +import org.apache.crunch.Emitter; + +import com.google.common.collect.Lists; + +/** + * An {@code Emitter} instance that writes emitted records to a backing + * {@code List}. + * + * @param + */ +public class InMemoryEmitter implements Emitter { + + private final List output; + + public InMemoryEmitter() { + this(Lists. 
newArrayList()); + } + + public InMemoryEmitter(List output) { + this.output = output; + } + + @Override + public void emit(T emitted) { + output.add(emitted); + } + + @Override + public void flush() { + + } + + public List getOutput() { + return output; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java new file mode 100644 index 0000000..a55b673 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * In-memory Pipeline implementation for rapid prototyping and testing. 
+ */ +package org.apache.crunch.impl.mem; http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java new file mode 100644 index 0000000..00cf486 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr; + +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineExecution; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.TableSource; +import org.apache.crunch.Target; +import org.apache.crunch.Target.WriteMode; +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mr.collect.InputCollection; +import org.apache.crunch.impl.mr.collect.InputTable; +import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; +import org.apache.crunch.impl.mr.collect.UnionCollection; +import org.apache.crunch.impl.mr.collect.UnionTable; +import org.apache.crunch.impl.mr.exec.MRExecutor; +import org.apache.crunch.impl.mr.plan.MSCRPlanner; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.From; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.io.To; +import org.apache.crunch.materialize.MaterializableIterable; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Pipeline implementation that is executed within Hadoop MapReduce. 
+ */ +public class MRPipeline implements Pipeline { + + private static final Log LOG = LogFactory.getLog(MRPipeline.class); + + private static final Random RANDOM = new Random(); + + private final Class jarClass; + private final String name; + private final Map, Set> outputTargets; + private final Map, MaterializableIterable> outputTargetsToMaterialize; + private Path tempDirectory; + private int tempFileIndex; + private int nextAnonymousStageId; + + private Configuration conf; + + /** + * Instantiate with a default Configuration and name. + * + * @param jarClass Class containing the main driver method for running the pipeline + */ + public MRPipeline(Class jarClass) { + this(jarClass, new Configuration()); + } + + /** + * Instantiate with a custom pipeline name. The name will be displayed in the Hadoop JobTracker. + * + * @param jarClass Class containing the main driver method for running the pipeline + * @param name Display name of the pipeline + */ + public MRPipeline(Class jarClass, String name) { + this(jarClass, name, new Configuration()); + } + + /** + * Instantiate with a custom configuration and default naming. + * + * @param jarClass Class containing the main driver method for running the pipeline + * @param conf Configuration to be used within all MapReduce jobs run in the pipeline + */ + public MRPipeline(Class jarClass, Configuration conf) { + this(jarClass, jarClass.getName(), conf); + } + + /** + * Instantiate with a custom name and configuration. The name will be displayed in the Hadoop + * JobTracker. 
+ * + * @param jarClass Class containing the main driver method for running the pipeline + * @param name Display name of the pipeline + * @param conf Configuration to be used within all MapReduce jobs run in the pipeline + */ + public MRPipeline(Class jarClass, String name, Configuration conf) { + this.jarClass = jarClass; + this.name = name; + this.outputTargets = Maps.newHashMap(); + this.outputTargetsToMaterialize = Maps.newHashMap(); + this.conf = conf; + this.tempDirectory = createTempDirectory(conf); + this.tempFileIndex = 0; + this.nextAnonymousStageId = 0; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public void setConfiguration(Configuration conf) { + this.conf = conf; + this.tempDirectory = createTempDirectory(conf); + } + + public MRExecutor plan() { + Map, MaterializableIterable> toMaterialize = Maps.newHashMap(); + for (PCollectionImpl c : outputTargets.keySet()) { + if (outputTargetsToMaterialize.containsKey(c)) { + toMaterialize.put(c, outputTargetsToMaterialize.get(c)); + outputTargetsToMaterialize.remove(c); + } + } + MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize); + try { + return planner.plan(jarClass, conf); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public PipelineResult run() { + try { + PipelineExecution pipelineExecution = runAsync(); + pipelineExecution.waitUntilDone(); + return pipelineExecution.getResult(); + } catch (InterruptedException e) { + // TODO: How to handle this without changing signature? 
+ LOG.error("Exception running pipeline", e); + return PipelineResult.EMPTY; + } + } + + @Override + public PipelineExecution runAsync() { + PipelineExecution res = plan().execute(); + outputTargets.clear(); + return res; + } + + @Override + public PipelineResult done() { + PipelineResult res = null; + if (!outputTargets.isEmpty()) { + res = run(); + } + cleanup(); + return res; + } + + public PCollection read(Source source) { + return new InputCollection(source, this); + } + + public PTable read(TableSource source) { + return new InputTable(source, this); + } + + public PCollection readTextFile(String pathName) { + return read(From.textFile(pathName)); + } + + public void write(PCollection pcollection, Target target) { + write(pcollection, target, Target.WriteMode.DEFAULT); + } + + @SuppressWarnings("unchecked") + public void write(PCollection pcollection, Target target, + Target.WriteMode writeMode) { + if (pcollection instanceof PGroupedTableImpl) { + pcollection = ((PGroupedTableImpl) pcollection).ungroup(); + } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) { + pcollection = pcollection.parallelDo("UnionCollectionWrapper", + (MapFn) IdentityFn. getInstance(), pcollection.getPType()); + } + target.handleExisting(writeMode, getConfiguration()); + if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) { + throw new CrunchRuntimeException("Target " + target + " is already written in current run." + + " Use WriteMode.APPEND in order to write additional data to it."); + } + addOutput((PCollectionImpl) pcollection, target); + } + + private boolean targetInCurrentRun(Target target) { + for (Set targets : outputTargets.values()) { + if (targets.contains(target)) { + return true; + } + } + return false; + } + + private void addOutput(PCollectionImpl impl, Target target) { + if (!outputTargets.containsKey(impl)) { + outputTargets.put(impl, Sets. 
newHashSet()); + } + outputTargets.get(impl).add(target); + } + + @Override + public Iterable materialize(PCollection pcollection) { + + PCollectionImpl pcollectionImpl = toPcollectionImpl(pcollection); + ReadableSource readableSrc = getMaterializeSourceTarget(pcollectionImpl); + + MaterializableIterable c = new MaterializableIterable(this, readableSrc); + if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) { + outputTargetsToMaterialize.put(pcollectionImpl, c); + } + return c; + } + + /** + * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}. + * This is primarily intended as a helper method to {@link #materialize(PCollection)}. The + * underlying data of the ReadableSourceTarget may not be actually present until the pipeline is + * run. + * + * @param pcollection The collection for which the ReadableSourceTarget is to be retrieved + * @return The ReadableSourceTarget + * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given + * PCollection + */ + public ReadableSource getMaterializeSourceTarget(PCollection pcollection) { + PCollectionImpl impl = toPcollectionImpl(pcollection); + + // First, check to see if this is a readable input collection. + if (impl instanceof InputCollection) { + InputCollection ic = (InputCollection) impl; + if (ic.getSource() instanceof ReadableSource) { + return (ReadableSource) ic.getSource(); + } else { + throw new IllegalArgumentException( + "Cannot materialize non-readable input collection: " + ic); + } + } else if (impl instanceof InputTable) { + InputTable it = (InputTable) impl; + if (it.getSource() instanceof ReadableSource) { + return (ReadableSource) it.getSource(); + } else { + throw new IllegalArgumentException( + "Cannot materialize non-readable input table: " + it); + } + } + + // Next, check to see if this pcollection has already been materialized. 
+ SourceTarget matTarget = impl.getMaterializedAt(); + if (matTarget != null && matTarget instanceof ReadableSourceTarget) { + return (ReadableSourceTarget) matTarget; + } + + // Check to see if we plan on materializing this collection on the + // next run. + ReadableSourceTarget srcTarget = null; + if (outputTargets.containsKey(pcollection)) { + for (Target target : outputTargets.get(impl)) { + if (target instanceof ReadableSourceTarget) { + return (ReadableSourceTarget) target; + } + } + } + + // If we're not planning on materializing it already, create a temporary + // output to hold the materialized records and return that. + SourceTarget st = createIntermediateOutput(pcollection.getPType()); + if (!(st instanceof ReadableSourceTarget)) { + throw new IllegalArgumentException("The PType for the given PCollection is not readable" + + " and cannot be materialized"); + } else { + srcTarget = (ReadableSourceTarget) st; + addOutput(impl, srcTarget); + return srcTarget; + } + } + + /** + * Safely cast a PCollection into a PCollectionImpl, including handling the case of + * UnionCollections. + * + * @param pcollection The PCollection to be cast/transformed + * @return The PCollectionImpl representation + */ + private PCollectionImpl toPcollectionImpl(PCollection pcollection) { + PCollectionImpl pcollectionImpl = null; + if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) { + pcollectionImpl = (PCollectionImpl) pcollection.parallelDo("UnionCollectionWrapper", + (MapFn) IdentityFn. 
getInstance(), pcollection.getPType()); + } else { + pcollectionImpl = (PCollectionImpl) pcollection; + } + return pcollectionImpl; + } + + public SourceTarget createIntermediateOutput(PType ptype) { + return ptype.getDefaultFileSource(createTempPath()); + } + + public Path createTempPath() { + tempFileIndex++; + return new Path(tempDirectory, "p" + tempFileIndex); + } + + private static Path createTempDirectory(Configuration conf) { + Path dir = createTemporaryPath(conf); + try { + dir.getFileSystem(conf).mkdirs(dir); + } catch (IOException e) { + throw new RuntimeException("Cannot create job output directory " + dir, e); + } + return dir; + } + + private static Path createTemporaryPath(Configuration conf) { + String baseDir = conf.get(RuntimeParameters.TMP_DIR, "/tmp"); + return new Path(baseDir, "crunch-" + (RANDOM.nextInt() & Integer.MAX_VALUE)); + } + + @Override + public void writeTextFile(PCollection pcollection, String pathName) { + pcollection.parallelDo("asText", new StringifyFn(), Writables.strings()) + .write(To.textFile(pathName)); + } + + private static class StringifyFn extends MapFn { + @Override + public String map(T input) { + return input.toString(); + } + } + + private void cleanup() { + if (!outputTargets.isEmpty()) { + LOG.warn("Not running cleanup while output targets remain"); + return; + } + try { + FileSystem fs = tempDirectory.getFileSystem(conf); + if (fs.exists(tempDirectory)) { + fs.delete(tempDirectory, true); + } + } catch (IOException e) { + LOG.info("Exception during cleanup", e); + } + } + + public int getNextAnonymousStageId() { + return nextAnonymousStageId++; + } + + @Override + public void enableDebug() { + // Turn on Crunch runtime error catching. 
+ getConfiguration().setBoolean(RuntimeParameters.DEBUG, true); + } + + @Override + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java new file mode 100644 index 0000000..7b8f2ea --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * A {@link PCollectionImpl} whose records are produced by applying a {@link DoFn} to the records
+ * of a single parent collection.
+ */
+public class DoCollectionImpl extends PCollectionImpl {
+
+  // The only input of this collection; fn is applied to its records.
+  private final PCollectionImpl parent;
+  private final DoFn fn;
+  // The PType of the records this collection holds (the output of fn).
+  private final PType ntype;
+
+  /** Creates the collection with options built from an unconfigured {@link ParallelDoOptions} builder. */
+  DoCollectionImpl(String name, PCollectionImpl parent, DoFn fn, PType ntype) {
+    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+  }
+
+  /**
+   * Creates the collection with explicit options.
+   *
+   * @param name name of this collection
+   * @param parent the collection whose records are fed to {@code fn}
+   * @param fn the function applied to the parent's records
+   * @param ntype the PType of the records {@code fn} emits
+   * @param options passed unchanged to the {@link PCollectionImpl} constructor
+   */
+  DoCollectionImpl(String name, PCollectionImpl parent, DoFn fn, PType ntype,
+      ParallelDoOptions options) {
+    super(name, options);
+    this.parent = (PCollectionImpl) parent;
+    this.fn = (DoFn) fn;
+    this.ntype = ntype;
+  }
+
+  /** Estimates size as the parent's size multiplied by {@code fn.scaleFactor()}, truncated to a long. */
+  @Override
+  protected long getSizeInternal() {
+    return (long) (fn.scaleFactor() * parent.getSize());
+  }
+
+  @Override
+  public PType getPType() {
+    return ntype;
+  }
+
+  /** Dispatches to the visitor's {@code visitDoFnCollection} callback. */
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitDoFnCollection(this);
+  }
+
+  /** Returns an immutable one-element list holding the parent collection. */
+  @Override
+  public List> getParents() {
+    return ImmutableList.> of(parent);
+  }
+
+  /** Creates a plan node that applies {@code fn}, labelled with this collection's name and PType. */
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(getName(), fn, ntype);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
new file mode 100644
index 0000000..176643b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link PTable} whose entries are produced by applying a {@link DoFn} to the records of a
+ * single parent collection.
+ */
+public class DoTableImpl extends PTableBase implements PTable {
+
+  // The only input of this table; fn is applied to its records.
+  private final PCollectionImpl parent;
+  private final DoFn> fn;
+  // Serves as both the PTableType and the PType of this table.
+  private final PTableType type;
+
+  /** Creates the table with options built from an unconfigured {@link ParallelDoOptions} builder. */
+  DoTableImpl(String name, PCollectionImpl parent, DoFn> fn, PTableType ntype) {
+    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
+  }
+
+  /**
+   * Creates the table with explicit options.
+   *
+   * @param name name of this table
+   * @param parent the collection whose records are fed to {@code fn}
+   * @param fn the function applied to the parent's records
+   * @param ntype the table type of the entries {@code fn} emits
+   * @param options passed unchanged to the superclass constructor
+   */
+  DoTableImpl(String name, PCollectionImpl parent, DoFn> fn, PTableType ntype,
+      ParallelDoOptions options) {
+    super(name, options);
+    this.parent = parent;
+    this.fn = fn;
+    this.type = ntype;
+  }
+
+  /** Estimates size as the parent's size multiplied by {@code fn.scaleFactor()}, truncated to a long. */
+  @Override
+  protected long getSizeInternal() {
+    return (long) (fn.scaleFactor() * parent.getSize());
+  }
+
+  @Override
+  public PTableType getPTableType() {
+    return type;
+  }
+
+  /** Dispatches to the visitor's {@code visitDoTable} callback. */
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitDoTable(this);
+  }
+
+  /** Returns the same object as {@link #getPTableType()}. */
+  @Override
+  public PType> getPType() {
+    return type;
+  }
+
+  /** Returns an immutable one-element list holding the parent collection. */
+  @Override
+  public List> getParents() {
+    return ImmutableList.> of(parent);
+  }
+
+  /** Creates a plan node that applies {@code fn}, labelled with this table's name and type. */
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(getName(), fn, type);
+  }
+
+  /** Returns true when the function producing this table is a {@link CombineFn}. */
+  public boolean hasCombineFn() {
+    return fn instanceof CombineFn;
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
new file mode 100644
index 0000000..ace5cc1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A collection read directly from a {@link Source}. It has no parent collections, and two
+ * instances are equal exactly when their sources are equal.
+ */
+public class InputCollection extends PCollectionImpl {
+
+  private final Source source;
+
+  /** Creates an input collection named after {@code source.toString()}. */
+  public InputCollection(Source source, MRPipeline pipeline) {
+    super(source.toString());
+    this.source = source;
+    // pipeline is not declared in this class (inherited); getSizeInternal reads its configuration.
+    this.pipeline = pipeline;
+  }
+
+  /** Returns the source's type. */
+  @Override
+  public PType getPType() {
+    return source.getType();
+  }
+
+  public Source getSource() {
+    return source;
+  }
+
+  /**
+   * Returns the size reported by the source for the pipeline's configuration.
+   *
+   * @throws IllegalStateException if the source reports a negative size, which is treated as the
+   *     input not existing
+   */
+  @Override
+  protected long getSizeInternal() {
+    long sz = source.getSize(pipeline.getConfiguration());
+    if (sz < 0) {
+      throw new IllegalStateException("Input source " + source + " does not exist!");
+    }
+    return sz;
+  }
+
+  /** Dispatches to the visitor's {@code visitInputCollection} callback. */
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitInputCollection(this);
+  }
+
+  /** Returns an empty list: an input collection has no parents. */
+  @Override
+  public List> getParents() {
+    return ImmutableList.of();
+  }
+
+  /** Creates an input plan node for the source. */
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createInputNode(source);
+  }
+
+  /** Equal to any other InputCollection whose source is equal to this one's. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof InputCollection)) {
+      return false;
+    }
+    return source.equals(((InputCollection) obj).source);
+  }
+
+  /** Hashes only the source, consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(source).toHashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
new file mode 100644
index 0000000..71f11c5
--- /dev/null
+++
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.collect; + +import java.util.List; + +import org.apache.crunch.Pair; +import org.apache.crunch.TableSource; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.DoNode; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; + +import com.google.common.collect.ImmutableList; + +public class InputTable extends PTableBase { + + private final TableSource source; + private final InputCollection> asCollection; + + public InputTable(TableSource source, MRPipeline pipeline) { + super(source.toString()); + this.source = source; + this.pipeline = pipeline; + this.asCollection = new InputCollection>(source, pipeline); + } + + public TableSource getSource() { + return source; + } + + @Override + protected long getSizeInternal() { + return asCollection.getSizeInternal(); + } + + @Override + public PTableType getPTableType() { + return source.getTableType(); + } + + @Override + public PType> getPType() { + return source.getType(); + } + + @Override + public List> getParents() { + return 
ImmutableList.of(); + } + + @Override + protected void acceptInternal(PCollectionImpl.Visitor visitor) { + visitor.visitInputCollection(asCollection); + } + + @Override + public DoNode createDoNode() { + return DoNode.createInputNode(source); + } + + @Override + public int hashCode() { + return asCollection.hashCode(); + } + + @Override + public boolean equals(Object other) { + return asCollection.equals(other); + } +}