beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-462] Replace MonitoringUtil.PrintHandler with a handler that utilizes a Java logger
Date Mon, 18 Jul 2016 16:29:01 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master a0afd6665 -> bfbfabf6d


[BEAM-462] Replace MonitoringUtil.PrintHandler with a handler that utilizes a Java logger


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e4d9776
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e4d9776
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e4d9776

Branch: refs/heads/master
Commit: 7e4d977660906363b9c5ab55eababdd7144d6b91
Parents: a0afd66
Author: Luke Cwik <lcwik@google.com>
Authored: Mon Jul 18 08:51:32 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon Jul 18 09:28:20 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/common/ExampleUtils.java      |  2 +-
 .../dataflow/BlockingDataflowRunner.java        |  5 +-
 .../BlockingDataflowPipelineOptions.java        | 27 --------
 .../dataflow/testing/TestDataflowRunner.java    |  2 +-
 .../runners/dataflow/util/MonitoringUtil.java   | 70 +++++++++-----------
 .../dataflow/util/MonitoringUtilTest.java       | 60 +++++++++++++++++
 6 files changed, 95 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index ad00a14..8f9be31 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -306,7 +306,7 @@ public class ExampleUtils {
         addShutdownHook(jobsToCancel);
       }
       try {
-        job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out));
+        job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
       } catch (Exception e) {
         throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId());
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
index 5c59bc2..f7f7dc8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java
@@ -41,7 +41,7 @@ import javax.annotation.Nullable;
  * A {@link PipelineRunner} that's like {@link DataflowRunner}
  * but that waits for the launched job to finish.
  *
- * <p>Prints out job status updates and console messages while it waits.
+ * <p>Logs job status updates and console messages while it waits.
  *
  * <p>Returns the final job state, or throws an exception if the job
  * fails or cannot be monitored.
