geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lgalli...@apache.org
Subject [geode] branch feature/GEODE-3781 updated: Fix NPEs with missing table and conmnection configuration. Add Dunit test to simulate end-to-end testing using gfsh commmands. Made changes to use region name as table name when table name is not provided.
Date Fri, 15 Dec 2017 01:43:22 GMT
This is an automated email from the ASF dual-hosted git repository.

lgallinat pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
     new bdab7eb  Fix NPEs with missing table and conmnection configuration. Add Dunit test
to simulate end-to-end testing using gfsh commmands. Made changes to use region name as table
name when table name is not provided.
bdab7eb is described below

commit bdab7ebbdb4f46f709387aa39d7bc0ba8eafc822
Author: Lynn Gallinat <lgallinat@pivotal.io>
AuthorDate: Thu Dec 14 17:36:37 2017 -0800

    Fix NPEs with missing table and conmnection configuration.
    Add Dunit test to simulate end-to-end testing using gfsh commmands.
    Made changes to use region name as table name when table name is not provided.
---
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     |  10 +
 .../jdbc/internal/ConnectionManager.java           |   3 +-
 .../connectors/jdbc/internal/RegionMapping.java    |  12 +-
 .../geode/connectors/jdbc/internal/SqlHandler.java |  16 +-
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |   2 +-
 .../geode/connectors/jdbc/JdbcDUnitTest.java       | 310 +++++++++++++++++++++
 .../jdbc/internal/ConnectionManagerUnitTest.java   |   1 +
 .../connectors/jdbc/internal/SqlHandlerTest.java   |   1 +
 8 files changed, 350 insertions(+), 5 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index 465e1c6..8f83ecb 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -41,6 +41,7 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
 
   private AtomicLong totalEvents = new AtomicLong();
   private AtomicLong successfulEvents = new AtomicLong();
+  private AtomicLong failedEvents = new AtomicLong();
 
   @SuppressWarnings("unused")
   public JdbcAsyncWriter() {
@@ -68,6 +69,7 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
               getPdxInstance(event));
           changeSuccessfulEvents(1);
         } catch (RuntimeException ex) {
+          changeFailedEvents(1);
           logger.error("Exception processing event {}", event, ex);
         }
       }
@@ -86,10 +88,18 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventL
     return successfulEvents.get();
   }
 
+  long getFailedEvents() {
+    return failedEvents.get();
+  }
+
   private void changeSuccessfulEvents(long delta) {
     successfulEvents.addAndGet(delta);
   }
 
+  private void changeFailedEvents(long delta) {
+    failedEvents.addAndGet(delta);
+  }
+
   private void changeTotalEvents(long delta) {
     totalEvents.addAndGet(delta);
   }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
