geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nre...@apache.org
Subject [geode] branch feature/GEODE-3781 updated: Wired jdbc loaders and writers to the connector service and cleaned up tests
Date Thu, 16 Nov 2017 23:19:57 GMT
This is an automated email from the ASF dual-hosted git repository.

nreich pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
     new 76abaff  Wired jdbc loaders and writers to the connector service and cleaned up tests
76abaff is described below

commit 76abaffd4eb3f31ca61df0969610e9e52bbddc7e
Author: Nick Reich <nreich@pivotal.io>
AuthorDate: Thu Nov 16 15:19:09 2017 -0800

    Wired jdbc loaders and writers to the connector service and cleaned up tests
---
 .../connectors/jdbc/AbstractJdbcCallback.java      |  65 +++++
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     |  90 +++----
 .../apache/geode/connectors/jdbc/JdbcLoader.java   |  42 +--
 .../connectors/jdbc/JdbcSynchronousWriter.java     |  82 +++---
 .../jdbc/internal/ConnectionManager.java           |   4 +-
 .../internal/InternalJdbcConnectorService.java     |   5 +
 .../geode/connectors/jdbc/internal/SqlHandler.java |   4 +
 .../connectors/jdbc/AbstractJdbcCallbackTest.java  |  71 +++++
 .../org/apache/geode/connectors/jdbc/Employee.java |  54 ++++
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       | 291 ++++++++-------------
 .../geode/connectors/jdbc/JdbcAsyncWriterTest.java |  89 +++++++
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |  49 ++--
 .../geode/connectors/jdbc/JdbcLoaderTest.java      |  40 +++
 .../jdbc/JdbcSynchronousWriterIntegrationTest.java | 268 +++++++------------
 .../connectors/jdbc/JdbcSynchronousWriterTest.java | 115 ++++++++
 15 files changed, 758 insertions(+), 511 deletions(-)

diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallback.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallback.java
new file mode 100644
index 0000000..bb0c2c5
--- /dev/null
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallback.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc;
+
+import java.util.Properties;
+
+import org.apache.geode.cache.CacheCallback;
+import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
+
+abstract class AbstractJdbcCallback implements CacheCallback {
+
+  private volatile SqlHandler sqlHandler;
+
+  AbstractJdbcCallback() {
+    // nothing
+  }
+
+  AbstractJdbcCallback(SqlHandler sqlHandler) {
+    this.sqlHandler = sqlHandler;
+  }
+
+  @Override
+  public void close() {
+    if (sqlHandler != null) {
+      sqlHandler.close();
+    }
+  }
+
+  @Override
+  public void init(Properties props) {
+    // nothing
+  }
+
+  SqlHandler getSqlHandler() {
+    return sqlHandler;
+  }
+
+  void checkInitialized(InternalCache cache) {
+    if (sqlHandler == null) {
+      initialize(cache);
+    }
+  }
+
+  private synchronized void initialize(InternalCache cache) {
+    if (sqlHandler == null) {
+      InternalJdbcConnectorService service = cache.getService(InternalJdbcConnectorService.class);
+      ConnectionManager manager = new ConnectionManager(service);
+      sqlHandler = new SqlHandler(manager);
+    }
+  }
+}
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index a009658..dfc8297 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -15,100 +15,90 @@
 package org.apache.geode.connectors.jdbc;
 
 import java.util.List;
-import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CopyHelper;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.pdx.PdxInstance;
-import org.apache.logging.log4j.Logger;
 
-/*
+/**
  * This class provides write behind cache semantics for a JDBC data source using AsyncEventListener.
  *
  * @since Geode 1.4
  */
