crunch-user mailing list archives

From Rushi
Subject Reuse PCollection / fork processing
Date Wed, 21 Oct 2015 18:56:57 GMT
In Crunch, is it possible to reuse a PCollection multiple times for
different purposes in the same pipeline run? My pseudocode looks something
like the following, but when I run it I get the error
"File does not exist: /tmp/crunch-..". If I comment out the second
processing path (the processFinal(paths.second()) line in process()), the
error goes away and the pipeline executes successfully.

// 1. Entry point
public int run() {

} // end of run()

// 2.
private PTable<String, String> process() {

  Pair<Path, Path> paths = processIntermediate();

  PTable<String, String> ret1 = processFinal(paths.first());
  PTable<String, String> ret2 = processFinal(paths.second());

  return ret1.union(ret2);

} // end of process()

// 3.
private Pair<Path, Path> processIntermediate() {

  PTable<String, Integer> data = ...; // read data from wherever

  // filter data from the input
  // filter() will write a PCollection to an AvroFileSourceTarget and return
  // its path, which will be used later to read the collection back and do
  // further processing.
  Path path1 = filter(data, fs, true);
  Path path2 = filter(data, fs, false);

  // getPipeline().run();

  return Pair.of(path1, path2);

} // end of processIntermediate()

// 4.
private PTable<String, String> processFinal(Path path) {

  PCollection<String> table =
      getPipeline().read(new AvroFileSource<>(path), records(Strings));

  return table.parallelDo(...);

} // end of processFinal()

I imagine I could use Oozie workflow actions to simplify the processing,
but if this is just a matter of syntax or rearranging the code, I would
like to know.

Thanks in advance!