index 8e7795b..b6fc8b1 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
@@ -59,7 +59,8 @@ class ConnectionManager {
 
   <K> List<ColumnValue> getColumnToValueList(ConnectionConfiguration config,
       RegionMapping regionMapping, K key, PdxInstance value, Operation operation) {
-    String keyColumnName = getKeyColumnName(config, regionMapping.getTableName());
+    String tableName = regionMapping.getRegionToTableName();
+    String keyColumnName = getKeyColumnName(config, tableName);
     ColumnValue keyColumnValue = new ColumnValue(true, keyColumnName, key);
 
     if (operation.isDestroy() || operation.isGet()) {
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
index 710bbbf..204ba37 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
@@ -61,8 +61,18 @@ public class RegionMapping implements Serializable {
     return primaryKeyInValue;
   }
 
+  public String getRegionToTableName() {
+    if (tableName == null) {
+      return regionName;
+    }
+    return tableName;
+  }
+
   public String getColumnNameForField(String fieldName) {
-    String columnName = fieldToColumnMap.get(fieldName);
+    String columnName = null;
+    if (fieldToColumnMap != null) {
+      columnName = fieldToColumnMap.get(fieldName);
+    }
     return columnName != null ? columnName : fieldName;
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 34703fc..240ff3f 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -51,7 +51,7 @@ public class SqlHandler {
 
     List<ColumnValue> columnList =
         manager.getColumnToValueList(connectionConfig, regionMapping, key, null, Operation.GET);
-    String tableName = regionMapping.getTableName();
+    String tableName = regionMapping.getRegionToTableName();
     PreparedStatement statement = manager.getPreparedStatement(
         manager.getConnection(connectionConfig), columnList, tableName, Operation.GET, 0);
     PdxInstanceFactory factory = getPdxInstanceFactory(region, regionMapping);
@@ -126,12 +126,24 @@ public class SqlHandler {
       throw new IllegalArgumentException("PdxInstance cannot be null for non-destroy operations");
     }
     RegionMapping regionMapping = manager.getMappingForRegion(region.getName());
-    final String tableName = regionMapping.getTableName();
+
+    if (regionMapping == null) {
+      throw new IllegalStateException("JDBC write failed. JDBC mapping for region " +
+          region.getFullPath() +
+          " not found. Create mapping with gfsh command 'create jdbc-mapping'.");
+    }
     ConnectionConfiguration connectionConfig =
         manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+    if (connectionConfig == null) {
+      throw new IllegalStateException(
+          "JDBC write failed. JDBC Connection configuration for connection name " +
+      regionMapping.getConnectionConfigName() + " not found.");
+    }
+
     List<ColumnValue> columnList =
         manager.getColumnToValueList(connectionConfig, regionMapping, key, value, operation);
 
+    String tableName = regionMapping.getRegionToTableName();
     int pdxTypeId = value == null ? 0 : ((PdxInstanceImpl) value).getPdxType().getTypeId();
     PreparedStatement statement = manager.getPreparedStatement(
         manager.getConnection(connectionConfig), columnList, tableName, operation, pdxTypeId);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 2d52f34..2aacfbe 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -137,7 +137,7 @@ public class JdbcAsyncWriterIntegrationTest {
 
     awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(1));
 
-    assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(0);
+    awaitUntil(() -> assertThat(jdbcWriter.getFailedEvents()).isEqualTo(1));
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
new file mode 100644
index 0000000..a70531c
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+
+@Category(DistributedTest.class)
+public class JdbcDUnitTest implements Serializable {
+
+  private static final String DB_NAME = "DerbyDB";
+  private static final String TABLE_NAME = "employees";
+  private static final String REGION_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+  private static final String CONNECTION_NAME = "TestConnection";
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Rule
+  public LocatorServerStartupRule startupRule = new LocatorServerStartupRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  private MemberVM locator;
+  private MemberVM server;
+
+  @Before
+  public void setup() throws Exception {
+    locator = startupRule.startLocatorVM(0);
+    gfsh.connectAndVerify(locator);
+    server = startupRule.startServerVM(1, locator.getPort());
+    server.invoke(()-> {
+      createTable();
+    });
+  }
+
+  private void createTable() throws SQLException {
+    Connection connection = DriverManager.getConnection(CONNECTION_URL);
+    Statement statement = connection.createStatement();
+    statement.execute("Create Table " + TABLE_NAME
+        + " (id varchar(10) primary key not null, name varchar(10), age int)");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    server.invoke(() -> {
+      CacheFactory.getAnyInstance().close();
+      closeDB();
+    });
+  }
+
+  private void closeDB() throws Exception {
+    try {
+      Connection connection = DriverManager.getConnection(CONNECTION_URL);
+      Statement statement = connection.createStatement();
+      if (statement == null) {
+        statement = connection.createStatement();
+      }
+      statement.execute("Drop table " + TABLE_NAME);
+      statement.close();
+
+      if (connection != null) {
+        connection.close();
+      }
+    } catch (SQLException ex) {
+      System.out.println("SQL Exception is thrown while closing the database.");
+    }
+  }
+
+  @Test
+  public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception {
+    createRegion(true, false, false);
+    createJdbcConnection();
+
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("name", "Emp1").writeInt("age", 55).create();
+      Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+      assertThatThrownBy(() -> region.put("key1", pdxEmployee1)).isExactlyInstanceOf(IllegalStateException.class).hasMessageContaining("JDBC
write failed");
+    });
+  }
+
+  @Test
+  public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws Exception {
+    IgnoredException.addIgnoredException("IllegalStateException");
+    createRegion(false, true, false);
+    createJdbcConnection();
+
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("name", "Emp1").writeInt("age", 55).create();
+      Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+      region.put("key1", pdxEmployee1);
+
+      JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter)CacheFactory.getAnyInstance().getAsyncEventQueue("JAW").getAsyncEventListener();
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+        assertThat(asyncWriter.getFailedEvents()).isEqualTo(1);
+      });
+
+    });
+  }
+
+  @Test
+  public void throwsExceptionWhenNoMappingMatches() throws Exception {
+    createRegion(true, false, false);
+    createJdbcConnection();
+    createMapping("NoSuchRegion", CONNECTION_NAME);
+
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("name", "Emp1").writeInt("age", 55).create();
+      Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+      assertThatThrownBy(() -> region.put("key1", pdxEmployee1)).isExactlyInstanceOf(IllegalStateException.class).hasMessageContaining("JDBC
write failed");
+    });
+  }
+
+  @Test
+  public void throwsExceptionWhenNoConnectionExists() throws Exception {
+    createRegion(true, false, false);
+    createMapping(REGION_NAME, CONNECTION_NAME);
+
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("name", "Emp1").writeInt("age", 55).create();
+      Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+      assertThatThrownBy(() -> region.put("key1", pdxEmployee1)).isExactlyInstanceOf(IllegalStateException.class).hasMessageContaining("JDBC
write failed");    });
+  }
+
+  @Test
+  public void putWritesToDB() throws Exception {
+    createRegion(true, false, false);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME);
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "key1")
+              .writeString("name", "Emp1")
+              .writeInt("age", 55).create();
+
+      String key = "emp1";
+      CacheFactory.getAnyInstance().getRegion(REGION_NAME).put(key, pdxEmployee1);
+      assertTableHasEmployeeData(1, pdxEmployee1, key);
+    });
+  }
+
+  @Test
+  public void putAsyncWritesToDB() throws Exception {
+    createRegion(true, false, false);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME);
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "key1")
+              .writeString("name", "Emp1")
+              .writeInt("age", 55).create();
+
+      String key = "emp1";
+      CacheFactory.getAnyInstance().getRegion(REGION_NAME).put(key, pdxEmployee1);
+      assertTableHasEmployeeData(1, pdxEmployee1, key);
+    });
+  }
+
+  @Test
+  public void getReadsFromEmptyDB() throws Exception {
+    createRegion(false, false, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME);
+    server.invoke(() -> {
+      String key = "emp1";
+      Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+      region.get(key);
+      assertThat(region.size()).isEqualTo(0);
+    });
+  }
+
+  @Test
+  public void getReadsFromDB() throws Exception {
+    createRegion(true, false, true);
+    createJdbcConnection();
+    createMapping(REGION_NAME, CONNECTION_NAME);
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id1")
+              .writeString("name", "Emp1")
+              .writeInt("age", 55).create();
+
+      String key = "id1";
+      Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+      region.put(key, pdxEmployee1);
+      region.invalidate(key);
+
+      PdxInstance result = (PdxInstance)region.get(key);
+      assertThat(result.getField("id")).isEqualTo(pdxEmployee1.getField("id"));
+      assertThat(result.getField("name")).isEqualTo(pdxEmployee1.getField("name"));
+      assertThat(result.getField("age")).isEqualTo(pdxEmployee1.getField("age"));
+    });
+  }
+
+  private void createJdbcConnection() {
+    final String commandStr = "create jdbc-connection --name=" + CONNECTION_NAME + " --url="
+        + CONNECTION_URL
+        + " --params=param1:value1,this.is.param2:value.2,this-is-value-3,value-3,this_is_param_4:value_4";
+    gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+  }
+
+  private void createAsyncListener(String id) {
+    final String commandStr = "create async-event-queue --id=" + id
+        + " --listener=" + JdbcAsyncWriter.class.getName()
+        + " --batch-size=1 --batch-time-interval=0 --parallel=false";
+    gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+  }
+
+  private void createRegion(boolean withCacheWriter, boolean withAsyncWriter, boolean withLoader)
{
+    StringBuffer createRegionCmd = new StringBuffer();
+    createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE ");
+    if (withCacheWriter) {
+      createRegionCmd.append(" --cache-writer=" + JdbcWriter.class.getName());
+    }
+    if (withLoader) {
+      createRegionCmd.append(" --cache-loader=" + JdbcLoader.class.getName());
+    }
+    if (withAsyncWriter) {
+      createAsyncListener("JAW");
+      createRegionCmd.append(" --async-event-queue-id=JAW");
+    }
+
+    gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+  }
+
+  private void createMapping(String regionName, String connectionName) {
+    final String commandStr =
+        "create jdbc-mapping --region=" + regionName + " --connection=" + connectionName;
+    gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+  }
+
+  private void assertTableHasEmployeeData(int size, PdxInstance employee, String key)
+      throws SQLException {
+    Connection connection = DriverManager.getConnection(CONNECTION_URL);
+    Statement statement = connection.createStatement();
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+      assertThat(getRowCount(statement, TABLE_NAME)).isEqualTo(size);
+    });
+
+    ResultSet resultSet = statement.executeQuery("select * from " + REGION_NAME + " order
by id asc");
+    assertThat(resultSet.next()).isTrue();
+    assertThat(resultSet.getString("id")).isEqualTo(key);
+    assertThat(resultSet.getString("name")).isEqualTo(employee.getField("name"));
+    assertThat(resultSet.getObject("age")).isEqualTo(employee.getField("age"));
+  }
+
+  private int getRowCount(Statement stmt, String tableName) {
+    try {
+      ResultSet resultSet =
+          stmt.executeQuery("select count(*) from " + tableName);
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e){
+      return -1;
+    }
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
index 947559d..9478b19 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
@@ -68,6 +68,7 @@ public class ConnectionManagerUnitTest {
     connectionConfig = new ConnectionConfiguration("name", "url", null, null, null);
 
     when(mapping.getTableName()).thenReturn(TABLE_NAME);
+    when(mapping.getRegionToTableName()).thenReturn(TABLE_NAME);
     doReturn(connection).when(manager).getSQLConnection(connectionConfig);
 
     key = new Object();
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index 7cd623b..05dcdb9 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -334,6 +334,7 @@ public class SqlHandlerTest {
     regionMapping = mock(RegionMapping.class);
     when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
     when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
+    when(regionMapping.getRegionToTableName()).thenReturn(TABLE_NAME);
     when(manager.getMappingForRegion(any())).thenReturn(regionMapping);
 
     List<ColumnValue> columnList = new ArrayList<>();

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].

Mime
View raw message