Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 39F82200C8C for ; Tue, 2 May 2017 03:45:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 38BDE160BC1; Tue, 2 May 2017 01:45:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 350FD160BC2 for ; Tue, 2 May 2017 03:45:58 +0200 (CEST) Received: (qmail 48671 invoked by uid 500); 2 May 2017 01:45:56 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 48056 invoked by uid 99); 2 May 2017 01:45:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 May 2017 01:45:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 43AEAE381C; Tue, 2 May 2017 01:45:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkff@apache.org To: commits@beam.apache.org Date: Tue, 02 May 2017 01:46:00 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/11] beam git commit: Moves AvroSink to upper level archived-at: Tue, 02 May 2017 01:45:59 -0000 Moves AvroSink to upper level Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0166e199 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0166e199 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0166e199 Branch: refs/heads/master Commit: 0166e19991af956a48ef99310f5f1916225255aa Parents: 2fa3c34 Author: Eugene Kirpichov Authored: Fri Apr 28 18:05:00 2017 -0700 Committer: Eugene Kirpichov Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 131 ---------------- .../java/org/apache/beam/sdk/io/AvroSink.java | 150 +++++++++++++++++++ 2 files changed, 150 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 2031569..75e14d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -19,33 +19,24 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.io.BaseEncoding; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -952,126 +943,4 @@ public class AvroIO { /** Disallow construction of utility class. */ private AvroIO() {} - - /** - * A {@link FileBasedSink} for Avro files. - */ - @VisibleForTesting - static class AvroSink extends FileBasedSink { - private final AvroCoder coder; - private final SerializableAvroCodecFactory codec; - private final ImmutableMap metadata; - - @VisibleForTesting - AvroSink( - FilenamePolicy filenamePolicy, - AvroCoder coder, - SerializableAvroCodecFactory codec, - ImmutableMap metadata) { - super(filenamePolicy); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @VisibleForTesting - AvroSink( - String baseOutputFilename, - String extension, - String fileNameTemplate, - AvroCoder coder, - SerializableAvroCodecFactory codec, - ImmutableMap metadata) { - super(baseOutputFilename, extension, fileNameTemplate); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @Override - public FileBasedSink.FileBasedWriteOperation createWriteOperation() { - return new AvroWriteOperation<>(this, coder, codec, metadata); - } - - /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation - * FileBasedWriteOperation} for Avro files. - */ - private static class AvroWriteOperation extends FileBasedWriteOperation { - private final AvroCoder coder; - private final SerializableAvroCodecFactory codec; - private final ImmutableMap metadata; - - private AvroWriteOperation(AvroSink sink, - AvroCoder coder, - SerializableAvroCodecFactory codec, - ImmutableMap metadata) { - super(sink); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @Override - public FileBasedWriter createWriter(PipelineOptions options) throws Exception { - return new AvroWriter<>(this, coder, codec, metadata); - } - } - - /** - * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} - * for Avro files. - */ - private static class AvroWriter extends FileBasedWriter { - private final AvroCoder coder; - private DataFileWriter dataFileWriter; - private SerializableAvroCodecFactory codec; - private final ImmutableMap metadata; - - public AvroWriter(FileBasedWriteOperation writeOperation, - AvroCoder coder, - SerializableAvroCodecFactory codec, - ImmutableMap metadata) { - super(writeOperation, MimeTypes.BINARY); - this.coder = coder; - this.codec = codec; - this.metadata = metadata; - } - - @SuppressWarnings("deprecation") // uses internal test functionality. - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - DatumWriter datumWriter = coder.getType().equals(GenericRecord.class) - ? new GenericDatumWriter(coder.getSchema()) - : new ReflectDatumWriter(coder.getSchema()); - - dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec()); - for (Map.Entry entry : metadata.entrySet()) { - Object v = entry.getValue(); - if (v instanceof String) { - dataFileWriter.setMeta(entry.getKey(), (String) v); - } else if (v instanceof Long) { - dataFileWriter.setMeta(entry.getKey(), (Long) v); - } else if (v instanceof byte[]) { - dataFileWriter.setMeta(entry.getKey(), (byte[]) v); - } else { - throw new IllegalStateException( - "Metadata value type must be one of String, Long, or byte[]. Found " - + v.getClass().getSimpleName()); - } - } - dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); - } - - @Override - public void write(T value) throws Exception { - dataFileWriter.append(value); - } - - @Override - protected void finishWrite() throws Exception { - dataFileWriter.flush(); - } - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java new file mode 100644 index 0000000..16f233c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import com.google.common.collect.ImmutableMap; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.Map; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.MimeTypes; + +/** + * A {@link FileBasedSink} for Avro files. + */ +class AvroSink extends FileBasedSink { + private final AvroCoder coder; + private final SerializableAvroCodecFactory codec; + private final ImmutableMap metadata; + + AvroSink( + FilenamePolicy filenamePolicy, + AvroCoder coder, + SerializableAvroCodecFactory codec, + ImmutableMap metadata) { + super(filenamePolicy); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + AvroSink( + String baseOutputFilename, + String extension, + String fileNameTemplate, + AvroCoder coder, + SerializableAvroCodecFactory codec, + ImmutableMap metadata) { + super(baseOutputFilename, extension, fileNameTemplate); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + @Override + public FileBasedWriteOperation createWriteOperation() { + return new AvroWriteOperation<>(this, coder, codec, metadata); + } + + /** + * A {@link FileBasedWriteOperation + * FileBasedWriteOperation} for Avro files. + */ + private static class AvroWriteOperation extends FileBasedWriteOperation { + private final AvroCoder coder; + private final SerializableAvroCodecFactory codec; + private final ImmutableMap metadata; + + private AvroWriteOperation(AvroSink sink, + AvroCoder coder, + SerializableAvroCodecFactory codec, + ImmutableMap metadata) { + super(sink); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + @Override + public FileBasedWriter createWriter(PipelineOptions options) throws Exception { + return new AvroWriter<>(this, coder, codec, metadata); + } + } + + /** + * A {@link FileBasedWriter FileBasedWriter} + * for Avro files. + */ + private static class AvroWriter extends FileBasedWriter { + private final AvroCoder coder; + private DataFileWriter dataFileWriter; + private SerializableAvroCodecFactory codec; + private final ImmutableMap metadata; + + public AvroWriter(FileBasedWriteOperation writeOperation, + AvroCoder coder, + SerializableAvroCodecFactory codec, + ImmutableMap metadata) { + super(writeOperation, MimeTypes.BINARY); + this.coder = coder; + this.codec = codec; + this.metadata = metadata; + } + + @SuppressWarnings("deprecation") // uses internal test functionality. + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { + DatumWriter datumWriter = coder.getType().equals(GenericRecord.class) + ? new GenericDatumWriter(coder.getSchema()) + : new ReflectDatumWriter(coder.getSchema()); + + dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec()); + for (Map.Entry entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (v instanceof String) { + dataFileWriter.setMeta(entry.getKey(), (String) v); + } else if (v instanceof Long) { + dataFileWriter.setMeta(entry.getKey(), (Long) v); + } else if (v instanceof byte[]) { + dataFileWriter.setMeta(entry.getKey(), (byte[]) v); + } else { + throw new IllegalStateException( + "Metadata value type must be one of String, Long, or byte[]. Found " + + v.getClass().getSimpleName()); + } + } + dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); + } + + @Override + public void write(T value) throws Exception { + dataFileWriter.append(value); + } + + @Override + protected void finishWrite() throws Exception { + dataFileWriter.flush(); + } + } +}