@@ -117,8 +117,7 @@ public class BlockingDataflowRunner extends
       State result;
       try {
         result = job.waitToFinish(
-            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS,
-            new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+            BUILTIN_JOB_TIMEOUT_SEC, TimeUnit.SECONDS, new MonitoringUtil.LoggingHandler());
       } catch (IOException | InterruptedException ex) {
         if (ex instanceof InterruptedException) {
           Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
index 809df35..5d8d1a1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/BlockingDataflowPipelineOptions.java
@@ -18,38 +18,11 @@
 package org.apache.beam.runners.dataflow.options;
 
 import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.io.PrintStream;
 
 /**
  * Options that are used to configure the {@link BlockingDataflowRunner}.
  */
 @Description("Configure options on the BlockingDataflowRunner.")
 public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions {
-  /**
-   * Output stream for job status messages.
-   */
-  @Description("Where messages generated during execution of the Dataflow job will be output.")
-  @JsonIgnore
-  @Hidden
-  @Default.InstanceFactory(StandardOutputFactory.class)
-  PrintStream getJobMessageOutput();
-  void setJobMessageOutput(PrintStream value);
-
-  /**
-   * Returns a default of {@link System#out}.
-   */
-  public static class StandardOutputFactory implements DefaultValueFactory<PrintStream>
{
-    @Override
-    public PrintStream create(PipelineOptions options) {
-      return System.out;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index 6894a10..b60e1be 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -107,7 +107,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
     assertThat(job, testPipelineOptions.getOnCreateMatcher());
 
     CancelWorkflowOnError messageHandler = new CancelWorkflowOnError(
-        job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput()));
+        job, new MonitoringUtil.LoggingHandler());
 
     try {
       final Optional<Boolean> result;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index 67cdfa6..4d12e66 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -27,12 +27,14 @@ import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.ListJobMessagesResponse;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -63,6 +65,11 @@ public final class MonitoringUtil {
           .put("JOB_STATE_CANCELLED", State.CANCELLED)
           .put("JOB_STATE_UPDATED", State.UPDATED)
           .build();
+  private static final String JOB_MESSAGE_ERROR = "JOB_MESSAGE_ERROR";
+  private static final String JOB_MESSAGE_WARNING = "JOB_MESSAGE_WARNING";
+  private static final String JOB_MESSAGE_BASIC = "JOB_MESSAGE_BASIC";
+  private static final String JOB_MESSAGE_DETAILED = "JOB_MESSAGE_DETAILED";
+  private static final String JOB_MESSAGE_DEBUG = "JOB_MESSAGE_DEBUG";
 
   private String projectId;
   private Messages messagesClient;
@@ -76,53 +83,38 @@ public final class MonitoringUtil {
     void process(List<JobMessage> messages);
   }
 
-  /** A handler that prints monitoring messages to a stream. */
-  public static class PrintHandler implements JobMessagesHandler {
-    private PrintStream out;
-
-    /**
-     * Construct the handler.
-     *
-     * @param stream The stream to write the messages to.
-     */
-    public PrintHandler(PrintStream stream) {
-      out = stream;
-    }
+  /** A handler that logs monitoring messages. */
+  public static class LoggingHandler implements JobMessagesHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(LoggingHandler.class);
 
     @Override
     public void process(List<JobMessage> messages) {
       for (JobMessage message : messages) {
-        if (message.getMessageText() == null || message.getMessageText().isEmpty()) {
-          continue;
-        }
-        String importanceString = null;
-        if (message.getMessageImportance() == null) {
-          continue;
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          importanceString = "Error:   ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) {
-          importanceString = "Warning: ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) {
-          importanceString = "Basic:  ";
-        } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) {
-          importanceString = "Detail:  ";
-        } else {
-          // TODO: Remove filtering here once getJobMessages supports minimum
-          // importance.
+        if (Strings.isNullOrEmpty(message.getMessageText())) {
           continue;
         }
+
         @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime());
-        if (time == null) {
-          out.print("UNKNOWN TIMESTAMP: ");
-        } else {
-          out.print(time + ": ");
-        }
-        if (importanceString != null) {
-          out.print(importanceString);
+        String logMessage = (time == null ? "UNKNOWN TIMESTAMP: " : time + ": ")
+            + message.getMessageText();
+        switch (message.getMessageImportance()) {
+          case JOB_MESSAGE_ERROR:
+            LOG.error(logMessage);
+            break;
+          case JOB_MESSAGE_WARNING:
+            LOG.warn(logMessage);
+            break;
+          case JOB_MESSAGE_BASIC:
+          case JOB_MESSAGE_DETAILED:
+            LOG.info(logMessage);
+            break;
+          case JOB_MESSAGE_DEBUG:
+            LOG.debug(logMessage);
+            break;
+          default:
+            LOG.trace(logMessage);
         }
-        out.println(message.getMessageText());
       }
-      out.flush();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e4d9776/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
index 4b0ab2f..da07515 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java
@@ -23,15 +23,19 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.LoggingHandler;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.util.TestCredential;
 
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.ListJobMessagesResponse;
 
+import org.joda.time.DateTime;
 import org.joda.time.Instant;
+import org.joda.time.chrono.ISOChronology;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -40,6 +44,7 @@ import org.junit.runners.JUnit4;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -50,6 +55,7 @@ public class MonitoringUtilTest {
   private static final String PROJECT_ID = "someProject";
   private static final String JOB_ID = "1234";
 
+  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(LoggingHandler.class);
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
@@ -147,4 +153,58 @@ public class MonitoringUtilTest {
         + "gcloud alpha dataflow jobs --project=someProject cancel 1234",
         cancelCommand);
   }
+
+  @Test
+  public void testLoggingHandler() {
+    DateTime errorTime = new DateTime(1000L, ISOChronology.getInstanceUTC());
+    DateTime warningTime = new DateTime(2000L, ISOChronology.getInstanceUTC());
+    DateTime basicTime = new DateTime(3000L, ISOChronology.getInstanceUTC());
+    DateTime detailedTime = new DateTime(4000L, ISOChronology.getInstanceUTC());
+    DateTime debugTime = new DateTime(5000L, ISOChronology.getInstanceUTC());
+    DateTime unknownTime = new DateTime(6000L, ISOChronology.getInstanceUTC());
+    JobMessage errorJobMessage = new JobMessage();
+    errorJobMessage.setMessageImportance("JOB_MESSAGE_ERROR");
+    errorJobMessage.setMessageText("ERRORERROR");
+    errorJobMessage.setTime(TimeUtil.toCloudTime(errorTime));
+    JobMessage warningJobMessage = new JobMessage();
+    warningJobMessage.setMessageImportance("JOB_MESSAGE_WARNING");
+    warningJobMessage.setMessageText("WARNINGWARNING");
+    warningJobMessage.setTime(TimeUtil.toCloudTime(warningTime));
+    JobMessage basicJobMessage = new JobMessage();
+    basicJobMessage.setMessageImportance("JOB_MESSAGE_BASIC");
+    basicJobMessage.setMessageText("BASICBASIC");
+    basicJobMessage.setTime(TimeUtil.toCloudTime(basicTime));
+    JobMessage detailedJobMessage = new JobMessage();
+    detailedJobMessage.setMessageImportance("JOB_MESSAGE_DETAILED");
+    detailedJobMessage.setMessageText("DETAILEDDETAILED");
+    detailedJobMessage.setTime(TimeUtil.toCloudTime(detailedTime));
+    JobMessage debugJobMessage = new JobMessage();
+    debugJobMessage.setMessageImportance("JOB_MESSAGE_DEBUG");
+    debugJobMessage.setMessageText("DEBUGDEBUG");
+    debugJobMessage.setTime(TimeUtil.toCloudTime(debugTime));
+    JobMessage unknownJobMessage = new JobMessage();
+    unknownJobMessage.setMessageImportance("JOB_MESSAGE_UNKNOWN");
+    unknownJobMessage.setMessageText("UNKNOWNUNKNOWN");
+    unknownJobMessage.setTime("");
+    JobMessage emptyJobMessage = new JobMessage();
+    emptyJobMessage.setMessageImportance("JOB_MESSAGE_EMPTY");
+    emptyJobMessage.setTime(TimeUtil.toCloudTime(unknownTime));
+
+    new LoggingHandler().process(Arrays.asList(errorJobMessage, warningJobMessage, basicJobMessage,
+        detailedJobMessage, debugJobMessage, unknownJobMessage));
+
+    expectedLogs.verifyError("ERRORERROR");
+    expectedLogs.verifyError(errorTime.toString());
+    expectedLogs.verifyWarn("WARNINGWARNING");
+    expectedLogs.verifyWarn(warningTime.toString());
+    expectedLogs.verifyInfo("BASICBASIC");
+    expectedLogs.verifyInfo(basicTime.toString());
+    expectedLogs.verifyInfo("DETAILEDDETAILED");
+    expectedLogs.verifyInfo(detailedTime.toString());
+    expectedLogs.verifyDebug("DEBUGDEBUG");
+    expectedLogs.verifyDebug(debugTime.toString());
+    expectedLogs.verifyTrace("UNKNOWN TIMESTAMP");
+    expectedLogs.verifyTrace("UNKNOWNUNKNOWN");
+    expectedLogs.verifyNotLogged(unknownTime.toString());
+  }
 }


Mime
View raw message