-public class JdbcAsyncWriter implements AsyncEventListener {
-  static final Logger logger = LogService.getLogger();
-  private long totalEvents = 0;
-  private long successfulEvents = 0;
-  private ConnectionManager manager;
-  private SqlHandler sqlHandler;
+public class JdbcAsyncWriter extends AbstractJdbcCallback implements AsyncEventListener {
+  private static final Logger logger = LogService.getLogger();
 
-  // Constructor for test purposes only
-  JdbcAsyncWriter(ConnectionManager manager) {
-    this.manager = manager;
-    sqlHandler = new SqlHandler(manager);
-  }
+  private AtomicLong totalEvents = new AtomicLong();
+  private AtomicLong successfulEvents = new AtomicLong();
 
-  @Override
-  public void close() {
-    if (this.manager != null) {
-      this.manager.close();
-    }
+  @SuppressWarnings("unused")
+  public JdbcAsyncWriter() {
+    super();
   }
 
-  /**
-   * precondition: DefaultQuery.setPdxReadSerialized(true)
-   */
-  @SuppressWarnings("rawtypes")
-  private PdxInstance getPdxInstance(AsyncEvent event) {
-    Object v = event.getDeserializedValue();
-    if (!(v instanceof PdxInstance)) {
-      v = CopyHelper.copy(v);
-    }
-    return (PdxInstance) v;
+  // Constructor for test purposes only
+  JdbcAsyncWriter(SqlHandler sqlHandler) {
+    super(sqlHandler);
   }
 
-
-  @SuppressWarnings("rawtypes")
   @Override
   public boolean processEvents(List<AsyncEvent> events) {
     changeTotalEvents(events.size());
-    // TODO: have a better API that lets you do this
+
+    if (!events.isEmpty()) {
+      checkInitialized((InternalCache) events.get(0).getRegion().getRegionService());
+    }
+
     DefaultQuery.setPdxReadSerialized(true);
     try {
       for (AsyncEvent event : events) {
         try {
-          sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
+          getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
               getPdxInstance(event));
           changeSuccessfulEvents(1);
         } catch (RuntimeException ex) {
-          // TODO improve the following logging
-          logger.error("Exception processing event " + event, ex);
+          logger.error("Exception processing event {}", event, ex);
         }
       }
     } finally {
       DefaultQuery.setPdxReadSerialized(false);
     }
+
     return true;
   }
 
-  @Override
-  public void init(Properties props) {
-    /*
-     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
-     * ConnectionManager(config);
-     */
+  long getTotalEvents() {
+    return totalEvents.get();
   }
 
-  private synchronized void changeTotalEvents(long delta) {
-    this.totalEvents += delta;
+  long getSuccessfulEvents() {
+    return successfulEvents.get();
   }
 
-  public synchronized long getTotalEvents() {
-    return this.totalEvents;
+  private void changeSuccessfulEvents(long delta) {
+    successfulEvents.addAndGet(delta);
   }
 
-  private synchronized void changeSuccessfulEvents(long delta) {
-    this.successfulEvents += delta;
+  private void changeTotalEvents(long delta) {
+    totalEvents.addAndGet(delta);
   }
 
-  public synchronized long getSuccessfulEvents() {
-    return this.successfulEvents;
+  /**
+   * precondition: DefaultQuery.setPdxReadSerialized(true)
+   */
+  private PdxInstance getPdxInstance(AsyncEvent event) {
+    Object value = event.getDeserializedValue();
+    if (!(value instanceof PdxInstance)) {
+      value = CopyHelper.copy(value);
+    }
+    return (PdxInstance) value;
   }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
index 2fec773..aed38f1 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
@@ -14,54 +14,38 @@
  */
 package org.apache.geode.connectors.jdbc;
 
-import org.apache.geode.cache.LoaderHelper;
-import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
-
-import java.util.Properties;
-
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
 
-/*
+/**
  * This class provides loading from a data source using JDBC.
  *
  * @since Geode 1.4
  */
-public class JdbcLoader<K, V> implements CacheLoader<K, V> {
-  private ConnectionManager manager;
-  private SqlHandler sqlHandler;
+public class JdbcLoader<K, V> extends AbstractJdbcCallback implements CacheLoader<K, V> {
 
-  // Constructor for test purposes only
-  JdbcLoader(ConnectionManager manager) {
-    this.manager = manager;
-    sqlHandler = new SqlHandler(manager);
+  @SuppressWarnings("unused")
+  public JdbcLoader() {
+    super();
   }
 
-  @Override
-  public void close() {
-    if (this.manager != null) {
-      this.manager.close();
-    }
+  // Constructor for test purposes only
+  JdbcLoader(SqlHandler sqlHandler) {
+    super(sqlHandler);
   }
 
-  @SuppressWarnings("unchecked")
-  @Override
   /**
    * @return this method always returns a PdxInstance. It does not matter what the V generic
    *         parameter is set to.
    */
+  @Override
   public V load(LoaderHelper<K, V> helper) throws CacheLoaderException {
     // The following cast to V is to keep the compiler happy
     // but is erased at runtime and no actual cast happens.
-    return (V) sqlHandler.read(helper.getRegion(), helper.getKey());
+    checkInitialized((InternalCache) helper.getRegion().getRegionService());
+    return (V) getSqlHandler().read(helper.getRegion(), helper.getKey());
   }
-
-  public void init(Properties props) {
-    /*
-     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
-     * ConnectionManager(config);
-     */
-    // TODO: make this get the JdbcConnectorService?
-  };
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java
index cf16152..6440bf5 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java
@@ -14,8 +14,6 @@
  */
 package org.apache.geode.connectors.jdbc;
 
-import java.util.Properties;
-
 import org.apache.geode.CopyHelper;
 import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.CacheWriterException;
@@ -23,74 +21,46 @@ import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.cache.SerializedCacheValue;
 import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
 import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxInstance;
 
-/*
+/**
  * This class provides synchronous write through to a data source using JDBC.
  *
  * @since Geode 1.4
  */
-public class JdbcSynchronousWriter<K, V> implements CacheWriter<K, V> {
-  private ConnectionManager manager;
-  private SqlHandler sqlHandler;
-
-  // Constructor for test purposes only
-  JdbcSynchronousWriter(ConnectionManager manager) {
-    this.manager = manager;
-    sqlHandler = new SqlHandler(manager);
-  }
+public class JdbcSynchronousWriter<K, V> extends AbstractJdbcCallback implements CacheWriter<K, V> {
 
-  @Override
-  public void init(Properties props) {
-    /*
-     * JDBCConfiguration config = new JDBCConfiguration(props); this.manager = new
-     * ConnectionManager(config);
-     */
+  @SuppressWarnings("unused")
+  public JdbcSynchronousWriter() {
+    super();
   }
 
-  @Override
-  public void close() {
-    if (this.manager != null) {
-      this.manager.close();
-    }
+  // Constructor for test purposes only
+  JdbcSynchronousWriter(SqlHandler sqlHandler) {
+    super(sqlHandler);
   }
 
-  private PdxInstance getPdxNewValue(EntryEvent<K, V> event) {
-    // TODO: have a better API that lets you do this
-    DefaultQuery.setPdxReadSerialized(true);
-    try {
-      Object v = event.getNewValue();
-      if (!(v instanceof PdxInstance)) {
-        SerializedCacheValue<V> sv = event.getSerializedNewValue();
-        if (sv != null) {
-          v = sv.getDeserializedValue();
-        } else {
-          v = CopyHelper.copy(v);
-        }
-      }
-      return (PdxInstance) v;
-    } finally {
-      DefaultQuery.setPdxReadSerialized(false);
-    }
-  }
 
   @Override
   public void beforeUpdate(EntryEvent<K, V> event) throws CacheWriterException {
-    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
+    checkInitialized((InternalCache) event.getRegion().getRegionService());
+    getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
   @Override
   public void beforeCreate(EntryEvent<K, V> event) throws CacheWriterException {
-    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
+    checkInitialized((InternalCache) event.getRegion().getRegionService());
+    getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
   @Override
   public void beforeDestroy(EntryEvent<K, V> event) throws CacheWriterException {
-    sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
+    checkInitialized((InternalCache) event.getRegion().getRegionService());
+    getSqlHandler().write(event.getRegion(), event.getOperation(), event.getKey(),
         getPdxNewValue(event));
   }
 
@@ -104,4 +74,26 @@ public class JdbcSynchronousWriter<K, V> implements CacheWriter<K, V> {
     // this event is not sent to JDBC
   }
 
+  private PdxInstance getPdxNewValue(EntryEvent<K, V> event) {
+    DefaultQuery.setPdxReadSerialized(true);
+    try {
+      Object newValue = event.getNewValue();
+      if (!(newValue instanceof PdxInstance)) {
+        SerializedCacheValue<V> serializedNewValue = event.getSerializedNewValue();
+        if (serializedNewValue != null) {
+          newValue = serializedNewValue.getDeserializedValue();
+        } else {
+          newValue = CopyHelper.copy(newValue);
+        }
+        if (newValue != null && !(newValue instanceof PdxInstance)) {
+          String valueClassName = newValue == null ? "null" : newValue.getClass().getName();
+          throw new IllegalArgumentException(getClass().getSimpleName()
+              + " only supports PDX values; newValue is " + valueClassName);
+        }
+      }
+      return (PdxInstance) newValue;
+    } finally {
+      DefaultQuery.setPdxReadSerialized(false);
+    }
+  }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
index 7746b49..a114da7 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
@@ -32,7 +32,7 @@ import org.apache.geode.pdx.PdxInstance;
 
 public class ConnectionManager {
 
-  private final JdbcConnectorService configService;
+  private final InternalJdbcConnectorService configService;
 
   private final Map<String, Connection> connectionMap = new ConcurrentHashMap<>();
 
@@ -40,7 +40,7 @@ public class ConnectionManager {
 
   private final ThreadLocal<PreparedStatementCache> preparedStatementCache = new ThreadLocal<>();
 
-  public ConnectionManager(JdbcConnectorService configService) {
+  public ConnectionManager(InternalJdbcConnectorService configService) {
     this.configService = configService;
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
index 881dec8..49938e4 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
@@ -21,4 +21,9 @@ public interface InternalJdbcConnectorService extends Extension<Cache>, CacheSer
   void addOrUpdateConnectionConfig(ConnectionConfiguration config);
 
   void addOrUpdateRegionMapping(RegionMapping mapping);
+
+  ConnectionConfiguration getConnectionConfig(String connectionName);
+
+  RegionMapping getMappingForRegion(String regionName);
+
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 0bdcab2..5aefb20 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -33,6 +33,10 @@ public class SqlHandler {
     this.manager = manager;
   }
 
+  public void close() {
+    manager.close();
+  }
+
   public PdxInstance read(Region region, Object key) {
     if (key == null) {
       throw new IllegalArgumentException("Key for query cannot be null");
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallbackTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallbackTest.java
new file mode 100644
index 0000000..05b6334
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallbackTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by
+ * applicable law or agreed to in writing, software distributed under the License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied.
+ * See the License for the specific language governing permissions and limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
+
+public class AbstractJdbcCallbackTest {
+
+  private AbstractJdbcCallback jdbcCallback;
+  private SqlHandler sqlHandler;
+
+  @Before
+  public void setUp() throws Exception {
+    sqlHandler = mock(SqlHandler.class);
+    jdbcCallback = new AbstractJdbcCallback(sqlHandler) {};
+  }
+
+  @Test
+  public void closesSqlHandler() throws Exception {
+    jdbcCallback.close();
+    verify(sqlHandler, times(1)).close();
+  }
+
+  @Test
+  public void returnsCorrectSqlHandler() throws Exception {
+    assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
+  }
+
+  @Test
+  public void checkInitializedDoesNothingIfInitialized() {
+    jdbcCallback.checkInitialized(mock(InternalCache.class));
+    assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
+  }
+
+  @Test
+  public void initializedSqlHandlerIfNoneExists() {
+    jdbcCallback = new AbstractJdbcCallback() {};
+    InternalCache cache = mock(InternalCache.class);
+    InternalJdbcConnectorService service = mock(InternalJdbcConnectorService.class);
+    when(cache.getService(any())).thenReturn(service);
+    assertThat(jdbcCallback.getSqlHandler()).isNull();
+
+    jdbcCallback.checkInitialized(cache);
+
+    assertThat(jdbcCallback.getSqlHandler()).isNotNull();
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java
new file mode 100644
index 0000000..82e2e90
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+
+@SuppressWarnings("unused")
+public class Employee implements PdxSerializable {
+  private String name;
+  private int age;
+
+  public Employee() {
+    // nothing
+  }
+
+  Employee(String name, int age) {
+    this.name = name;
+    this.age = age;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  int getAge() {
+    return age;
+  }
+
+  @Override
+  public void toData(PdxWriter writer) {
+    writer.writeString("name", this.name);
+    writer.writeInt("age", this.age);
+  }
+
+  @Override
+  public void fromData(PdxReader reader) {
+    this.name = reader.readString("name");
+    this.age = reader.readInt("age");
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 96e5328..cda4c2c 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.connectors.jdbc;
 
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.sql.Connection;
@@ -33,33 +34,43 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxReader;
-import org.apache.geode.pdx.PdxSerializable;
-import org.apache.geode.pdx.PdxSerializationException;
-import org.apache.geode.pdx.PdxWriter;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 @Category(IntegrationTest.class)
 public class JdbcAsyncWriterIntegrationTest {
+
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+
   private Cache cache;
+  private Region<String, PdxInstance> employees;
   private Connection connection;
   private Statement statement;
   private JdbcAsyncWriter jdbcWriter;
-  private String dbName = "DerbyDB";
-  private String regionTableName = "employees";
-  private String connectionURL = "jdbc:derby:memory:" + dbName + ";create=true";
+  private PdxInstance pdxEmployee1;
+  private PdxInstance pdxEmployee2;
+  private Employee employee1;
+  private Employee employee2;
 
   @Before
   public void setup() throws Exception {
     cache = new CacheFactory().setPdxReadSerialized(false).create();
-    connection = DriverManager.getConnection(connectionURL);
+    employees = createRegionWithJDBCAsyncWriter(REGION_TABLE_NAME);
+    connection = DriverManager.getConnection(CONNECTION_URL);
     statement = connection.createStatement();
-    statement.execute("Create Table " + regionTableName
+    statement.execute("Create Table " + REGION_TABLE_NAME
         + " (id varchar(10) primary key not null, name varchar(10), age int)");
+    pdxEmployee1 = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("name", "Emp1").writeInt("age", 55).create();
+    pdxEmployee2 = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("name", "Emp2").writeInt("age", 21).create();
+    employee1 = (Employee) pdxEmployee1.getObject();
+    employee2 = (Employee) pdxEmployee2.getObject();
   }
 
   @After
@@ -72,7 +83,7 @@ public class JdbcAsyncWriterIntegrationTest {
     if (statement == null) {
       statement = connection.createStatement();
     }
-    statement.execute("Drop table " + regionTableName);
+    statement.execute("Drop table " + REGION_TABLE_NAME);
     statement.close();
 
     if (connection != null) {
@@ -82,246 +93,162 @@ public class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void validateJDBCAsyncWriterTotalEvents() {
-    Region<String, PdxInstance> employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    PdxInstance pdx2 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
-        .writeInt("age", 21).create();
-    employees.put("1", pdx1);
-    employees.put("2", pdx2);
+    employees.put("1", pdxEmployee1);
+    employees.put("2", pdxEmployee2);
 
-    Awaitility.await().atMost(5, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
+    awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
   }
 
   @Test
   public void canInsertIntoTable() throws Exception {
-    Region<String, PdxInstance> employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    PdxInstance pdx2 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
-        .writeInt("age", 21).create();
-    employees.put("1", pdx1);
-    employees.put("2", pdx2);
-
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
-
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(55);
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("2");
-    assertThat(rs.getString("name")).isEqualTo("Emp2");
-    assertThat(rs.getObject("age")).isEqualTo(21);
-    assertThat(rs.next()).isFalse();
+    employees.put("1", pdxEmployee1);
+    employees.put("2", pdxEmployee2);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
-    Region<String, PdxInstance> employees = createRegionWithJDBCAsyncWriter(regionTableName);
     PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
         .writeInt("age", 55).writeInt("id", 3).create();
     employees.put("1", pdx1);
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(55);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void putNonPdxInstanceFails() {
-    Region employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    employees.put("1", "non pdx instance");
+    Region nonPdxEmployees = this.employees;
+    nonPdxEmployees.put("1", "non pdx instance");
+
+    awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(1));
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(1));
     assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(0);
   }
 
   @Test
   public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
-    Region employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    Object value = new TestEmployee("Emp2", 22);
-    employees.put("2", value);
-
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
-
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("2");
-    assertThat(rs.getString("name")).isEqualTo("Emp2");
-    assertThat(rs.getObject("age")).isEqualTo(22);
-    assertThat(rs.next()).isFalse();
+    Region nonPdxEmployees = this.employees;
+    Employee value = new Employee("Emp2", 22);
+    nonPdxEmployees.put("2", value);
+
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", value);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canDestroyFromTable() throws Exception {
-    Region<String, PdxInstance> employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    PdxInstance pdx2 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
-        .writeInt("age", 21).create();
-    employees.put("1", pdx1);
-    employees.put("2", pdx2);
+    employees.put("1", pdxEmployee1);
+    employees.put("2", pdxEmployee2);
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
 
-    try {
-      employees.destroy("1");
-    } catch (PdxSerializationException ignore) {
-      // destroy tries to deserialize old value
-      // which does not work because our PdxInstance
-      // does not have a real class
-    }
+    employees.destroy("1");
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(3));
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(3));
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("2");
-    assertThat(rs.getString("name")).isEqualTo("Emp2");
-    assertThat(rs.getObject("age")).isEqualTo(21);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canUpdateTable() throws Exception {
-    Region<String, PdxInstance> employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    employees.put("1", pdx1);
+    employees.put("1", pdxEmployee1);
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
-
-    PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 72).create();
-    try {
-      employees.put("1", pdx3);
-    } catch (PdxSerializationException ignore) {
-      // put tries to deserialize old value
-      // which does not work because our PdxInstance
-      // does not have a real class
-    }
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    employees.put("1", pdxEmployee2);
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getObject("age")).isEqualTo(72);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canUpdateBecomeInsert() throws Exception {
-    Region<String, PdxInstance> employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    employees.put("1", pdx1);
+    employees.put("1", pdxEmployee1);
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
 
-    statement.execute("delete from " + regionTableName + " where id = '1'");
+    statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
     validateTableRowCount(0);
 
-    PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 72).create();
-    try {
-      employees.put("1", pdx3);
-    } catch (PdxSerializationException ignore) {
-      // put tries to deserialize old value
-      // which does not work because our PdxInstance
-      // does not have a real class
-    }
+    employees.put("1", pdxEmployee2);
 
-    Awaitility.await().atMost(10, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(72);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canInsertBecomeUpdate() throws Exception {
-    statement.execute("Insert into " + regionTableName + " values('1', 'bogus', 11)");
+    statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
     validateTableRowCount(1);
 
-    Region<String, PdxInstance> employees = createRegionWithJDBCAsyncWriter(regionTableName);
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    employees.put("1", pdx1);
+    employees.put("1", pdxEmployee1);
 
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
-        .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+    awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertThat(resultSet.next()).isFalse();
+  }
+
+  private void awaitUntil(final Runnable supplier) {
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(supplier);
+  }
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(55);
-    assertThat(rs.next()).isFalse();
+  private void assertRecordMatchesEmployee(ResultSet resultSet, String key, Employee employee)
+      throws SQLException {
+    assertThat(resultSet.next()).isTrue();
+    assertThat(resultSet.getString("id")).isEqualTo(key);
+    assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
+    assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
   }
 
   private Region<String, PdxInstance> createRegionWithJDBCAsyncWriter(String regionName) {
-    jdbcWriter = new JdbcAsyncWriter(createManager());
+    jdbcWriter = new JdbcAsyncWriter(createSqlHandler());
     cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
         .create("jdbcAsyncQueue", jdbcWriter);
 
-    RegionFactory<String, PdxInstance> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
-    rf.addAsyncEventQueueId("jdbcAsyncQueue");
-    return rf.create(regionName);
+    RegionFactory<String, PdxInstance> regionFactory = cache.createRegionFactory(REPLICATE);
+    regionFactory.addAsyncEventQueueId("jdbcAsyncQueue");
+    return regionFactory.create(regionName);
   }
 
   private void validateTableRowCount(int expected) throws Exception {
-    ResultSet rs = statement.executeQuery("select count(*) from " + regionTableName);
-    rs.next();
-    int size = rs.getInt(1);
+    ResultSet resultSet = statement.executeQuery("select count(*) from " + REGION_TABLE_NAME);
+    resultSet.next();
+    int size = resultSet.getInt(1);
     assertThat(size).isEqualTo(expected);
   }
 
-  private ConnectionManager createManager() {
-    return new ConnectionManager(TestConfigService.getTestConfigService());
+  private SqlHandler createSqlHandler() {
+    return new SqlHandler(new ConnectionManager(TestConfigService.getTestConfigService()));
   }
 
-  public static class TestEmployee implements PdxSerializable {
-    private String name;
-    private int age;
-
-    TestEmployee(String name, int age) {
-      this.name = name;
-      this.age = age;
-    }
-
-    @Override
-    public void toData(PdxWriter writer) {
-      writer.writeString("name", this.name);
-      writer.writeInt("age", this.age);
-    }
-
-    @Override
-    public void fromData(PdxReader reader) {
-      this.name = reader.readString("name");
-      this.age = reader.readInt("age");
-    }
-  }
-
-
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
new file mode 100644
index 0000000..7a867fb
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file to You under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+
+public class JdbcAsyncWriterTest {
+
+  private SqlHandler sqlHandler;
+  private JdbcAsyncWriter writer;
+
+  @Before
+  public void setup() {
+    sqlHandler = mock(SqlHandler.class);
+    writer = new JdbcAsyncWriter(sqlHandler);
+  }
+
+  @Test
+  public void throwsNullPointerExceptionIfGivenNullList() {
+    assertThatThrownBy(() -> writer.processEvents(null)).isInstanceOf(NullPointerException.class);
+  }
+
+  @Test
+  public void doesNothingIfEventListIsEmpty() {
+    writer.processEvents(Collections.emptyList());
+
+    verifyZeroInteractions(sqlHandler);
+    assertThat(writer.getSuccessfulEvents()).isZero();
+    assertThat(writer.getTotalEvents()).isZero();
+  }
+
+  @Test
+  public void writesAProvidedEvent() {
+    writer.processEvents(Collections.singletonList(createMockEvent()));
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), any());
+    assertThat(writer.getSuccessfulEvents()).isEqualTo(1);
+    assertThat(writer.getTotalEvents()).isEqualTo(1);
+  }
+
+  @Test
+  public void writesMultipleProvidedEvents() {
+    List<AsyncEvent> events = new ArrayList<>();
+    events.add(createMockEvent());
+    events.add(createMockEvent());
+    events.add(createMockEvent());
+
+    writer.processEvents(events);
+
+    verify(sqlHandler, times(3)).write(any(), any(), any(), any());
+    assertThat(writer.getSuccessfulEvents()).isEqualTo(3);
+    assertThat(writer.getTotalEvents()).isEqualTo(3);
+  }
+
+  private AsyncEvent createMockEvent() {
+    AsyncEvent event = mock(AsyncEvent.class);
+    when(event.getRegion()).thenReturn(mock(InternalRegion.class));
+    return event;
+  }
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index 06e56d2..7f9ddd6 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -32,26 +32,28 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 @Category(IntegrationTest.class)
 public class JdbcLoaderIntegrationTest {
-  private JdbcLoader<String, String> jdbcLoader;
+
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
+
   private Cache cache;
   private Connection connection;
   private Statement statement;
-  private String dbName = "DerbyDB";
-  private String regionTableName = "employees";
-  private String connectionURL = "jdbc:derby:memory:" + dbName + ";create=true";
 
   @Before
   public void setup() throws Exception {
     cache = new CacheFactory().setPdxReadSerialized(false).create();
-    connection = DriverManager.getConnection(connectionURL);
+    connection = DriverManager.getConnection(CONNECTION_URL);
     statement = connection.createStatement();
-    statement.execute("Create Table " + regionTableName
+    statement.execute("Create Table " + REGION_TABLE_NAME
         + " (id varchar(10) primary key not null, name varchar(10), age int)");
   }
 
@@ -65,7 +67,7 @@ public class JdbcLoaderIntegrationTest {
     if (statement == null) {
       statement = connection.createStatement();
     }
-    statement.execute("Drop table " + regionTableName);
+    statement.execute("Drop table " + REGION_TABLE_NAME);
     statement.close();
 
     if (connection != null) {
@@ -73,32 +75,31 @@ public class JdbcLoaderIntegrationTest {
     }
   }
 
-  private Region createRegionWithJDBCLoader(String regionName) {
-    this.jdbcLoader = new JdbcLoader<>(createManager());
-    RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
-    rf.setCacheLoader(jdbcLoader);
-    return rf.create(regionName);
-  }
-
   @Test
   public void verifySimpleGet() throws SQLException {
-    statement.execute("Insert into " + regionTableName + " values('1', 'Emp1', 21)");
-    Region region = createRegionWithJDBCLoader(this.regionTableName);
-    Object result = region.get("1");
-    assertThat(result).isNotNull();
-    PdxInstance pdx = (PdxInstance) result;
+    statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'Emp1', 21)");
+    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME);
+    PdxInstance pdx = region.get("1");
+
     assertThat(pdx.getField("name")).isEqualTo("Emp1");
     assertThat(pdx.getField("age")).isEqualTo(21);
   }
 
   @Test
   public void verifySimpleMiss() throws SQLException {
-    Region region = createRegionWithJDBCLoader(this.regionTableName);
-    Object result = region.get("1");
-    assertThat(result).isNull();
+    Region<String, PdxInstance> region = createRegionWithJDBCLoader(REGION_TABLE_NAME);
+    PdxInstance pdx = region.get("1");
+    assertThat(pdx).isNull();
+  }
+
+  private SqlHandler createSqlHandler() {
+    return new SqlHandler(new ConnectionManager(TestConfigService.getTestConfigService()));
   }
 
-  private ConnectionManager createManager() {
-    return new ConnectionManager(TestConfigService.getTestConfigService());
+  private Region<String, PdxInstance> createRegionWithJDBCLoader(String regionName) {
+    JdbcLoader<String, PdxInstance> jdbcLoader = new JdbcLoader<>(createSqlHandler());
+    RegionFactory<String, PdxInstance> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    rf.setCacheLoader(jdbcLoader);
+    return rf.create(regionName);
   }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java
new file mode 100644
index 0000000..9817c37
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file to You under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+
+public class JdbcLoaderTest {
+
+  @Test
+  public void loadReadsFromSqlHandler() {
+    SqlHandler sqlHandler = mock(SqlHandler.class);
+    JdbcLoader<Object, Object> loader = new JdbcLoader<>(sqlHandler);
+    LoaderHelper loaderHelper = mock(LoaderHelper.class);
+    when(loaderHelper.getRegion()).thenReturn(mock(InternalRegion.class));
+    loader.load(loaderHelper);
+    verify(sqlHandler, times(1)).read(any(), any());
+  }
+
+}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java
index 248fcf5..54f6329 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java
@@ -26,7 +26,9 @@ import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,6 +40,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxReader;
@@ -49,27 +52,34 @@ import org.apache.geode.test.junit.categories.IntegrationTest;
 @Category(IntegrationTest.class)
 public class JdbcSynchronousWriterIntegrationTest {
 
-  private Cache cache;
+  private static final String DB_NAME = "DerbyDB";
+  private static final String REGION_TABLE_NAME = "employees";
+  private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME + ";create=true";
 
+  private Cache cache;
+  private Region<String, PdxInstance> employees;
   private Connection connection;
-
   private Statement statement;
-
   private JdbcSynchronousWriter jdbcWriter;
-
-  private String dbName = "DerbyDB";
-
-  private String regionTableName = "employees";
-
-  private String connectionURL = "jdbc:derby:memory:" + dbName + ";create=true";
+  private PdxInstance pdx1;
+  private PdxInstance pdx2;
+  private Employee employee1;
+  private Employee employee2;
 
   @Before
   public void setup() throws Exception {
     cache = new CacheFactory().setPdxReadSerialized(false).create();
-    connection = DriverManager.getConnection(connectionURL);
+    employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME);
+    connection = DriverManager.getConnection(CONNECTION_URL);
     statement = connection.createStatement();
-    statement.execute("Create Table " + regionTableName
+    statement.execute("Create Table " + REGION_TABLE_NAME
         + " (id varchar(10) primary key not null, name varchar(10), age int)");
+    pdx1 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name", "Emp1")
+        .writeInt("age", 55).create();
+    pdx2 = cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name", "Emp2")
+        .writeInt("age", 21).create();
+    employee1 = (Employee) pdx1.getObject();
+    employee2 = (Employee) pdx2.getObject();
   }
 
   @After
@@ -82,7 +92,7 @@ public class JdbcSynchronousWriterIntegrationTest {
     if (statement == null) {
       statement = connection.createStatement();
     }
-    statement.execute("Drop table " + regionTableName);
+    statement.execute("Drop table " + REGION_TABLE_NAME);
     statement.close();
 
     if (connection != null) {
@@ -90,241 +100,141 @@ public class JdbcSynchronousWriterIntegrationTest {
     }
   }
 
-  private Properties getRequiredProperties() {
-    Properties props = new Properties();
-    props.setProperty("url", this.connectionURL);
-    return props;
-  }
-
   @Test
   public void canInsertIntoTable() throws Exception {
-    Region<String, PdxInstance> employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    PdxInstance pdx2 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
-        .writeInt("age", 21).create();
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(55);
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("2");
-    assertThat(rs.getString("name")).isEqualTo("Emp2");
-    assertThat(rs.getObject("age")).isEqualTo(21);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canPutAllInsertIntoTable() throws Exception {
-    Region<String, PdxInstance> employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    PdxInstance pdx2 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
-        .writeInt("age", 21).create();
     Map<String, PdxInstance> putAllMap = new HashMap<>();
     putAllMap.put("1", pdx1);
     putAllMap.put("2", pdx2);
     employees.putAll(putAllMap);
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(55);
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("2");
-    assertThat(rs.getString("name")).isEqualTo("Emp2");
-    assertThat(rs.getObject("age")).isEqualTo(21);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
-    Region<String, PdxInstance> employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).writeInt("id", 3).create();
-    employees.put("1", pdx1);
+    PdxInstance pdxInstanceWithId = cache.createPdxInstanceFactory(Employee.class.getName())
+        .writeString("name", "Emp1").writeInt("age", 55).writeInt("id", 3).create();
+    employees.put("1", pdxInstanceWithId);
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(55);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", (Employee) pdxInstanceWithId.getObject());
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void putNonPdxInstanceFails() {
-    Region employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    catchException(employees).put("1", "non pdx instance");
-    assertThat((Exception) caughtException()).isInstanceOf(ClassCastException.class);
-    assertThat(caughtException().getMessage())
-        .isEqualTo("java.lang.String cannot be cast to org.apache.geode.pdx.PdxInstance");
+    Region nonPdxEmployees = this.employees;
+    catchException(nonPdxEmployees).put("1", "non pdx instance");
+    assertThat((Exception) caughtException()).isInstanceOf(IllegalArgumentException.class);
   }
 
   @Test
   public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
-    Region employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    Object value = new TestEmployee("Emp2", 22);
-    employees.put("2", value);
-
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("2");
-    assertThat(rs.getString("name")).isEqualTo("Emp2");
-    assertThat(rs.getObject("age")).isEqualTo(22);
-    assertThat(rs.next()).isFalse();
-  }
+    Region nonPdxEmployees = this.employees;
+    Employee value = new Employee("Emp2", 22);
+    nonPdxEmployees.put("2", value);
 
-  public static class TestEmployee implements PdxSerializable {
-    private String name;
-    private int age;
-
-    TestEmployee(String name, int age) {
-      this.name = name;
-      this.age = age;
-    }
-
-    @Override
-    public void toData(PdxWriter writer) {
-      writer.writeString("name", this.name);
-      writer.writeInt("age", this.age);
-    }
-
-    @Override
-    public void fromData(PdxReader reader) {
-      this.name = reader.readString("name");
-      this.age = reader.readInt("age");
-    }
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", value);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canDestroyFromTable() throws Exception {
-    Region<String, PdxInstance> employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
-    PdxInstance pdx2 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
-        .writeInt("age", 21).create();
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
-    try {
-      employees.destroy("1");
-    } catch (PdxSerializationException ignore) {
-      // destroy tries to deserialize old value
-      // which does not work because our PdxInstance
-      // does not have a real class
-    }
+    employees.destroy("1");
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("2");
-    assertThat(rs.getString("name")).isEqualTo("Emp2");
-    assertThat(rs.getObject("age")).isEqualTo(21);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "2", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canUpdateTable() throws Exception {
-    Region<String, PdxInstance> employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
     employees.put("1", pdx1);
+    employees.put("1", pdx2);
 
-    PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 72).create();
-    try {
-      employees.put("1", pdx3);
-    } catch (PdxSerializationException ignore) {
-      // put tries to deserialize old value
-      // which does not work because our PdxInstance
-      // does not have a real class
-    }
-
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getObject("age")).isEqualTo(72);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canUpdateBecomeInsert() throws Exception {
-    Region<String, PdxInstance> employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
     employees.put("1", pdx1);
 
-    statement.execute("delete from " + regionTableName + " where id = '1'");
+    statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
     validateTableRowCount(0);
 
-    PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 72).create();
-    try {
-      employees.put("1", pdx3);
-    } catch (PdxSerializationException ignore) {
-      // put tries to deserialize old value
-      // which does not work because our PdxInstance
-      // does not have a real class
-    }
+    employees.put("1", pdx2);
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(72);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee2);
+    assertThat(resultSet.next()).isFalse();
   }
 
   @Test
   public void canInsertBecomeUpdate() throws Exception {
-    statement.execute("Insert into " + regionTableName + " values('1', 'bogus', 11)");
+    statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
     validateTableRowCount(1);
 
-    Region<String, PdxInstance> employees =
-        createRegionWithJDBCSynchronousWriter(regionTableName, getRequiredProperties());
-    PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
-        .writeInt("age", 55).create();
     employees.put("1", pdx1);
 
-    ResultSet rs = statement.executeQuery("select * from " + regionTableName + " order by id asc");
-    assertThat(rs.next()).isTrue();
-    assertThat(rs.getString("id")).isEqualTo("1");
-    assertThat(rs.getString("name")).isEqualTo("Emp1");
-    assertThat(rs.getObject("age")).isEqualTo(55);
-    assertThat(rs.next()).isFalse();
+    ResultSet resultSet =
+        statement.executeQuery("select * from " + REGION_TABLE_NAME + " order by id asc");
+    assertRecordMatchesEmployee(resultSet, "1", employee1);
+    assertThat(resultSet.next()).isFalse();
   }
 
-  private Region<String, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName,
-      Properties props) {
-    jdbcWriter = new JdbcSynchronousWriter(createManager());
-    jdbcWriter.init(props);
+  private Region<String, PdxInstance> createRegionWithJDBCSynchronousWriter(String regionName) {
+    jdbcWriter = new JdbcSynchronousWriter(createSqlHandler());
+    jdbcWriter.init(new Properties());
 
-    RegionFactory<String, PdxInstance> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
-    rf.setCacheWriter(jdbcWriter);
-    return rf.create(regionName);
+    RegionFactory<String, PdxInstance> regionFactory =
+        cache.createRegionFactory(RegionShortcut.REPLICATE);
+    regionFactory.setCacheWriter(jdbcWriter);
+    return regionFactory.create(regionName);
   }
 
   private void validateTableRowCount(int expected) throws Exception {
-    ResultSet rs = statement.executeQuery("select count(*) from " + regionTableName);
-    rs.next();
-    int size = rs.getInt(1);
+    ResultSet resultSet = statement.executeQuery("select count(*) from " + REGION_TABLE_NAME);
+    resultSet.next();
+    int size = resultSet.getInt(1);
     assertThat(size).isEqualTo(expected);
   }
 
-  private ConnectionManager createManager() {
-    return new ConnectionManager(TestConfigService.getTestConfigService());
+  private SqlHandler createSqlHandler() {
+    return new SqlHandler(new ConnectionManager(TestConfigService.getTestConfigService()));
   }
 
+  private void assertRecordMatchesEmployee(ResultSet resultSet, String key, Employee employee)
+      throws SQLException {
+    assertThat(resultSet.next()).isTrue();
+    assertThat(resultSet.getString("id")).isEqualTo(key);
+    assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
+    assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
+  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterTest.java
new file mode 100644
index 0000000..9a3fcfc
--- /dev/null
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterTest.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.SerializedCacheValue;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.pdx.PdxInstance;
+
+public class JdbcSynchronousWriterTest {
+
+  private EntryEvent<Object, Object> entryEvent;
+  private PdxInstance pdxInstance;
+  private SqlHandler sqlHandler;
+
+  @Before
+  public void setUp() {
+    entryEvent = mock(EntryEvent.class);
+    pdxInstance = mock(PdxInstance.class);
+    SerializedCacheValue<Object> serializedNewValue = mock(SerializedCacheValue.class);
+    sqlHandler = mock(SqlHandler.class);
+
+    when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+    when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+    when(serializedNewValue.getDeserializedValue()).thenReturn(pdxInstance);
+
+  }
+
+  @Test
+  public void beforeUpdateWithPdxInstanceWritesToSqlHandler() {
+    JdbcSynchronousWriter<Object, Object> writer = new JdbcSynchronousWriter<>(sqlHandler);
+
+    writer.beforeUpdate(entryEvent);
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+  }
+
+  @Test
+  public void beforeUpdateWithoutPdxInstanceThrowsIllegalArgumentException() {
+    EntryEvent<Object, Object> entryEvent = mock(EntryEvent.class);
+    Object value = new Object();
+    SerializedCacheValue<Object> serializedNewValue = mock(SerializedCacheValue.class);
+    SqlHandler sqlHandler = mock(SqlHandler.class);
+
+    when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+    when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+    when(serializedNewValue.getDeserializedValue()).thenReturn(value);
+
+    JdbcSynchronousWriter<Object, Object> writer = new JdbcSynchronousWriter<>(sqlHandler);
+
+    assertThatThrownBy(() -> writer.beforeUpdate(entryEvent))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void beforeCreateWithPdxInstanceWritesToSqlHandler() {
+    JdbcSynchronousWriter<Object, Object> writer = new JdbcSynchronousWriter<>(sqlHandler);
+
+    writer.beforeCreate(entryEvent);
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+  }
+
+  @Test
+  public void beforeDestroyWithPdxInstanceWritesToSqlHandler() {
+    JdbcSynchronousWriter<Object, Object> writer = new JdbcSynchronousWriter<>(sqlHandler);
+
+    writer.beforeDestroy(entryEvent);
+
+    verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+  }
+
+  @Test
+  public void beforeRegionDestroyDoesNotWriteToSqlHandler() {
+    JdbcSynchronousWriter<Object, Object> writer = new JdbcSynchronousWriter<>(sqlHandler);
+
+    writer.beforeRegionDestroy(mock(RegionEvent.class));
+
+    verifyZeroInteractions(sqlHandler);
+  }
+
+  @Test
+  public void beforeRegionClearDoesNotWriteToSqlHandler() {
+    JdbcSynchronousWriter<Object, Object> writer = new JdbcSynchronousWriter<>(sqlHandler);
+
+    writer.beforeRegionClear(mock(RegionEvent.class));
+
+    verifyZeroInteractions(sqlHandler);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <commits@geode.apache.org>.

Mime
View raw message