beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Add an ArtifactServiceStager
Date Thu, 05 Oct 2017 22:48:03 GMT
Add an ArtifactServiceStager

This stages artifacts over a GRPC channel.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dbf1dc0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dbf1dc0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dbf1dc0a

Branch: refs/heads/master
Commit: dbf1dc0a29e7c82cd13f2c1e4abe20dc0e7ea87e
Parents: 0dd4a1c
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Aug 18 15:06:06 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Thu Oct 5 15:47:51 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |  10 +
 .../construction/ArtifactServiceStager.java     | 244 +++++++++++++++++++
 .../construction/ArtifactServiceStagerTest.java | 138 +++++++++++
 .../InMemoryArtifactStagerService.java          | 152 ++++++++++++
 .../src/main/proto/beam_artifact_api.proto      |   4 +-
 5 files changed, 546 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dbf1dc0a/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 1a52914..ac712b0 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -121,6 +121,16 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+
     <!-- test dependencies -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/dbf1dc0a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
new file mode 100644
index 0000000..c37f289
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.ByteString;
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.MessageDigest;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactChunk;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.Manifest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc;
+import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub;
+import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceStub;
+
+/** A client to stage files on an {@link ArtifactStagingServiceGrpc ArtifactService}. */
+public class ArtifactServiceStager {
+  // 2 MB per file-request
+  private static final int DEFAULT_BUFFER_SIZE = 2 * 1024 * 1024;
+
+  public static ArtifactServiceStager overChannel(Channel channel) {
+    return overChannel(channel, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Create a new ArtifactServiceStager with the specified buffer size. Useful for testing
+   * multi-part uploads.
+   *
+   * @param bufferSize the maximum size of the artifact chunk, in bytes.
+   */
+  static ArtifactServiceStager overChannel(Channel channel, int bufferSize) {
+    return new ArtifactServiceStager(channel, bufferSize);
+  }
+
+  private final int bufferSize;
+  private final ArtifactStagingServiceStub stub;
+  private final ArtifactStagingServiceBlockingStub blockingStub;
+  private final ListeningExecutorService executorService =
+      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+
+  private ArtifactServiceStager(Channel channel, int bufferSize) {
+    this.stub = ArtifactStagingServiceGrpc.newStub(channel);
+    this.blockingStub = ArtifactStagingServiceGrpc.newBlockingStub(channel);
+    this.bufferSize = bufferSize;
+  }
+
+  public void stage(Iterable<File> files) throws IOException, InterruptedException
{
+    final Map<File, ListenableFuture<ArtifactMetadata>> futures = new HashMap<>();
+    for (File file : files) {
+      futures.put(file, executorService.submit(new StagingCallable(file)));
+    }
+    ListenableFuture<StagingResult> stagingResult =
+        Futures.whenAllComplete(futures.values()).call(new ExtractStagingResultsCallable(futures));
+    stageManifest(stagingResult);
+  }
+
+  private void stageManifest(ListenableFuture<StagingResult> stagingFuture)
+      throws InterruptedException {
+    try {
+      StagingResult stagingResult = stagingFuture.get();
+      if (stagingResult.isSuccess()) {
+        Manifest manifest =
+            Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
+        blockingStub.commitManifest(
+            CommitManifestRequest.newBuilder().setManifest(manifest).build());
+      } else {
+        RuntimeException failure =
+            new RuntimeException(
+                String.format(
+                    "Failed to stage %s files: %s",
+                    stagingResult.getFailures().size(), stagingResult.getFailures().keySet()));
+        for (Throwable t : stagingResult.getFailures().values()) {
+          failure.addSuppressed(t);
+        }
+        throw failure;
+      }
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private class StagingCallable implements Callable<ArtifactMetadata> {
+    private final File file;
+
+    private StagingCallable(File file) {
+      this.file = file;
+    }
+
+    @Override
+    public ArtifactMetadata call() throws Exception {
+      // TODO: Add Retries
+      PutArtifactResponseObserver responseObserver = new PutArtifactResponseObserver();
+      StreamObserver<PutArtifactRequest> requestObserver = stub.putArtifact(responseObserver);
+      ArtifactMetadata metadata = ArtifactMetadata.newBuilder().setName(file.getName()).build();
+      requestObserver.onNext(PutArtifactRequest.newBuilder().setMetadata(metadata).build());
+
+      MessageDigest md5Digest = MessageDigest.getInstance("MD5");
+      FileChannel channel = new FileInputStream(file).getChannel();
+      ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
+      while (!responseObserver.isTerminal() && channel.position() < channel.size())
{
+        readBuffer.clear();
+        channel.read(readBuffer);
+        readBuffer.flip();
+        md5Digest.update(readBuffer);
+        readBuffer.rewind();
+        PutArtifactRequest request =
+            PutArtifactRequest.newBuilder()
+                .setData(
+                    ArtifactChunk.newBuilder().setData(ByteString.copyFrom(readBuffer)).build())
+                .build();
+        requestObserver.onNext(request);
+      }
+
+      requestObserver.onCompleted();
+      responseObserver.awaitTermination();
+      if (responseObserver.err.get() != null) {
+        throw new RuntimeException(responseObserver.err.get());
+      }
+      return metadata.toBuilder().setMd5(BaseEncoding.base64().encode(md5Digest.digest())).build();
+    }
+
+    private class PutArtifactResponseObserver implements StreamObserver<PutArtifactResponse>
{
+      private final CountDownLatch completed = new CountDownLatch(1);
+      private final AtomicReference<Throwable> err = new AtomicReference<>(null);
+
+      @Override
+      public void onNext(PutArtifactResponse value) {}
+
+      @Override
+      public void onError(Throwable t) {
+        err.set(t);
+        completed.countDown();
+        throw new RuntimeException(t);
+      }
+
+      @Override
+      public void onCompleted() {
+        completed.countDown();
+      }
+
+      public boolean isTerminal() {
+        return completed.getCount() == 0;
+      }
+
+      public void awaitTermination() throws InterruptedException {
+        completed.await();
+      }
+    }
+  }
+
+  private static class ExtractStagingResultsCallable implements Callable<StagingResult>
{
+    private final Map<File, ListenableFuture<ArtifactMetadata>> futures;
+
+    private ExtractStagingResultsCallable(
+        Map<File, ListenableFuture<ArtifactMetadata>> futures) {
+      this.futures = futures;
+    }
+
+    @Override
+    public StagingResult call() throws Exception {
+      Set<ArtifactMetadata> metadata = new HashSet<>();
+      Map<File, Throwable> failures = new HashMap<>();
+      for (Entry<File, ListenableFuture<ArtifactMetadata>> stagedFileResult :
futures.entrySet()) {
+        try {
+          metadata.add(stagedFileResult.getValue().get());
+        } catch (ExecutionException ee) {
+          failures.put(stagedFileResult.getKey(), ee.getCause());
+        } catch (InterruptedException ie) {
+          throw new AssertionError(
+              "This should never happen. " + "All of the futures are complete by construction",
ie);
+        }
+      }
+      if (failures.isEmpty()) {
+        return StagingResult.success(metadata);
+      } else {
+        return StagingResult.failure(failures);
+      }
+    }
+  }
+
+  @AutoValue
+  abstract static class StagingResult {
+    static StagingResult success(Set<ArtifactMetadata> metadata) {
+      return new AutoValue_ArtifactServiceStager_StagingResult(
+          metadata, Collections.<File, Throwable>emptyMap());
+    }
+
+    static StagingResult failure(Map<File, Throwable> failures) {
+      return new AutoValue_ArtifactServiceStager_StagingResult(
+          null, failures);
+    }
+
+    boolean isSuccess() {
+      return getMetadata() != null;
+    }
+
+    @Nullable
+    abstract Set<ArtifactMetadata> getMetadata();
+
+    abstract Map<File, Throwable> getFailures();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/dbf1dc0a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
new file mode 100644
index 0000000..264aaf8
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ArtifactServiceStagerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.io.BaseEncoding;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.ServerImpl;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactMetadata;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link ArtifactServiceStager}.
+ */
+public class ArtifactServiceStagerTest {
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  private ServerImpl server;
+  private InMemoryArtifactStagerService service;
+  private ArtifactServiceStager stager;
+
+  @Before
+  public void setup() throws IOException {
+    stager =
+        ArtifactServiceStager.overChannel(
+            InProcessChannelBuilder.forName("service_stager").build(), 6);
+    service = new InMemoryArtifactStagerService();
+    server =
+        InProcessServerBuilder.forName("service_stager")
+            .directExecutor()
+            .addService(service)
+            .build()
+            .start();
+  }
+
+  @After
+  public void teardown() {
+    server.shutdownNow();
+  }
+
+  @Test
+  public void testStage() throws Exception {
+    File file = temp.newFile();
+    byte[] content = "foo-bar-baz".getBytes();
+    byte[] contentMd5 = MessageDigest.getInstance("MD5").digest(content);
+    try (FileChannel contentChannel = new FileOutputStream(file).getChannel()) {
+      contentChannel.write(ByteBuffer.wrap(content));
+    }
+
+    stager.stage(Collections.singleton(file));
+
+    assertThat(service.getStagedArtifacts().entrySet(), hasSize(1));
+    byte[] stagedContent = Iterables.getOnlyElement(service.getStagedArtifacts().values());
+    assertThat(stagedContent, equalTo(content));
+
+    ArtifactMetadata staged = service.getManifest().getArtifact(0);
+    assertThat(staged.getName(), equalTo(file.getName()));
+    byte[] manifestMd5 = BaseEncoding.base64().decode(staged.getMd5());
+    assertArrayEquals(contentMd5, manifestMd5);
+
+    assertThat(service.getManifest().getArtifactCount(), equalTo(1));
+    assertThat(staged, equalTo(Iterables.getOnlyElement(service.getStagedArtifacts().keySet())));
+  }
+
+  @Test
+  public void testStagingMultipleFiles() throws Exception {
+    File file = temp.newFile();
+    byte[] content = "foo-bar-baz".getBytes();
+    try (FileChannel contentChannel = new FileOutputStream(file).getChannel()) {
+      contentChannel.write(ByteBuffer.wrap(content));
+    }
+
+    File otherFile = temp.newFile();
+    byte[] otherContent = "spam-ham-eggs".getBytes();
+    try (FileChannel contentChannel = new FileOutputStream(otherFile).getChannel()) {
+      contentChannel.write(ByteBuffer.wrap(otherContent));
+    }
+
+    File thirdFile = temp.newFile();
+    byte[] thirdContent = "up, down, charm, top, bottom, strange".getBytes();
+    try (FileChannel contentChannel = new FileOutputStream(thirdFile).getChannel()) {
+      contentChannel.write(ByteBuffer.wrap(thirdContent));
+    }
+
+    stager.stage(ImmutableList.<File>of(file, otherFile, thirdFile));
+
+    assertThat(service.getManifest().getArtifactCount(), equalTo(3));
+    assertThat(service.getStagedArtifacts().entrySet(), hasSize(3));
+    Set<File> stagedFiles = new HashSet<>();
+    for (byte[] staged : service.getStagedArtifacts().values()) {
+      if (Arrays.equals(staged, content)) {
+        stagedFiles.add(file);
+      } else if (Arrays.equals(staged, otherContent)) {
+        stagedFiles.add(otherFile);
+      } else if (Arrays.equals(staged, thirdContent)) {
+        stagedFiles.add(thirdFile);
+      }
+    }
+    assertThat("All of the files contents should be staged", stagedFiles, hasSize(3));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/dbf1dc0a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/InMemoryArtifactStagerService.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/InMemoryArtifactStagerService.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/InMemoryArtifactStagerService.java
new file mode 100644
index 0000000..e89f829
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/InMemoryArtifactStagerService.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.io.BaseEncoding;
+import io.grpc.stub.StreamObserver;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.Manifest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactRequest.ContentCase;
+import org.apache.beam.sdk.common.runner.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.sdk.common.runner.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+
+/**
+ * An {@link ArtifactStagingServiceImplBase ArtifactStagingService} which stores the bytes
of the
+ * artifacts in memory..
+ */
+public class InMemoryArtifactStagerService extends ArtifactStagingServiceImplBase {
+  private final Map<ArtifactMetadata, byte[]> artifactBytes;
+  private Manifest manifest;
+
+  public InMemoryArtifactStagerService() {
+    artifactBytes = new HashMap<>();
+  }
+
+  @Override
+  public StreamObserver<ArtifactApi.PutArtifactRequest> putArtifact(
+      StreamObserver<ArtifactApi.PutArtifactResponse> responseObserver) {
+    return new BufferingObserver(responseObserver);
+  }
+
+  @Override
+  public void commitManifest(
+      ArtifactApi.CommitManifestRequest request,
+      StreamObserver<ArtifactApi.CommitManifestResponse> responseObserver) {
+    this.manifest = request.getManifest();
+    responseObserver.onNext(CommitManifestResponse.getDefaultInstance());
+    responseObserver.onCompleted();
+  }
+
+  public Map<ArtifactMetadata, byte[]> getStagedArtifacts() {
+    return Collections.unmodifiableMap(artifactBytes);
+  }
+
+  public Manifest getManifest() {
+    return manifest;
+  }
+
+  private class BufferingObserver implements StreamObserver<PutArtifactRequest> {
+    private final StreamObserver<PutArtifactResponse> responseObserver;
+    private ArtifactMetadata destination = null;
+    private BufferWritingObserver writer = null;
+
+    public BufferingObserver(StreamObserver<PutArtifactResponse> responseObserver)
{
+      this.responseObserver = responseObserver;
+    }
+
+    @Override
+    public void onNext(PutArtifactRequest value) {
+      if (writer == null) {
+        checkArgument(value.getContentCase().equals(ContentCase.METADATA));
+        writer = new BufferWritingObserver();
+        destination = value.getMetadata();
+      } else {
+        writer.onNext(value);
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      if (writer != null) {
+        writer.onError(t);
+      }
+      onCompleted();
+    }
+
+    @Override
+    public void onCompleted() {
+      if (writer != null) {
+        writer.onCompleted();
+        try {
+          artifactBytes.put(
+              destination
+                  .toBuilder()
+                  .setMd5(
+                      BaseEncoding.base64()
+                          .encode(
+                              MessageDigest.getInstance("MD5").digest(writer.stream.toByteArray())))
+                  .build(),
+              writer.stream.toByteArray());
+        } catch (NoSuchAlgorithmException e) {
+          throw new AssertionError("The Java Spec requires all JVMs to support MD5", e);
+        }
+      }
+      responseObserver.onNext(PutArtifactResponse.getDefaultInstance());
+      responseObserver.onCompleted();
+    }
+  }
+
+  private static class BufferWritingObserver implements StreamObserver<PutArtifactRequest>
{
+    private final ByteArrayOutputStream stream;
+
+    BufferWritingObserver() {
+      stream = new ByteArrayOutputStream();
+    }
+
+    @Override
+    public void onNext(PutArtifactRequest value) {
+      try {
+        stream.write(value.getData().getData().toByteArray());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      onCompleted();
+    }
+
+    @Override
+    public void onCompleted() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/dbf1dc0a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
index 1ca535b..12b0217 100644
--- a/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_artifact_api.proto
@@ -62,8 +62,8 @@ message ArtifactMetadata {
   // (Optional) The Unix-like permissions of the artifact
   uint32 permissions = 2;
 
-  // (Optional) The md5 checksum of the artifact. Used, among other things, by harness boot
code to
-  // validate the integrity of the artifact.
+  // (Optional) The base64-encoded md5 checksum of the artifact. Used, among other things,
by
+  // harness boot code to validate the integrity of the artifact.
   string md5 = 3;
 }
 


Mime
View raw message