apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject apex-core git commit: APEXCORE-732 Handling serialization and other exceptions while recording tuples, preventing the container from failing
Date Thu, 08 Jun 2017 14:03:22 GMT
Repository: apex-core
Updated Branches:
  refs/heads/APEXCORE-732 [created] a0dd30d8f


APEXCORE-732 Handling serialization and other exceptions while recording tuples, preventing
the container from failing


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a0dd30d8
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a0dd30d8
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a0dd30d8

Branch: refs/heads/APEXCORE-732
Commit: a0dd30d8fdc27c0503f59c93ea84f7edacf53137
Parents: 22feeed
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Mon May 29 17:26:01 2017 -0700
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Wed Jun 7 22:19:56 2017 -0700

----------------------------------------------------------------------
 .../datatorrent/stram/debug/TupleRecorder.java  | 63 +++++++++++++++++---
 .../stram/debug/TupleRecorderTest.java          |  2 +-
 2 files changed, 55 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/a0dd30d8/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
index fc6c2e6..4f6bbfa 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorder.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import org.codehaus.jackson.JsonProcessingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +60,7 @@ import com.datatorrent.stram.util.SharedPubSubWebSocketClient.Handler;
 public class TupleRecorder
 {
   public static final String VERSION = "1.2";
-  private int totalTupleCount = 0;
+  private long totalTupleCount = 0;
   private final HashMap<String, PortInfo> portMap = new HashMap<>(); // used for output portInfo <name, id> map
   private final HashMap<String, PortCount> portCountMap = new HashMap<>(); // used for tupleCount of each port <name, count> map
   private transient long currentWindowId = WindowGenerator.MIN_WINDOW_ID - 1;
@@ -76,6 +77,33 @@ public class TupleRecorder
   private String recordingNameTopic;
   private long numWindows = Long.MAX_VALUE; // number of windows to record
   private Runnable stopProcedure; // stop procedure to execute
+
+  private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
+
+  // If there are errors processing tuples, don't log an error for every tuple as it could overwhelm the logs.
+  // The property specifies the minumum number of tuples between two consecutive error log statements. Set it to zero to
+  // log every tuple error
+  private static long ERROR_LOG_GAP;
+  long lastLog = -1;
+
+  static {
+    ERROR_LOG_GAP = 10000L;
+    String property = System.getProperty("org.apache.apex.stram.tupleRecorder.errorLogGap");
+    if (property != null) {
+      try {
+        long value = Long.decode(property);
+        if (value < 0 ) {
+          logger.warn("Log gap should be greater than or equal to 0, setting to default");
+        } else {
+          ERROR_LOG_GAP = value;
+        }
+      } catch (Exception ex) {
+        logger.warn("Unable to parse the log gap property, setting to default", ex);
+      }
+    }
+    logger.debug("Log gap is {}", ERROR_LOG_GAP);
+  }
+
   private final FSPartFileCollection storage = new FSPartFileCollection()
   {
     @Override
@@ -154,7 +182,7 @@ public class TupleRecorder
     return Collections.unmodifiableMap(portMap);
   }
 
-  public int getTotalTupleCount()
+  public long getTotalTupleCount()
   {
     return totalTupleCount;
   }
@@ -392,12 +420,19 @@ public class TupleRecorder
 
   public void writeTuple(Object obj, String port)
   {
+    ++totalTupleCount;
     if (windowIdRanges.isEmpty()) {
       throw new RuntimeException("Data tuples received from tuple recorder before any BEGIN_WINDOW");
     }
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Slice f = null;
+    try {
+      f = streamCodec.toByteArray(obj);
+    } catch (RuntimeException ex) {
+      checkLogTuple(ex, "save", obj);
+      return;
+    }
     try {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Slice f = streamCodec.toByteArray(obj);
       PortInfo pi = portMap.get(port);
       String str = "T:" + System.currentTimeMillis() + ":" + pi.id + ":" + f.length + ":";
       bos.write(str.getBytes());
@@ -410,13 +445,12 @@ public class TupleRecorder
       storage.writeDataItem(bos.toByteArray(), true);
       //logger.debug("Writing tuple for port id {}", pi.id);
       //fsOutput.hflush();
-      ++totalTupleCount;
       if (numSubscribers > 0) {
         // this is not asynchronous.  we need to fix this
         publishTupleData(pi.id, obj);
       }
-    } catch (IOException ex) {
-      logger.error(ex.toString());
+    } catch (Exception ex) {
+      logger.warn("Error saving tuple", ex);
     }
   }
 
@@ -463,7 +497,19 @@ public class TupleRecorder
         wsClient.publish(recordingNameTopic, map);
       }
     } catch (Exception ex) {
-      logger.warn("Error publishing tuple data", ex);
+      if (ex instanceof JsonProcessingException) {
+        checkLogTuple(ex, "publish", obj);
+      } else {
+        logger.warn("Error publishing tuple", ex);
+      }
+    }
+  }
+
+  private void checkLogTuple(Exception ex, String context, Object tuple)
+  {
+    if ((lastLog == -1) || (totalTupleCount - lastLog) >= ERROR_LOG_GAP) {
+      lastLog = totalTupleCount;
+      logger.warn("Error serializing during {} for tuple {} ", context, tuple, ex);
     }
   }
 
@@ -519,5 +565,4 @@ public class TupleRecorder
 
   }
 
-  private static final Logger logger = LoggerFactory.getLogger(TupleRecorder.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a0dd30d8/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 9d88bdf..447326f 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -207,7 +207,7 @@ public class TupleRecorderTest
   }
 
   private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());
-  private static final int testTupleCount = 10;
+  private static final long testTupleCount = 10;
 
   @Test
   public void testRecordingFlow() throws Exception


Mime
View raw message