beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2899) Universal Local Runner
Date Mon, 27 Nov 2017 23:05:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267754#comment-16267754
] 

ASF GitHub Bot commented on BEAM-2899:
--------------------------------------

tgroh closed pull request #4127: [BEAM-2899] Add a Logging Service to FnExecution
URL: https://github.com/apache/beam/pull/4127
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 132d366f707..a2d0eb47942 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -636,9 +636,9 @@ message LogEntry {
   // free not to use all severity levels in their log messages.
   message Severity {
     enum Enum {
+      // Unspecified level information. Will be logged at the TRACE level.
       UNSPECIFIED = 0;
-      // Trace level information, also the default log level unless
-      // another severity is specified.
+      // Trace level information.
       TRACE = 1;
       // Debugging information.
       DEBUG = 2;
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index 3ebcfd0c8d9..f275d69207e 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -79,6 +79,11 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>junit</groupId>
@@ -98,9 +103,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-      <dependency>
-          <groupId>org.mockito</groupId>
-          <artifactId>mockito-all</artifactId>
-      </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
index 9ea0fce4636..547475c4be6 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
@@ -21,4 +21,15 @@
 import io.grpc.BindableService;
 
 /** An interface sharing common behavior with services used during execution of user Fns.
*/
-public interface FnService extends AutoCloseable, BindableService {}
+public interface FnService extends AutoCloseable, BindableService {
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>There should be no more calls to any service method by the time a call to {@link
#close()}
+   * begins. Specifically, this means that a {@link io.grpc.Server} that this service is
bound to
+   * should have completed a call to the {@link io.grpc.Server#shutdown()} method, and all
future
+   * incoming calls will be rejected.
+   */
+  @Override
+  void close() throws Exception;
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
new file mode 100644
index 00000000000..9fe4a5fa07c
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.BindableService;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessServerBuilder;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+
+/**
+ * A {@link ServerFactory} which creates {@link Server servers} with the {@link
+ * InProcessServerBuilder}.
+ */
+public class InProcessServerFactory extends ServerFactory {
+  private static final AtomicInteger serviceNameUniqifier = new AtomicInteger();
+
+  public static InProcessServerFactory create() {
+    return new InProcessServerFactory();
+  }
+
+  private InProcessServerFactory() {}
+
+  @Override
+  public Server allocatePortAndCreate(BindableService service, ApiServiceDescriptor.Builder
builder)
+      throws IOException {
+    String name = String.format("InProcessServer_%s", serviceNameUniqifier.getAndIncrement());
+    builder.setUrl(name);
+    return InProcessServerBuilder.forName(name).addService(service).build();
+  }
+
+  @Override
+  public Server create(
+      BindableService service, ApiServiceDescriptor serviceDescriptor) throws IOException
{
+    return InProcessServerBuilder.forName(serviceDescriptor.getUrl()).addService(service).build();
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingService.java
new file mode 100644
index 00000000000..37c1f56b59b
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import com.google.common.collect.ImmutableSet;
+import io.grpc.stub.StreamObserver;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogControl;
+import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An implementation of the Beam Fn Logging Service over gRPC. */
+public class GrpcLoggingService extends BeamFnLoggingGrpc.BeamFnLoggingImplBase
+    implements FnService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GrpcLoggingService.class);
+
+  public static GrpcLoggingService forWriter(LogWriter writer) {
+    return new GrpcLoggingService(writer);
+  }
+
+  private final LogWriter logWriter;
+  private final ConcurrentMap<InboundObserver, StreamObserver<LogControl>> connectedClients;
+
+  private GrpcLoggingService(LogWriter logWriter) {
+    this.logWriter = logWriter;
+    connectedClients = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void close() throws Exception {
+    Set<InboundObserver> remainingClients = ImmutableSet.copyOf(connectedClients.keySet());
+    if (!remainingClients.isEmpty()) {
+      LOGGER.info(
+          "{} Beam Fn Logging clients still connected during shutdown.", remainingClients.size());
+
+      // Signal server shutting down to all remaining connected clients.
+      for (InboundObserver client : remainingClients) {
+        // We remove these from the connected clients map to prevent a race between
+        // this close method and the InboundObserver calling a terminal method on the
+        // StreamObserver. If we removed it, then we are responsible for the terminal call.
+        completeIfNotNull(connectedClients.remove(client));
+      }
+    }
+  }
+
+  @Override
+  public StreamObserver<BeamFnApi.LogEntry.List> logging(
+      StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+    LOGGER.info("Beam Fn Logging client connected.");
+    InboundObserver inboundObserver = new InboundObserver();
+    connectedClients.put(inboundObserver, outboundObserver);
+    return inboundObserver;
+  }
+
+  private void completeIfNotNull(StreamObserver<BeamFnApi.LogControl> outboundObserver)
{
+    if (outboundObserver != null) {
+      try {
+        outboundObserver.onCompleted();
+      } catch (RuntimeException ignored) {
+        // Completing outbound observer failed, ignoring failure and continuing
+        LOGGER.warn("Beam Fn Logging client failed to be complete.", ignored);
+      }
+    }
+  }
+
+  /**
+   * An inbound {@link StreamObserver} that forwards incoming messages to the client logger.
+   *
+   * <p>Mutually hangs up on clients that have errored or completed.
+   */
+  private class InboundObserver implements StreamObserver<BeamFnApi.LogEntry.List>
{
+    @Override
+    public void onNext(BeamFnApi.LogEntry.List value) {
+      for (BeamFnApi.LogEntry logEntry : value.getLogEntriesList()) {
+        logWriter.log(logEntry);
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      LOGGER.warn("Logging client failed unexpectedly.", t);
+      // We remove these from the connected clients map to prevent a race between
+      // the close method and this InboundObserver calling a terminal method on the
+      // StreamObserver. If we removed it, then we are responsible for the terminal call.
+      completeIfNotNull(connectedClients.remove(this));
+    }
+
+    @Override
+    public void onCompleted() {
+      LOGGER.info("Logging client hanged up.");
+      // We remove these from the connected clients map to prevent a race between
+      // the close method and this InboundObserver calling a terminal method on the
+      // StreamObserver. If we removed it, then we are responsible for the terminal call.
+      completeIfNotNull(connectedClients.remove(this));
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/LogWriter.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/LogWriter.java
new file mode 100644
index 00000000000..7ec49d51d3a
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/LogWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+
+/**
+ * A consumer of {@link BeamFnApi.LogEntry Beam Log Entries}.
+ */
+public interface LogWriter {
+  /**
+   * Write the contents of the Log Entry to some logging backend.
+   */
+  void log(BeamFnApi.LogEntry entry);
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/Slf4jLogWriter.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/Slf4jLogWriter.java
new file mode 100644
index 00000000000..c76e16bcba9
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/Slf4jLogWriter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link LogWriter} which uses an {@link Logger SLF4J Logger} as the underlying log backend.
+ *
+ * <p>Ignores the {@code timestamp}, {@code instruction reference}, {@code primitive
transform
+ * reference}, and {@code thread} fields.
+ */
+public class Slf4jLogWriter implements LogWriter {
+  public static LogWriter getDefault() {
+    return new Slf4jLogWriter();
+  }
+
+  private Slf4jLogWriter() {}
+
+  @Override
+  public void log(LogEntry entry) {
+    Logger log;
+    String location = entry.getLogLocation();
+    if (location != null) {
+      log = LoggerFactory.getLogger(location);
+    } else {
+      // TODO: Provide a useful default
+      log = LoggerFactory.getLogger("RemoteLog");
+    }
+    String message = entry.getMessage();
+    String trace = entry.getTrace();
+    switch (entry.getSeverity()) {
+      case ERROR:
+      case CRITICAL:
+        if (trace == null) {
+          log.error(message);
+        } else {
+          log.error("{} {}", message, trace);
+        }
+        break;
+      case WARN:
+        if (trace == null) {
+          log.warn(message);
+        } else {
+          log.warn("{} {}", message, trace);
+        }
+        break;
+      case INFO:
+      case NOTICE:
+        log.info(message);
+        break;
+      case DEBUG:
+        log.debug(message);
+        break;
+      case UNSPECIFIED:
+      case TRACE:
+        log.trace(message);
+        break;
+      default:
+        log.warn("Unknown message severity {}", entry.getSeverity());
+        log.info(message);
+        break;
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/package-info.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/package-info.java
new file mode 100644
index 00000000000..494f276f43d
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/logging/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes used to log informational messages over the {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc Beam Fn Logging Service}.
+ */
+package org.apache.beam.runners.fnexecution.logging;
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
new file mode 100644
index 00000000000..c267a2a08ae
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/logging/GrpcLoggingServiceTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.logging;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogControl;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
+import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.sdk.fn.test.Consumer;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GrpcLoggingService}. */
+@RunWith(JUnit4.class)
+public class GrpcLoggingServiceTest {
+  private Consumer<LogControl> messageDiscarder = new Consumer<LogControl>()
{
+    @Override
+    public void accept(LogControl item) {
+      // Ignore
+    }
+  };
+
+  @Test
+  public void testMultipleClientsSuccessfullyProcessed() throws Exception {
+    ConcurrentLinkedQueue<BeamFnApi.LogEntry> logs = new ConcurrentLinkedQueue<>();
+    GrpcLoggingService service =
+        GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
+    try (GrpcFnServer<GrpcLoggingService> server =
+        GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create()))
{
+
+      Collection<Callable<Void>> tasks = new ArrayList<>();
+      for (int i = 1; i <= 3; ++i) {
+        final int instructionReference = i;
+        tasks.add(
+            new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                CountDownLatch waitForServerHangup = new CountDownLatch(1);
+                String url = server.getApiServiceDescriptor().getUrl();
+                ManagedChannel channel =
+                    InProcessChannelBuilder.forName(url)
+                        .build();
+                StreamObserver<BeamFnApi.LogEntry.List> outboundObserver =
+                    BeamFnLoggingGrpc.newStub(channel)
+                        .logging(
+                            TestStreams.withOnNext(messageDiscarder)
+                                .withOnCompleted(new CountDown(waitForServerHangup))
+                                .build());
+                outboundObserver.onNext(
+                    createLogsWithIds(instructionReference, -instructionReference));
+                outboundObserver.onCompleted();
+                waitForServerHangup.await();
+                return null;
+              }
+            });
+      }
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      executorService.invokeAll(tasks);
+      assertThat(logs,
+          containsInAnyOrder(createLogWithId(1L), createLogWithId(2L), createLogWithId(3L),
+              createLogWithId(-1L), createLogWithId(-2L), createLogWithId(-3L)));
+    }
+  }
+
+  @Test
+  public void testMultipleClientsFailingIsHandledGracefullyByServer() throws Exception {
+    ConcurrentLinkedQueue<BeamFnApi.LogEntry> logs = new ConcurrentLinkedQueue<>();
+    GrpcLoggingService service =
+        GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
+    try (GrpcFnServer<GrpcLoggingService> server =
+        GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create()))
{
+
+      Collection<Callable<Void>> tasks = new ArrayList<>();
+      for (int i = 1; i <= 3; ++i) {
+        final int instructionReference = i;
+        tasks.add(
+            new Callable<Void>() {
+              public Void call() throws Exception {
+                CountDownLatch waitForTermination = new CountDownLatch(1);
+                ManagedChannel channel =
+                    InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl())
+                        .build();
+                StreamObserver<BeamFnApi.LogEntry.List> outboundObserver =
+                    BeamFnLoggingGrpc.newStub(channel)
+                        .logging(
+                            TestStreams.withOnNext(messageDiscarder)
+                                .withOnError(new CountDown(waitForTermination))
+                                .build());
+                outboundObserver.onNext(
+                    createLogsWithIds(instructionReference, -instructionReference));
+                outboundObserver.onError(new RuntimeException("Client " + instructionReference));
+                waitForTermination.await();
+                return null;
+              }
+            });
+      }
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      executorService.invokeAll(tasks);
+    }
+  }
+
+  @Test
+  public void testServerCloseHangsUpClients() throws Exception {
+    LinkedBlockingQueue<LogEntry> logs = new LinkedBlockingQueue<>();
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    Collection<Future<Void>> futures = new ArrayList<>();
+    final GrpcLoggingService service =
+        GrpcLoggingService.forWriter(new CollectionAppendingLogWriter(logs));
+    try (GrpcFnServer<GrpcLoggingService> server =
+        GrpcFnServer.allocatePortAndCreateFor(service, InProcessServerFactory.create()))
{
+
+      for (int i = 1; i <= 3; ++i) {
+        final long instructionReference = i;
+        futures.add(
+            executorService.submit(
+                new Callable<Void>() {
+                  public Void call() throws Exception {
+                    {
+                      CountDownLatch waitForServerHangup = new CountDownLatch(1);
+                      ManagedChannel channel =
+                          InProcessChannelBuilder.forName(
+                                  server.getApiServiceDescriptor().getUrl())
+                              .build();
+                      StreamObserver<BeamFnApi.LogEntry.List> outboundObserver =
+                          BeamFnLoggingGrpc.newStub(channel)
+                              .logging(
+                                  TestStreams.withOnNext(messageDiscarder)
+                                      .withOnCompleted(new CountDown(waitForServerHangup))
+                                      .build());
+                      outboundObserver.onNext(createLogsWithIds(instructionReference));
+                      waitForServerHangup.await();
+                      return null;
+                    }
+                  }
+                }));
+      }
+      // Wait till each client has sent their message showing that they have connected.
+      for (int i = 1; i <= 3; ++i) {
+        logs.take();
+      }
+    }
+    for (Future<Void> future : futures) {
+      future.get();
+    }
+  }
+
+  private BeamFnApi.LogEntry.List createLogsWithIds(long... ids) {
+    BeamFnApi.LogEntry.List.Builder builder = BeamFnApi.LogEntry.List.newBuilder();
+    for (long id : ids) {
+      builder.addLogEntries(createLogWithId(id));
+    }
+    return builder.build();
+  }
+  private BeamFnApi.LogEntry createLogWithId(long id) {
+    return BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.toString(id)).build();
+  }
+
+  private static class CollectionAppendingLogWriter implements LogWriter {
+    private final Collection<BeamFnApi.LogEntry> entries;
+
+    private CollectionAppendingLogWriter(Collection<LogEntry> entries) {
+      this.entries = entries;
+    }
+
+    @Override
+    public void log(LogEntry entry) {
+      entries.add(entry);
+    }
+  }
+
+  /**
+   * A {@link Runnable} that calls {@link CountDownLatch#countDown()} on a {@link CountDownLatch}.
+   */
+  private static class CountDown implements Runnable {
+    private final CountDownLatch latch;
+
+    CountDown(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.countDown();
+    }
+  }
+}
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
index 5df505b1f24..b07ea54e451 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/test/TestStreams.java
@@ -30,7 +30,7 @@
   public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
     return new Builder<>(new ForwardingCallStreamObserver<>(
         onNext,
-        TestStreams.<Throwable>noopConsumer(),
+        TestStreams.throwingErrorHandler(),
         TestStreams.noopRunnable(),
         TestStreams.alwaysTrueSupplier()));
   }
@@ -97,6 +97,15 @@ public void accept(Throwable t) {
     }
   }
 
+  private static Consumer<Throwable> throwingErrorHandler() {
+    return new Consumer<Throwable>() {
+      @Override
+      public void accept(Throwable item) {
+        throw new RuntimeException(item);
+      }
+    };
+  }
+
   private static void noop() {
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Universal Local Runner
> ----------------------
>
>                 Key: BEAM-2899
>                 URL: https://issues.apache.org/jira/browse/BEAM-2899
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>              Labels: portability
>
> To make the portability effort tractable, we should implement a Universal Local Runner
(ULR) in Java that runs in a single server process plus docker containers for the SDK harness
containers. It would serve multiple purposes:
>   (1) A reference implementation for other runners. Ideally, any new feature should be
implemented in the ULR first.
>   (2) A fully-featured test runner for SDKs who participate in the portability framework.
It thus complements the direct runners.
>   (3) A test runner for user code that depends on or customizes the runtime environment.
For example, a DoFn that shells out has a dependency that may be satisfied on the user's desktop
(and thus works fine on the direct runner), but perhaps not by the container harness image.
The ULR allows for an easy way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message