From: kenn@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Thu, 20 Apr 2017 21:49:28 -0000
Subject: [1/4] beam git commit: Remove Sink in favor of FileBasedSink

Repository: beam
Updated Branches:
  refs/heads/master 33078d20c -> 4f8b1cc22

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
new file mode 100644
index 0000000..ee349a9
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+
+/**
+ * This class is deprecated, and only exists for HDFSFileSink.
+ */ +@Deprecated +public abstract class Sink implements Serializable, HasDisplayData { + /** + * Ensures that the sink is valid and can be written to before the write operation begins. One + * should use {@link com.google.common.base.Preconditions} to implement this method. + */ + public abstract void validate(PipelineOptions options); + + /** + * Returns an instance of a {@link WriteOperation} that can write to this Sink. + */ + public abstract WriteOperation createWriteOperation(PipelineOptions options); + + /** + * {@inheritDoc} + * + *
By default, does not register any display data. Implementors may override this method + * to provide their own display data. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) {} + + /** + * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink. + * + *
The {@code WriteOperation} defines how to perform initialization and finalization of a + * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write + * a bundle to the sink. + * + *
Since operations in Beam may be run multiple times for redundancy or fault-tolerance, + * the initialization and finalization defined by a WriteOperation must be idempotent. + * + *
+   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
+   * call to the {@code initialize} method and deserialized before calls to
+   * {@code createWriter} and {@code finalize}. However, it is not
+   * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
+   * state of the {@code WriteOperation}.
+   *
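+   * <p>For illustration only, a rough sketch of this lifecycle; {@code SimpleTextSink},
+   * {@code SimpleTextWriteOperation}, and {@code SimpleTextWriter} are hypothetical names,
+   * not part of this module:
+   *
+   * <pre>{@code
+   * class SimpleTextWriteOperation extends Sink.WriteOperation<String, String> {
+   *   private final SimpleTextSink sink;
+   *   private String tempDirectory;  // mutable state, set during initialize()
+   *
+   *   SimpleTextWriteOperation(SimpleTextSink sink) {
+   *     this.sink = sink;
+   *   }
+   *
+   *   @Override
+   *   public void initialize(PipelineOptions options) throws Exception {
+   *     // Idempotent: repeated calls derive the same temporary location.
+   *     tempDirectory = sink.getOutputDirectory() + "/temp";  // getOutputDirectory() is hypothetical
+   *   }
+   *
+   *   @Override
+   *   public void setWindowedWrites(boolean windowedWrites) {}
+   *
+   *   @Override
+   *   public Sink.Writer<String, String> createWriter(PipelineOptions options) throws Exception {
+   *     // Must not mutate this WriteOperation; it is not reserialized afterwards.
+   *     return new SimpleTextWriter(this, tempDirectory);
+   *   }
+   *
+   *   @Override
+   *   public void finalize(Iterable<String> writerResults, PipelineOptions options)
+   *       throws Exception {
+   *     // Idempotent: move each temporary file named by a writer result to its final
+   *     // location, then sweep the temporary directory for output left by failed bundles.
+   *   }
+   *
+   *   @Override
+   *   public Sink<String> getSink() {
+   *     return sink;
+   *   }
+   *
+   *   @Override
+   *   public Coder<String> getWriterResultCoder() {
+   *     return StringUtf8Coder.of();
+   *   }
+   * }
+   * }</pre>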
See {@link Sink} for more detailed documentation about the process of writing to a Sink. + * + * @param The type of objects to write + * @param The result of a per-bundle write + */ + public abstract static class WriteOperation implements Serializable { + /** + * Performs initialization before writing to the sink. Called before writing begins. + */ + public abstract void initialize(PipelineOptions options) throws Exception; + + /** + * Indicates that the operation will be performing windowed writes. + */ + public abstract void setWindowedWrites(boolean windowedWrites); + + /** + * Given an Iterable of results from bundle writes, performs finalization after writing and + * closes the sink. Called after all bundle writes are complete. + * + *
+   * <p>The results that are passed to finalize are those returned by bundles that completed
+   * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
+   * one writer result will be passed to finalize for each bundle. An implementation of finalize
+   * should clean up after any failed and successfully retried bundles. Note that these
+   * failed bundles will not have their writer result passed to finalize, so finalize should be
+   * capable of locating any temporary/partial output written by failed bundles.
+   *
A best practice is to make finalize atomic. If this is impossible given the semantics + * of the sink, finalize should be idempotent, as it may be called multiple times in the case of + * failure/retry or for redundancy. + * + *
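+   * <p>For illustration only, a sketch of an idempotent {@code finalize} over a local file
+   * system; {@code tempDirectory} and {@code finalDirectory} are hypothetical fields, and
+   * {@code java.nio.file} is assumed:
+   *
+   * <pre>{@code
+   * @Override
+   * public void finalize(Iterable<String> writerResults, PipelineOptions options)
+   *     throws Exception {
+   *   for (String tempFile : writerResults) {
+   *     Path source = Paths.get(tempFile);
+   *     Path target = Paths.get(finalDirectory).resolve(source.getFileName());
+   *     // A previous finalize attempt may already have moved this file; skipping the move
+   *     // when the target exists keeps retries idempotent.
+   *     if (Files.exists(source) && !Files.exists(target)) {
+   *       Files.move(source, target);
+   *     }
+   *   }
+   *   // Failed bundles never report a writer result, so sweep the temporary directory for
+   *   // leftover partial output instead of relying on the results alone.
+   *   try (DirectoryStream<Path> leftovers = Files.newDirectoryStream(Paths.get(tempDirectory))) {
+   *     for (Path leftover : leftovers) {
+   *       Files.deleteIfExists(leftover);
+   *     }
+   *   }
+   * }
+   * }</pre>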
Note that the iteration order of the writer results is not guaranteed to be consistent if + * finalize is called multiple times. + * + * @param writerResults an Iterable of results from successful bundle writes. + */ + public abstract void finalize(Iterable writerResults, PipelineOptions options) + throws Exception; + + /** + * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink. + * + *
The bundle id that the writer will use to uniquely identify its output will be passed to + * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}. + * + *
Must not mutate the state of the WriteOperation. + */ + public abstract Writer createWriter(PipelineOptions options) throws Exception; + + /** + * Returns the Sink that this write operation writes to. + */ + public abstract Sink getSink(); + + /** + * Returns a coder for the writer result type. + */ + public abstract Coder getWriterResultCoder(); + } + + /** + * A Writer writes a bundle of elements from a PCollection to a sink. + * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins + * and {@link Writer#close} is called after all elements in the bundle have been written. + * {@link Writer#write} writes an element to the sink. + * + *
Note that any access to static members or methods of a Writer must be thread-safe, as + * multiple instances of a Writer may be instantiated in different threads on the same worker. + * + *
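+   * <p>For illustration only, a minimal {@code Writer} for the hypothetical
+   * {@code SimpleTextSink} sketched above; {@code java.nio.file} and {@code java.io} types
+   * are assumed:
+   *
+   * <pre>{@code
+   * class SimpleTextWriter extends Sink.Writer<String, String> {
+   *   private final SimpleTextWriteOperation writeOperation;
+   *   private final String tempDirectory;
+   *   private Path tempFile;
+   *   private BufferedWriter out;
+   *
+   *   SimpleTextWriter(SimpleTextWriteOperation writeOperation, String tempDirectory) {
+   *     this.writeOperation = writeOperation;
+   *     this.tempDirectory = tempDirectory;
+   *   }
+   *
+   *   @Override
+   *   public void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo,
+   *       int shard, int numShards) throws Exception {
+   *     open(uId);
+   *   }
+   *
+   *   @Override
+   *   public void openUnwindowed(String uId, int shard, int numShards) throws Exception {
+   *     open(uId);
+   *   }
+   *
+   *   private void open(String uId) throws Exception {
+   *     // Naming the file after the unique bundle id keeps retried bundles from colliding.
+   *     tempFile = Paths.get(tempDirectory, uId);
+   *     out = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
+   *   }
+   *
+   *   @Override
+   *   public void write(String value) throws Exception {
+   *     out.write(value);
+   *     out.newLine();
+   *   }
+   *
+   *   @Override
+   *   public void cleanup() throws Exception {
+   *     if (out != null) {
+   *       out.close();
+   *     }
+   *     if (tempFile != null) {
+   *       Files.deleteIfExists(tempFile);
+   *     }
+   *   }
+   *
+   *   @Override
+   *   public String close() throws Exception {
+   *     out.close();
+   *     // The writer result identifies this bundle's output for WriteOperation.finalize.
+   *     return tempFile.toString();
+   *   }
+   *
+   *   @Override
+   *   public Sink.WriteOperation<String, String> getWriteOperation() {
+   *     return writeOperation;
+   *   }
+   * }
+   * }</pre>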
See {@link Sink} for more detailed documentation about the process of writing to a Sink. + * + * @param The type of object to write + * @param The writer results type (e.g., the bundle's output filename, as String) + */ + public abstract static class Writer { + /** + * Performs bundle initialization. For example, creates a temporary file for writing or + * initializes any state that will be used across calls to {@link Writer#write}. + * + *
The unique id that is given to open should be used to ensure that the writer's output does + * not interfere with the output of other Writers, as a bundle may be executed many times for + * fault tolerance. See {@link Sink} for more information about bundle ids. + * + *
+   * <p>The window and paneInfo arguments are populated when windowed writes are requested.
+   * shard and numShards are populated for the case of static sharding. In cases where the
+   * runner is dynamically picking sharding, shard and numShards might both be set to -1.
+   */
+  public abstract void openWindowed(String uId,
+                                    BoundedWindow window,
+                                    PaneInfo paneInfo,
+                                    int shard,
+                                    int numShards) throws Exception;
+
+  /**
+   * Performs bundle initialization for the case where the file is written unwindowed.
+   */
+  public abstract void openUnwindowed(String uId,
+                                      int shard,
+                                      int numShards) throws Exception;
+
+  public abstract void cleanup() throws Exception;
+
+  /**
+   * Called for each value in the bundle.
+   */
+  public abstract void write(T value) throws Exception;
+
+  /**
+   * Finishes writing the bundle. Closes any resources used for writing the bundle.
+   *
Returns a writer result that will be used in the {@link Sink.WriteOperation}'s + * finalization. The result should contain some way to identify the output of this bundle (using + * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify + * successful writes. See {@link Sink} for more information about bundle ids. + * + * @return the writer result + */ + public abstract WriteT close() throws Exception; + + /** + * Returns the write operation this writer belongs to. + */ + public abstract WriteOperation getWriteOperation(); + + + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java new file mode 100644 index 0000000..8c2fc99 --- /dev/null +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation; +import org.apache.beam.sdk.io.hdfs.Sink.Writer; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is deprecated, and only exists currently for HDFSFileSink. + */ +@Deprecated +public class Write extends PTransform, PDone> { + private static final Logger LOG = LoggerFactory.getLogger(Write.class); + + private static final int UNKNOWN_SHARDNUM = -1; + private static final int UNKNOWN_NUMSHARDS = -1; + + private final Sink sink; + // This allows the number of shards to be dynamically computed based on the input + // PCollection. + @Nullable + private final PTransform, PCollectionView> computeNumShards; + // We don't use a side input for static sharding, as we want this value to be updatable + // when a pipeline is updated. + @Nullable + private final ValueProvider numShardsProvider; + private boolean windowedWrites; + + /** + * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner + * control how many different shards are produced. 
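+   * <p>A typical use, sketched with the hypothetical {@code SimpleTextSink} (any concrete
+   * {@link Sink} is applied the same way):
+   *
+   * <pre>{@code
+   * PCollection<String> records = ...;
+   * // Runner-determined sharding by default; see withNumShards and withSharding.
+   * records.apply("WriteToSimpleTextSink", Write.to(new SimpleTextSink("/tmp/out")));
+   * }</pre>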
+ */ + public static Write to(Sink sink) { + checkNotNull(sink, "sink"); + return new Write<>(sink, null /* runner-determined sharding */, null, false); + } + + private Write( + Sink sink, + @Nullable PTransform, PCollectionView> computeNumShards, + @Nullable ValueProvider numShardsProvider, + boolean windowedWrites) { + this.sink = sink; + this.computeNumShards = computeNumShards; + this.numShardsProvider = numShardsProvider; + this.windowedWrites = windowedWrites; + } + + @Override + public PDone expand(PCollection input) { + checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites, + "%s can only be applied to an unbounded PCollection if doing windowed writes", + Write.class.getSimpleName()); + PipelineOptions options = input.getPipeline().getOptions(); + sink.validate(options); + return createWrite(input, sink.createWriteOperation(options)); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink")) + .include("sink", sink); + if (getSharding() != null) { + builder.include("sharding", getSharding()); + } else if (getNumShards() != null) { + String numShards = getNumShards().isAccessible() + ? getNumShards().get().toString() : getNumShards().toString(); + builder.add(DisplayData.item("numShards", numShards) + .withLabel("Fixed Number of Shards")); + } + } + + /** + * Returns the {@link Sink} associated with this PTransform. + */ + public Sink getSink() { + return sink; + } + + /** + * Gets the {@link PTransform} that will be used to determine sharding. This can be either a + * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by + * {@link #withSharding(PTransform)}), or runner-determined (by {@link + * #withRunnerDeterminedSharding()}. + */ + @Nullable + public PTransform, PCollectionView> getSharding() { + return computeNumShards; + } + + public ValueProvider getNumShards() { + return numShardsProvider; + } + + /** + * Returns a new {@link Write} that will write to the current {@link Sink} using the + * specified number of shards. + * + *
This option should be used sparingly as it can hurt performance. See {@link Write} for + * more information. + * + *
A value less than or equal to 0 will be equivalent to the default behavior of + * runner-determined sharding. + */ + public Write withNumShards(int numShards) { + if (numShards > 0) { + return withNumShards(StaticValueProvider.of(numShards)); + } + return withRunnerDeterminedSharding(); + } + + /** + * Returns a new {@link Write} that will write to the current {@link Sink} using the + * {@link ValueProvider} specified number of shards. + * + *
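+   * <p>For example, the shard count can come from a pipeline option so that it remains
+   * updatable; {@code MyOptions} and its {@code getNumOutputShards()} method (declared as a
+   * {@code ValueProvider<Integer>} option) are hypothetical:
+   *
+   * <pre>{@code
+   * MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
+   * records.apply(Write.to(sink).withNumShards(options.getNumOutputShards()));
+   * }</pre>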
This option should be used sparingly as it can hurt performance. See {@link Write} for + * more information. + */ + public Write withNumShards(ValueProvider numShardsProvider) { + return new Write<>(sink, null, numShardsProvider, windowedWrites); + } + + /** + * Returns a new {@link Write} that will write to the current {@link Sink} using the + * specified {@link PTransform} to compute the number of shards. + * + *
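+   * <p>For illustration only, a sketch of a data-dependent sharding transform; {@code MyRecord}
+   * and the one-shard-per-million-records heuristic are made up for the example:
+   *
+   * <pre>{@code
+   * PTransform<PCollection<MyRecord>, PCollectionView<Integer>> sharding =
+   *     new PTransform<PCollection<MyRecord>, PCollectionView<Integer>>() {
+   *       @Override
+   *       public PCollectionView<Integer> expand(PCollection<MyRecord> input) {
+   *         return input
+   *             .apply(Count.<MyRecord>globally())
+   *             .apply(ParDo.of(new DoFn<Long, Integer>() {
+   *               @ProcessElement
+   *               public void processElement(ProcessContext c) {
+   *                 c.output(Math.max(1, (int) (c.element() / 1000000L)));
+   *               }
+   *             }))
+   *             .apply(View.<Integer>asSingleton());
+   *       }
+   *     };
+   * records.apply(Write.to(sink).withSharding(sharding));
+   * }</pre>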
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   */
+  public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+    checkNotNull(
+        sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
+    return new Write<>(sink, sharding, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} with
+   * runner-determined sharding.
+   */
+  public Write<T> withRunnerDeterminedSharding() {
+    return new Write<>(sink, null, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link Write} that preserves windowing on its input.
+   *
If this option is not specified, windowing and triggering are replaced by + * {@link GlobalWindows} and {@link DefaultTrigger}. + * + *
+   * <p>If there is no data for a window, no output shards will be generated for that window.
+   * If a window triggers multiple times, more than one output shard might be generated
+   * for that window; it is up to the sink implementation to keep these output shards
+   * unique.
+   *
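+   * <p>A sketch of a windowed write on an unbounded input; the one-minute fixed windows, the
+   * shard count, and {@code SimpleTextSink} are illustrative only:
+   *
+   * <pre>{@code
+   * PCollection<String> events = ...;  // unbounded
+   * events
+   *     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
+   *     .apply(Write.to(new SimpleTextSink("/tmp/out"))
+   *         .withWindowedWrites()
+   *         .withNumShards(5));
+   * }</pre>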
This option can only be used if {@link #withNumShards(int)} is also set to a + * positive value. + */ + public Write withWindowedWrites() { + return new Write<>(sink, computeNumShards, numShardsProvider, true); + } + + /** + * Writes all the elements in a bundle using a {@link Writer} produced by the + * {@link WriteOperation} associated with the {@link Sink}. + */ + private class WriteBundles extends DoFn { + // Writer that will write the records in this bundle. Lazily + // initialized in processElement. + private Writer writer = null; + private final PCollectionView> writeOperationView; + + WriteBundles(PCollectionView> writeOperationView) { + this.writeOperationView = writeOperationView; + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + // Lazily initialize the Writer + if (writer == null) { + WriteOperation writeOperation = c.sideInput(writeOperationView); + LOG.info("Opening writer for write operation {}", writeOperation); + writer = writeOperation.createWriter(c.getPipelineOptions()); + + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } + LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); + } + try { + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (writer != null) { + WriteT result = writer.close(); + c.output(result); + // Reset state in case of reuse. + writer = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(Write.this); + } + } + + /** + * Like {@link WriteBundles}, but where the elements for each shard have been collected into + * a single iterable. + * + * @see WriteBundles + */ + private class WriteShardedBundles extends DoFn>, WriteT> { + private final PCollectionView> writeOperationView; + private final PCollectionView numShardsView; + + WriteShardedBundles(PCollectionView> writeOperationView, + PCollectionView numShardsView) { + this.writeOperationView = writeOperationView; + this.numShardsView = numShardsView; + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get(); + // In a sharded write, single input element represents one shard. We can open and close + // the writer in each call to processElement. 
+ WriteOperation writeOperation = c.sideInput(writeOperationView); + LOG.info("Opening writer for write operation {}", writeOperation); + Writer writer = writeOperation.createWriter(c.getPipelineOptions()); + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), + numShards); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } + LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); + + try { + try { + for (T t : c.element().getValue()) { + writer.write(t); + } + } catch (Exception e) { + try { + writer.close(); + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + + // Close the writer; if this throws let the error propagate. + WriteT result = writer.close(); + c.output(result); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); + throw e; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(Write.this); + } + } + + private static class ApplyShardingKey extends DoFn> { + private final PCollectionView numShardsView; + private final ValueProvider numShardsProvider; + private int shardNumber; + + ApplyShardingKey(PCollectionView numShardsView, + ValueProvider numShardsProvider) { + this.numShardsView = numShardsView; + this.numShardsProvider = numShardsProvider; + shardNumber = UNKNOWN_SHARDNUM; + } + + @ProcessElement + public void processElement(ProcessContext context) { + int shardCount = 0; + if (numShardsView != null) { + shardCount = context.sideInput(numShardsView); + } else { + checkNotNull(numShardsProvider); + shardCount = numShardsProvider.get(); + } + checkArgument( + shardCount > 0, + "Must have a positive number of shards specified for non-runner-determined sharding." + + " Got %s", + shardCount); + if (shardNumber == UNKNOWN_SHARDNUM) { + // We want to desynchronize the first record sharding key for each instance of + // ApplyShardingKey, so records in a small PCollection will be statistically balanced. + shardNumber = ThreadLocalRandom.current().nextInt(shardCount); + } else { + shardNumber = (shardNumber + 1) % shardCount; + } + context.output(KV.of(shardNumber, context.element())); + } + } + + /** + * A write is performed as sequence of three {@link ParDo}'s. + * + *
In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's + * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is + * called. The output of this ParDo is a singleton PCollection + * containing the WriteOperation. + * + *
+   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+   * ParDo over the PCollection of elements to write. In this bundle-writing phase,
+   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. The writer is
+   * opened via {@link Writer#openWindowed} or {@link Writer#openUnwindowed}, closed by
+   * {@link Writer#close} in {@link DoFn#finishBundle}, and the {@link Writer#write} method is
+   * called for every element in the bundle. The output of this ParDo is a PCollection of
+   * writer result objects (see {@link Sink} for a description of writer results), one for
+   * each bundle.
+   *
+   * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input
+   * and the collection of writer results as a side input. In this ParDo,
+   * {@link WriteOperation#finalize} is called to finalize the write.
+   *
If the write of any element in the PCollection fails, {@link Writer#close} will be called + * before the exception that caused the write to fail is propagated and the write result will be + * discarded. + * + *
Since the {@link WriteOperation} is serialized after the initialization ParDo and + * deserialized in the bundle-writing and finalization phases, any state change to the + * WriteOperation object that occurs during initialization is visible in the latter phases. + * However, the WriteOperation is not serialized after the bundle-writing phase. This is why + * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate + * WriteOperation). + */ + private PDone createWrite( + PCollection input, WriteOperation writeOperation) { + Pipeline p = input.getPipeline(); + writeOperation.setWindowedWrites(windowedWrites); + + // A coder to use for the WriteOperation. + @SuppressWarnings("unchecked") + Coder> operationCoder = + (Coder>) SerializableCoder.of(writeOperation.getClass()); + + // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize + // the sink. + PCollection> operationCollection = + p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder)); + + // Initialize the resource in a do-once ParDo on the WriteOperation. + operationCollection = operationCollection + .apply("Initialize", ParDo.of( + new DoFn, WriteOperation>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation writeOperation = c.element(); + LOG.info("Initializing write operation {}", writeOperation); + writeOperation.initialize(c.getPipelineOptions()); + writeOperation.setWindowedWrites(windowedWrites); + LOG.debug("Done initializing write operation {}", writeOperation); + // The WriteOperation is also the output of this ParDo, so it can have mutable + // state. + c.output(writeOperation); + } + })) + .setCoder(operationCoder); + + // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase. + final PCollectionView> writeOperationView = + operationCollection.apply(View.>asSingleton()); + + if (!windowedWrites) { + // Re-window the data into the global window and remove any existing triggers. + input = + input.apply( + Window.into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + } + + + // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation + // as a side input) and collect the results of the writes in a PCollection. + // There is a dependency between this ParDo and the first (the WriteOperation PCollection + // as a side input), so this will happen after the initial ParDo. 
+ PCollection results; + final PCollectionView numShardsView; + if (computeNumShards == null && numShardsProvider == null) { + if (windowedWrites) { + throw new IllegalStateException("When doing windowed writes, numShards must be set" + + "explicitly to a positive value"); + } + numShardsView = null; + results = input + .apply("WriteBundles", + ParDo.of(new WriteBundles<>(writeOperationView)) + .withSideInputs(writeOperationView)); + } else { + if (computeNumShards != null) { + numShardsView = input.apply(computeNumShards); + results = input + .apply("ApplyShardLabel", ParDo.of( + new ApplyShardingKey(numShardsView, null)).withSideInputs(numShardsView)) + .apply("GroupIntoShards", GroupByKey.create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView)) + .withSideInputs(numShardsView, writeOperationView)); + } else { + numShardsView = null; + results = input + .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey(null, numShardsProvider))) + .apply("GroupIntoShards", GroupByKey.create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles<>(writeOperationView, null)) + .withSideInputs(writeOperationView)); + } + } + results.setCoder(writeOperation.getWriterResultCoder()); + + if (windowedWrites) { + // When processing streaming windowed writes, results will arrive multiple times. This + // means we can't share the below implementation that turns the results into a side input, + // as new data arriving into a side input does not trigger the listening DoFn. Instead + // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered + // whenever new data arrives. + PCollection> keyedResults = + results.apply("AttachSingletonKey", WithKeys.of((Void) null)); + keyedResults.setCoder(KvCoder.of(VoidCoder.of(), writeOperation + .getWriterResultCoder())); + + // Is the continuation trigger sufficient? + keyedResults + .apply("FinalizeGroupByKey", GroupByKey.create()) + .apply("Finalize", ParDo.of(new DoFn>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation writeOperation = c.sideInput(writeOperationView); + LOG.info("Finalizing write operation {}.", writeOperation); + List results = Lists.newArrayList(c.element().getValue()); + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + }).withSideInputs(writeOperationView)); + } else { + final PCollectionView> resultsView = + results.apply(View.asIterable()); + ImmutableList.Builder> sideInputs = + ImmutableList.>builder().add(resultsView); + if (numShardsView != null) { + sideInputs.add(numShardsView); + } + + // Finalize the write in another do-once ParDo on the singleton collection containing the + // Writer. The results from the per-bundle writes are given as an Iterable side input. + // The WriteOperation's state is the same as after its initialization in the first do-once + // ParDo. There is a dependency between this ParDo and the parallel write (the writer + // results collection as a side input), so it will happen after the parallel write. + // For the non-windowed case, we guarantee that if no data is written but the user has + // set numShards, then all shards will be written out as empty files. For this reason we + // use a side input here. 
+ operationCollection + .apply("Finalize", ParDo.of(new DoFn, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation writeOperation = c.element(); + LOG.info("Finalizing write operation {}.", writeOperation); + List results = Lists.newArrayList(c.sideInput(resultsView)); + LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); + + // We must always output at least 1 shard, and honor user-specified numShards if + // set. + int minShardsNeeded; + if (numShardsView != null) { + minShardsNeeded = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + minShardsNeeded = numShardsProvider.get(); + } else { + minShardsNeeded = 1; + } + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written for a total of " + + " {}.", extraShardsNeeded, results.size(), minShardsNeeded); + for (int i = 0; i < extraShardsNeeded; ++i) { + Writer writer = writeOperation.createWriter(c.getPipelineOptions()); + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + WriteT emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards."); + } + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + }).withSideInputs(sideInputs.build())); + } + return PDone.in(input.getPipeline()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java index cedd812..aeb258f 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java @@ -34,7 +34,6 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.mapred.AvroKey; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.SerializableFunction;