beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [41/50] [abbrv] incubator-beam git commit: Rollback reverts "Move Google Cloud Dataflow native sinks to worker module"
Date Fri, 26 Feb 2016 22:55:18 GMT
Rollback reverts "Move Google Cloud Dataflow native sinks to worker module"

This is for Apache Beam.

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115507028


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7b28d235
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7b28d235
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7b28d235

Branch: refs/heads/master
Commit: 7b28d235000b9232a4fe55ebde76f77be26bd094
Parents: ca98da2
Author: lcwik <lcwik@google.com>
Authored: Wed Feb 24 16:27:52 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Thu Feb 25 23:58:28 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/runners/worker/AvroSink.java   | 135 ---------
 .../dataflow/sdk/runners/worker/TextSink.java   | 291 -------------------
 .../dataflow/sdk/util/common/worker/Sink.java   |  64 ----
 3 files changed, 490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b28d235/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java
deleted file mode 100644
index b101a2b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/AvroSink.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MimeTypes;
-import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.ValueOnlyWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-
-import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Random;
-
-/**
- * A sink that writes Avro files.
- *
- * @param <T> the type of the elements written to the sink
- */
-public class AvroSink<T> extends Sink<WindowedValue<T>> {
-
-  final String filenamePrefix;
-  final String shardFormat;
-  final String filenameSuffix;
-  final int shardCount;
-  final AvroCoder<T> avroCoder;
-  final Schema schema;
-
-  public AvroSink(String filename, ValueOnlyWindowedValueCoder<T> coder) {
-    this(filename, "", "", 1, coder);
-  }
-
-  public AvroSink(String filenamePrefix, String shardFormat, String filenameSuffix, int shardCount,
-                  ValueOnlyWindowedValueCoder<T> coder) {
-    if (!(coder.getValueCoder() instanceof AvroCoder)) {
-      throw new IllegalArgumentException(String.format(
-          "AvroSink requires an AvroCoder, not a %s", coder.getValueCoder().getClass()));
-    }
-
-    this.filenamePrefix = filenamePrefix;
-    this.shardFormat = shardFormat;
-    this.filenameSuffix = filenameSuffix;
-    this.shardCount = shardCount;
-    this.avroCoder = (AvroCoder<T>) coder.getValueCoder();
-    this.schema = this.avroCoder.getSchema();
-  }
-
-  public SinkWriter<WindowedValue<T>> writer(DatumWriter<T> datumWriter) throws IOException {
-    WritableByteChannel writer = IOChannelUtils.create(
-        filenamePrefix, shardFormat, filenameSuffix, shardCount, MimeTypes.BINARY);
-
-    if (writer instanceof ShardingWritableByteChannel) {
-      return new AvroShardingFileWriter(datumWriter, (ShardingWritableByteChannel) writer);
-    } else {
-      return new AvroFileWriter(datumWriter, writer);
-    }
-  }
-
-  @Override
-  public SinkWriter<WindowedValue<T>> writer() throws IOException {
-    return writer(avroCoder.createDatumWriter());
-  }
-
-  /** The SinkWriter for an AvroSink. */
-  class AvroFileWriter implements SinkWriter<WindowedValue<T>> {
-    DataFileWriter<T> fileWriter;
-
-    public AvroFileWriter(DatumWriter<T> datumWriter, WritableByteChannel outputChannel)
-        throws IOException {
-      fileWriter = new DataFileWriter<>(datumWriter);
-      fileWriter.create(schema, Channels.newOutputStream(outputChannel));
-    }
-
-    @Override
-    public long add(WindowedValue<T> value) throws IOException {
-      fileWriter.append(value.getValue());
-      // DataFileWriter doesn't support returning the length written. Use the
-      // coder instead.
-      return CoderUtils.encodeToByteArray(avroCoder, value.getValue()).length;
-    }
-
-    @Override
-    public void close() throws IOException {
-      fileWriter.close();
-    }
-  }
-
-  /** The SinkWriter for an AvroSink, which supports sharding. */
-  class AvroShardingFileWriter implements SinkWriter<WindowedValue<T>> {
-    private ArrayList<AvroFileWriter> fileWriters = new ArrayList<>();
-    private final Random random = new Random();
-
-    public AvroShardingFileWriter(
-        DatumWriter<T> datumWriter, ShardingWritableByteChannel outputChannel) throws IOException {
-      for (int i = 0; i < outputChannel.getNumShards(); i++) {
-        fileWriters.add(new AvroFileWriter(datumWriter, outputChannel.getChannel(i)));
-      }
-    }
-
-    @Override
-    public long add(WindowedValue<T> value) throws IOException {
-      return fileWriters.get(random.nextInt(fileWriters.size())).add(value);
-    }
-
-    @Override
-    public void close() throws IOException {
-      for (AvroFileWriter fileWriter : fileWriters) {
-        fileWriter.close();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b28d235/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java
deleted file mode 100644
index f48183c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/TextSink.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MimeTypes;
-import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.Random;
-
-import javax.annotation.Nullable;
-
-/**
- * A sink that writes text files.
- *
- * @param <T> the type of the elements written to the sink
- */
-public class TextSink<T> extends Sink<T> {
-
-  static final byte[] NEWLINE = getNewline();
-
-  private static byte[] getNewline() {
-    String newline = "\n";
-    try {
-      return newline.getBytes("UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException("UTF-8 not supported", e);
-    }
-  }
-
-  final String namePrefix;
-  final String shardFormat;
-  final String nameSuffix;
-  final int shardCount;
-  final boolean appendTrailingNewlines;
-  final String header;
-  final String footer;
-  final Coder<T> coder;
-
-  /**
-   * For testing only.
-   *
-   * <p>Used by simple tests that write to a single unsharded file.
-   */
-  public static <V> TextSink<WindowedValue<V>> createForTest(
-      String filename,
-      boolean appendTrailingNewlines,
-      @Nullable String header,
-      @Nullable String footer,
-      Coder<V> coder) {
-    return create(filename,
-                  "",
-                  "",
-                  1,
-                  appendTrailingNewlines,
-                  header,
-                  footer,
-                  WindowedValue.getValueOnlyCoder(coder));
-  }
-
-  /**
-   * For DirectPipelineRunner only.
-   * It wraps the coder with {@code WindowedValue.ValueOnlyCoder}.
-   */
-  public static <V> TextSink<WindowedValue<V>> createForDirectPipelineRunner(
-      String filenamePrefix,
-      String shardFormat,
-      String filenameSuffix,
-      int shardCount,
-      boolean appendTrailingNewlines,
-      @Nullable String header,
-      @Nullable String footer,
-      Coder<V> coder) {
-    return create(filenamePrefix,
-                  shardFormat,
-                  filenameSuffix,
-                  shardCount,
-                  appendTrailingNewlines,
-                  header,
-                  footer,
-                  WindowedValue.getValueOnlyCoder(coder));
-  }
-
-  /**
-   * Constructs a new TextSink.
-   *
-   * @param filenamePrefix the prefix of output filenames.
-   * @param shardFormat the shard name template to use for output filenames.
-   * @param filenameSuffix the suffix of output filenames.
-   * @param shardCount the number of outupt shards to produce.
-   * @param appendTrailingNewlines true to append newlines to each output line.
-   * @param header text to place at the beginning of each output file.
-   * @param footer text to place at the end of each output file.
-   * @param coder the code used to encode elements for output.
-   */
-  public static <V> TextSink<V> create(String filenamePrefix,
-                                       String shardFormat,
-                                       String filenameSuffix,
-                                       int shardCount,
-                                       boolean appendTrailingNewlines,
-                                       @Nullable String header,
-                                       @Nullable String footer,
-                                       Coder<V> coder) {
-    return new TextSink<>(filenamePrefix,
-                          shardFormat,
-                          filenameSuffix,
-                          shardCount,
-                          appendTrailingNewlines,
-                          header,
-                          footer,
-                          coder);
-  }
-
-  private TextSink(String filenamePrefix,
-                   String shardFormat,
-                   String filenameSuffix,
-                   int shardCount,
-                   boolean appendTrailingNewlines,
-                   @Nullable String header,
-                   @Nullable String footer,
-                   Coder<T> coder) {
-    this.namePrefix = filenamePrefix;
-    this.shardFormat = shardFormat;
-    this.nameSuffix = filenameSuffix;
-    this.shardCount = shardCount;
-    this.appendTrailingNewlines = appendTrailingNewlines;
-    this.header = header;
-    this.footer = footer;
-    this.coder = coder;
-  }
-
-  @Override
-  public SinkWriter<T> writer() throws IOException {
-    String mimeType;
-
-    if (!(coder instanceof WindowedValueCoder)) {
-      throw new IOException(
-          "Expected WindowedValueCoder for inputCoder, got: "
-          + coder.getClass().getName());
-    }
-    Coder<?> valueCoder = ((WindowedValueCoder<?>) coder).getValueCoder();
-    if (valueCoder.equals(StringUtf8Coder.of())) {
-      mimeType = MimeTypes.TEXT;
-    } else {
-      mimeType = MimeTypes.BINARY;
-    }
-
-    WritableByteChannel writer = IOChannelUtils.create(namePrefix, shardFormat,
-        nameSuffix, shardCount, mimeType);
-
-    if (writer instanceof ShardingWritableByteChannel) {
-      return new ShardingTextFileWriter((ShardingWritableByteChannel) writer);
-    } else {
-      return new TextFileWriter(writer);
-    }
-  }
-
-  /**
-   * Abstract SinkWriter base class shared by sharded and unsharded Text
-   * writer implementations.
-   */
-  abstract class AbstractTextFileWriter implements SinkWriter<T> {
-    protected void init() throws IOException {
-      if (header != null) {
-        printLine(ShardingWritableByteChannel.ALL_SHARDS,
-            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), header));
-      }
-    }
-
-    /**
-     * Adds a value to the sink. Returns the size in bytes of the data written.
-     * The return value does -not- include header/footer size.
-     */
-    @Override
-    public long add(T value) throws IOException {
-      return printLine(getShardNum(value),
-          CoderUtils.encodeToByteArray(coder, value));
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (footer != null) {
-        printLine(ShardingWritableByteChannel.ALL_SHARDS,
-            CoderUtils.encodeToByteArray(StringUtf8Coder.of(), footer));
-      }
-    }
-
-    protected long printLine(int shardNum, byte[] line) throws IOException {
-      long length = line.length;
-      write(shardNum, ByteBuffer.wrap(line));
-
-      if (appendTrailingNewlines) {
-        write(shardNum, ByteBuffer.wrap(NEWLINE));
-        length += NEWLINE.length;
-      }
-
-      return length;
-    }
-
-    protected abstract void write(int shardNum, ByteBuffer buf)
-        throws IOException;
-    protected abstract int getShardNum(T value);
-  }
-
-  /** An unsharded SinkWriter for a TextSink. */
-  class TextFileWriter extends AbstractTextFileWriter {
-    private final WritableByteChannel outputChannel;
-
-    TextFileWriter(WritableByteChannel outputChannel) throws IOException {
-      this.outputChannel = outputChannel;
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        super.close();
-      } finally {
-        outputChannel.close();
-      }
-    }
-
-    @Override
-    protected void write(int shardNum, ByteBuffer buf) throws IOException {
-      outputChannel.write(buf);
-    }
-
-    @Override
-    protected int getShardNum(T value) {
-      return 0;
-    }
-  }
-
-  /** A sharding SinkWriter for a TextSink. */
-  class ShardingTextFileWriter extends AbstractTextFileWriter {
-    private final Random rng = new Random();
-    private final int numShards;
-    private final ShardingWritableByteChannel outputChannel;
-
-    // TODO: add support for user-defined sharding function.
-    ShardingTextFileWriter(ShardingWritableByteChannel outputChannel)
-        throws IOException {
-      this.outputChannel = outputChannel;
-      numShards = outputChannel.getNumShards();
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        super.close();
-      } finally {
-        outputChannel.close();
-      }
-    }
-
-    @Override
-    protected void write(int shardNum, ByteBuffer buf) throws IOException {
-      outputChannel.writeToShard(shardNum, buf);
-    }
-
-    @Override
-    protected int getShardNum(T value) {
-      return rng.nextInt(numShards);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b28d235/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java
deleted file mode 100644
index b48d70b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/Sink.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util.common.worker;
-
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler.StateKind;
-
-import java.io.IOException;
-
-/**
- * Abstract base class for Sinks.
- *
- * <p>A Sink is written to by getting a SinkWriter and adding values to
- * it.
- *
- * @param <T> the type of the elements written to the sink
- */
-public abstract class Sink<T> {
-  /**
-   * Returns a Writer that allows writing to this Sink.
-   */
-  public abstract SinkWriter<T> writer() throws IOException;
-
-  /**
-   * Writes to a Sink.
-   */
-  public interface SinkWriter<ElemT> extends AutoCloseable {
-    /**
-     * Adds a value to the sink. Returns the size in bytes of the data written.
-     */
-    public long add(ElemT value) throws IOException;
-
-    @Override
-    public void close() throws IOException;
-  }
-
-  /**
-   * Returns whether this Sink can be restarted.
-   */
-  public boolean supportsRestart() {
-    return false;
-  }
-
-  /**
-   * The default state kind of all the states reported in this Sink.
-   * Defaults to {@link StateKind#USER}.
-   */
-  protected StateKind getStateSamplerStateKind() {
-    return StateKind.USER;
-  }
-}


Mime
View raw message