From commits-return-53833-archive-asf-public=cust-asf.ponee.io@pulsar.apache.org Fri May 8 12:33:54 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id CE00E180674 for ; Fri, 8 May 2020 14:33:52 +0200 (CEST) Received: (qmail 50424 invoked by uid 500); 8 May 2020 12:33:51 -0000 Mailing-List: contact commits-help@pulsar.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.apache.org Delivered-To: mailing list commits@pulsar.apache.org Received: (qmail 50219 invoked by uid 99); 8 May 2020 12:33:51 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 May 2020 12:33:51 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 1F9128091B; Fri, 8 May 2020 12:33:51 +0000 (UTC) Date: Fri, 08 May 2020 12:34:15 +0000 To: "commits@pulsar.apache.org" Subject: [pulsar] 29/38: Fixing JDBC sink to handle null fields. Also added new unit tests (#6848) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: zhaijia@apache.org In-Reply-To: <158894122543.2100.17101230735263380317@gitbox.apache.org> References: <158894122543.2100.17101230735263380317@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: pulsar X-Git-Refname: refs/heads/branch-2.5 X-Git-Reftype: branch X-Git-Rev: 53f74c7fd9eb0bf4a81f09d2dc1ab0a7895e1b97 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20200508123351.1F9128091B@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. 
zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 53f74c7fd9eb0bf4a81f09d2dc1ab0a7895e1b97 Author: Chris Bartholomew AuthorDate: Fri May 1 13:53:23 2020 -0400 Fixing JDBC sink to handle null fields. Also added new unit tests (#6848) ### Motivation JDBC sink does not handle `null` fields. For example, the field `example` can potentially be null. The schema registered in Pulsar allows for it, and the table schema in MySQL has a column of the same name, which is configured as double and also allows nulls. When messages are sent to the JDBC sink without that field, an exception like this is seen: ``` 21:08:38.472 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception java.sql.SQLException: Data truncated for column 'example' at row 1 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:127) ~[mysql-connector-java-8.0.11.jar:8.0.11] at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:95) ~[mysql-connector-java-8.0.11.jar:8.0.11] at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.11.jar:8.0.11] at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:960) ~[mysql-connector-java-8.0.11.jar:8.0.11] at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:388) ~[mysql-connector-java-8.0.11.jar:8.0.11] at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:202) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?] at org.apache.pulsar.io.jdbc.JdbcAbstractSink.lambda$open$0(JdbcAbstractSink.java:108) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?] 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_232] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232] ``` Looking at the code for the JDBC sink, there was no handling of the case where the field was `null`. The PR adds code to handle that case. It also adds unit tests to cover this for both binary and JSON encoding of the schema. ### Modifications When the sink encounters a `null` field value it uses the `setColumnNull` method to properly reflect this in the database row. (cherry picked from commit c622de5116f41cdb7174647b09ad70d9f2462bbc) --- .../apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java | 37 ++++- .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 150 ++++++++++++++++++++- 2 files changed, 184 insertions(+), 3 deletions(-) diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java index de146c4..a916ca3 100644 --- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java @@ -20,6 +20,7 @@ package org.apache.pulsar.io.jdbc; import java.sql.PreparedStatement; +import java.sql.Types; import java.util.List; import com.google.common.collect.Lists; @@ -60,12 +61,44 @@ public class JdbcAutoSchemaSink extends JdbcAbstractSink { int index = 1; for (ColumnId columnId : columns) { String colName = columnId.getName(); - Object obj = record.getField(colName); - setColumnValue(statement, index++, obj); + int colType = columnId.getType(); + if (log.isDebugEnabled()) { + log.debug("colName: {} colType: {}", colName, colType); + } + try { + Object obj = record.getField(colName); + if (obj != null) { + setColumnValue(statement, index++, obj); + } else { + if 
(log.isDebugEnabled()) { + log.debug("Column {} is null", colName); + } + setColumnNull(statement, index++, colType); + } + } catch (NullPointerException e) { + // With JSON schema field is omitted, so get NPE + // In this case we want to set column to Null + if (log.isDebugEnabled()) { + log.debug("Column {} is null", colName); + } + setColumnNull(statement, index++, colType); + } + } } + private static void setColumnNull(PreparedStatement statement, int index, int type) throws Exception { + if (log.isDebugEnabled()) { + log.debug("Setting column value to null, statement: {}, index: {}, value: {}", statement.toString(), index); + } + statement.setNull(index, type); + + } + private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception { + + log.debug("Setting column value, statement: {}, index: {}, value: {}", statement.toString(), index, value.toString()); + if (value instanceof Integer) { statement.setInt(index, (Integer) value); } else if (value instanceof Long) { diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java index aa2a76a..3c33a16 100644 --- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java @@ -34,7 +34,9 @@ import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; import org.testng.Assert; @@ -109,13 +111,144 @@ public class JdbcSinkTest { 
jdbcSink.close(); } + private void testOpenAndWriteSinkNullValue(Map actionProperties) throws Exception { + Message insertMessage = mock(MessageImpl.class); + GenericSchema genericAvroSchema; + // prepare a foo Record + Foo insertObj = new Foo(); + insertObj.setField1("ValueOfField1"); + // Not setting field2 + // Field1 is the key and field3 is used for selecting records + insertObj.setField3(3); + AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).withAlwaysAllowNull(true).build()); + + byte[] insertBytes = schema.encode(insertObj); + CompletableFuture future = new CompletableFuture<>(); + Record insertRecord = PulsarRecord.builder() + .message(insertMessage) + .topicName("fake_topic_name") + .ackFunction(() -> future.complete(null)) + .build(); + + genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo()); + when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes)); + when(insertMessage.getProperties()).thenReturn(actionProperties); + log.info("foo:{}, Message.getValue: {}, record.getValue: {}", + insertObj.toString(), + insertMessage.getValue().toString(), + insertRecord.getValue().toString()); + + // write should success. + jdbcSink.write(insertRecord); + log.info("executed write"); + // sleep to wait backend flush complete + future.get(1, TimeUnit.SECONDS); + + // value has been written to db, read it out and verify. 
+ String querySql = "SELECT * FROM " + tableName + " WHERE field3=3"; + int count = sqliteUtils.select(querySql, (resultSet) -> { + Assert.assertEquals(insertObj.getField1(), resultSet.getString(1)); + Assert.assertNull(insertObj.getField2()); + Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3)); + }); + Assert.assertEquals(count, 1); + + } + + private void testOpenAndWriteSinkJson(Map actionProperties) throws Exception { + Message insertMessage = mock(MessageImpl.class); + GenericSchema genericAvroSchema; + // prepare a foo Record + Foo insertObj = new Foo(); + insertObj.setField1("ValueOfField1"); + insertObj.setField2("ValueOfField2"); + insertObj.setField3(3); + JSONSchema schema = JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).withAlwaysAllowNull(true).build()); + + byte[] insertBytes = schema.encode(insertObj); + CompletableFuture future = new CompletableFuture<>(); + Record insertRecord = PulsarRecord.builder() + .message(insertMessage) + .topicName("fake_topic_name") + .ackFunction(() -> future.complete(null)) + .build(); + + GenericSchema decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo()); + when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes)); + when(insertMessage.getProperties()).thenReturn(actionProperties); + log.info("foo:{}, Message.getValue: {}, record.getValue: {}", + insertObj.toString(), + insertMessage.getValue().toString(), + insertRecord.getValue().toString()); + + // write should success. + jdbcSink.write(insertRecord); + log.info("executed write"); + // sleep to wait backend flush complete + future.get(1, TimeUnit.SECONDS); + + // value has been written to db, read it out and verify. 
+ String querySql = "SELECT * FROM " + tableName + " WHERE field3=3"; + int count = sqliteUtils.select(querySql, (resultSet) -> { + Assert.assertEquals(insertObj.getField1(), resultSet.getString(1)); + Assert.assertEquals(insertObj.getField2(), resultSet.getString(2)); + Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3)); + }); + Assert.assertEquals(count, 1); + + } + + private void testOpenAndWriteSinkNullValueJson(Map actionProperties) throws Exception { + Message insertMessage = mock(MessageImpl.class); + GenericSchema genericAvroSchema; + // prepare a foo Record + Foo insertObj = new Foo(); + insertObj.setField1("ValueOfField1"); + // Not setting field2 + // Field1 is the key and field3 is used for selecting records + insertObj.setField3(3); + JSONSchema schema = JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).withAlwaysAllowNull(true).build()); + + byte[] insertBytes = schema.encode(insertObj); + CompletableFuture future = new CompletableFuture<>(); + Record insertRecord = PulsarRecord.builder() + .message(insertMessage) + .topicName("fake_topic_name") + .ackFunction(() -> future.complete(null)) + .build(); + + GenericSchema decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo()); + when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes)); + when(insertMessage.getProperties()).thenReturn(actionProperties); + log.info("foo:{}, Message.getValue: {}, record.getValue: {}", + insertObj.toString(), + insertMessage.getValue().toString(), + insertRecord.getValue().toString()); + + // write should success. + jdbcSink.write(insertRecord); + log.info("executed write"); + // sleep to wait backend flush complete + future.get(1, TimeUnit.SECONDS); + + // value has been written to db, read it out and verify. 
+ String querySql = "SELECT * FROM " + tableName + " WHERE field3=3"; + int count = sqliteUtils.select(querySql, (resultSet) -> { + Assert.assertEquals(insertObj.getField1(), resultSet.getString(1)); + Assert.assertNull(insertObj.getField2()); + Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3)); + }); + Assert.assertEquals(count, 1); + + } + private void testOpenAndWriteSink(Map actionProperties) throws Exception { Message insertMessage = mock(MessageImpl.class); GenericSchema genericAvroSchema; // prepare a foo Record Foo insertObj = new Foo(); insertObj.setField1("ValueOfField1"); - insertObj.setField2("ValueOfField1"); + insertObj.setField2("ValueOfField2"); insertObj.setField3(3); AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()); @@ -163,6 +296,21 @@ public class JdbcSinkTest { } @Test + public void TestNoActionNullValue() throws Exception { + testOpenAndWriteSinkNullValue(ImmutableMap.of("ACTION", "INSERT")); + } + + @Test + public void TestNoActionNullValueJson() throws Exception { + testOpenAndWriteSinkNullValueJson(ImmutableMap.of("ACTION", "INSERT")); + } + + @Test + public void TestNoActionJson() throws Exception { + testOpenAndWriteSinkJson(ImmutableMap.of("ACTION", "INSERT")); + } + + @Test public void TestUnknownAction() throws Exception { Record recordRecord = mock(Record.class); when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));