pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar] branch master updated: Provide a flag to ignore Json format error in pulsar flink connector (#3210)
Date Wed, 19 Dec 2018 11:20:40 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ae3cf  Provide a flag to ignore Json format error in pulsar flink connector (#3210)
00ae3cf is described below

commit 00ae3cf545d25e3ad9d84b31c874f4a1b3beaf81
Author: 浪尖 <158570986@qq.com>
AuthorDate: Wed Dec 19 19:20:36 2018 +0800

    Provide a flag to ignore Json format error in pulsar flink connector (#3210)
    
    ### Motivation
    
    When Flink consumes from Pulsar, a JSON format error in a message causes the Flink program to fail.
    
    ### Modifications
    
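    Provide a flag in the Pulsar Flink connector to ignore JSON format errors; when it is set, a message that cannot be deserialized yields a row of nulls instead of an IOException.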
---
 .../pulsar/serde/JsonRowDeserializationSchema.java | 32 +++++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
index dfc89b9..0235e61 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
@@ -38,6 +38,30 @@ import java.io.IOException;
  */
 public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
 
+    /*
+        What to do when a JSON line cannot be deserialized:
+        (1) false: throw an IOException and terminate the application.
+        (2) true: ignore the malformed line and return a row whose fields are all null.
+     */
+    private boolean ignoreJsonFormatError = false;
+
+
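+    /**
+     * Returns whether JSON format errors are ignored during deserialization.
+     * @return true if a malformed JSON line is ignored, false if it raises an IOException
+     */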
+    public boolean getIgnoreJsonFormatError() {
+        return ignoreJsonFormatError;
+    }
+
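+    /**
+     * Sets whether JSON format errors are ignored during deserialization.
+     * @param ignoreJsonFormatError true to ignore malformed JSON lines, false to fail with an IOException
+     */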
+    public void setIgnoreJsonFormatError(boolean ignoreJsonFormatError) {
+        this.ignoreJsonFormatError = ignoreJsonFormatError;
+    }
+
     /**
      * Type information describing the result type.
      */
@@ -102,7 +126,13 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 
             return row;
         } catch (Throwable t) {
-            throw new IOException("Failed to deserialize JSON object.", t);
+            if (ignoreJsonFormatError) {
+                final int arity = typeInfo.getArity();
+                final Object[] nullsArray = new Object[arity];
+                return Row.of(nullsArray);
+            } else {
+                throw new IOException("Failed to deserialize JSON object.", t);
+            }
         }
     }
 


Mime
View raw message