geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jche...@apache.org
Subject [geode] branch develop updated: GEODE-6461: Report errors when table metadata and region mapping do not match (#3256)
Date Tue, 12 Mar 2019 17:37:50 GMT
This is an automated email from the ASF dual-hosted git repository.

jchen21 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 18f0f21  GEODE-6461: Report errors when table metadata and region mapping do not match (#3256)
18f0f21 is described below

commit 18f0f214ae8ddc9da8d43f4ae7403d563bb92523
Author: BenjaminPerryRoss <39068135+BenjaminPerryRoss@users.noreply.github.com>
AuthorDate: Tue Mar 12 10:37:35 2019 -0700

    GEODE-6461: Report errors when table metadata and region mapping do not match (#3256)
    
    Co-authored-by: Ben Ross <bross@pivotal.io>
    Co-authored-by: Darrel Schneider <dschneider@pivotal.io>
    Co-authored-by: Jianxia Chen <jchen@pivotal.io>
---
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |  25 +-
 .../geode/connectors/jdbc/JdbcDistributedTest.java |  81 +++++++
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |   2 +-
 .../connectors/jdbc/JdbcWriterIntegrationTest.java |  37 +--
 .../jdbc/MySqlJdbcLoaderIntegrationTest.java       |   6 +-
 .../jdbc/PostgresJdbcLoaderIntegrationTest.java    |   6 +-
 .../jdbc/PostgresJdbcWriterIntegrationTest.java    |   6 +-
 .../jdbc/internal/JdbcConnectorService.java        |   6 +
 .../jdbc/internal/JdbcConnectorServiceImpl.java    | 115 +++++++++
 .../geode/connectors/jdbc/internal/SqlHandler.java |  58 ++---
 .../connectors/jdbc/internal/SqlToPdxInstance.java |   6 +-
 .../jdbc/internal/cli/DescribeMappingCommand.java  |   2 +
 .../jdbc/internal/configuration/RegionMapping.java |  51 ++--
 .../internal/xml/RegionMappingConfiguration.java   |   1 +
 .../connectors/util/internal/MappingConstants.java |   1 +
 .../geode.apache.org/schema/jdbc/jdbc-1.0.xsd      |   1 +
 .../sanctioned-geode-connectors-serializables.txt  |   2 +-
 .../jdbc/internal/JdbcConnectorServiceTest.java    | 156 +++++++++++-
 .../jdbc/internal/RegionMappingTest.java           |   4 +-
 .../connectors/jdbc/internal/SqlHandlerTest.java   | 265 ++++++++++++++-------
 .../jdbc/internal/SqlToPdxInstanceTest.java        |  14 +-
 .../internal/cli/DescribeMappingCommandTest.java   |  54 ++++-
 .../geode/internal/cache/GemFireCacheImpl.java     |   1 -
 23 files changed, 688 insertions(+), 212 deletions(-)

diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 89f2af6..3b82467 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.FieldType;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.WritablePdxInstance;
 
@@ -78,9 +79,9 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   private void setupRegion(String ids) throws RegionMappingExistsException {
     List<FieldMapping> fieldMappings = Arrays.asList(
-        new FieldMapping("", "", "id", JDBCType.VARCHAR.name(), false),
-        new FieldMapping("", "", "name", JDBCType.VARCHAR.name(), true),
-        new FieldMapping("", "", "age", JDBCType.INTEGER.name(), true));
+        new FieldMapping("id", FieldType.STRING.name(), "id", JDBCType.VARCHAR.name(), false),
+        new FieldMapping("name", FieldType.STRING.name(), "name", JDBCType.VARCHAR.name(), true),
+        new FieldMapping("age", FieldType.OBJECT.name(), "age", JDBCType.INTEGER.name(), true));
     employees = createRegionWithJDBCAsyncWriter(REGION_TABLE_NAME, ids, fieldMappings);
   }
 
@@ -110,7 +111,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void validateJDBCAsyncWriterTotalEvents() throws RegionMappingExistsException {
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdxEmployee1);
     employees.put("2", pdxEmployee2);
 
@@ -119,7 +120,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
-    setupRegion(null);
+    setupRegion("id");
     PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
         .writeObject("age", 55).writeInt("id", 3).create();
     employees.put("1", pdx1);
@@ -134,7 +135,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void putNonPdxInstanceFails() throws RegionMappingExistsException {
-    setupRegion(null);
+    setupRegion("id");
     Region nonPdxEmployees = this.employees;
     nonPdxEmployees.put("1", "non pdx instance");
 
@@ -146,7 +147,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
   @Test
   public void putNonPdxInstanceThatIsPdxSerializable()
       throws SQLException, RegionMappingExistsException {
-    setupRegion(null);
+    setupRegion("id");
     Region nonPdxEmployees = this.employees;
     Employee value = new Employee("2", "Emp2", 22);
     nonPdxEmployees.put("2", value);
@@ -161,7 +162,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void canDestroyFromTable() throws Exception {
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdxEmployee1);
     employees.put("2", pdxEmployee2);
 
@@ -201,7 +202,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void canInsertIntoTable() throws Exception {
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdxEmployee1);
     employees.put("2", pdxEmployee2);
     awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
@@ -236,7 +237,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void canUpdateTable() throws Exception {
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdxEmployee1);
 
     awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
@@ -277,7 +278,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void canUpdateBecomeInsert() throws Exception {
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdxEmployee1);
 
     awaitUntil(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
@@ -297,7 +298,7 @@ public abstract class JdbcAsyncWriterIntegrationTest {
 
   @Test
   public void canInsertBecomeUpdate() throws Exception {
-    setupRegion(null);
+    setupRegion("id");
     statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
     validateTableRowCount(1);
 
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
index 49ba259..8c22df1 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcDistributedTest.java
@@ -96,6 +96,13 @@ public abstract class JdbcDistributedTest implements Serializable {
         + " (id varchar(10) primary key not null, name varchar(10), age int not null)");
   }
 
+  private void alterTable() throws SQLException {
+    Connection connection = getConnection();
+    Statement statement = connection.createStatement();
+    statement.execute("Alter Table " + TABLE_NAME
+        + " add column new_column varchar(10)");
+  }
+
   private void createTableForAllSupportedFields() throws SQLException {
     server = startupRule.startServerVM(1,
         x -> x.withConnectionToLocator(locator.getPort()).withPDXReadSerialized());
@@ -223,6 +230,80 @@ public abstract class JdbcDistributedTest implements Serializable {
   }
 
   @Test
+  public void throwsExceptionWhenMappingDoesNotMatchTableDefinitionOnInitialOperation()
+      throws Exception {
+    IgnoredException.addIgnoredException(
+        "Error detected when comparing mapping for region \"employees\" with table definition:");
+    createTable();
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
+    alterTable();
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+      String key = "id1";
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      assertThatThrownBy(() -> region.put(key, pdxEmployee1))
+          .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
+              "Jdbc mapping for \"" + REGION_NAME
+                  + "\" does not match table definition, check logs for more details.");
+    });
+  }
+
+  @Test
+  public void throwsExceptionWhenMappingDoesNotMatchTableDefinitionOnLoaderAlreadyInitialized()
+      throws Exception {
+    IgnoredException.addIgnoredException(
+        "Error detected when comparing mapping for region \"employees\" with table definition:");
+    createTable();
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
+    server.invoke(() -> {
+      PdxInstance pdxEmployee1 =
+          ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
+              .writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
+
+      String key = "id1";
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      region.put(key, pdxEmployee1); // this initializes the writer
+      region.invalidate(key);
+      region.get(key); // this initializes the loader
+      region.invalidate(key);
+    });
+    alterTable();
+    server.invoke(() -> {
+      String key = "id1";
+      Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      assertThatThrownBy(() -> region.get(key))
+          .isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
+              "The jdbc-mapping does not contain the column name \"new_column\"."
+                  + " This is probably caused by a column being added to the table after the jdbc-mapping was created.");
+    });
+  }
+
+  @Test
+  public void throwsExceptionWhenMappingDoesNotMatchTableDefinitionOnServerStartup()
+      throws Exception {
+    IgnoredException.addIgnoredException(
+        "Error detected when comparing mapping for region \"employees\" with table definition:");
+    IgnoredException.addIgnoredException(
+        "Jdbc mapping for \"employees\" does not match table definition, check logs for more details.");
+    createTable();
+    createRegionUsingGfsh();
+    createJdbcDataSource();
+    createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
+    alterTable();
+    assertThatThrownBy(
+        () -> startupRule.startServerVM(2, x -> x.withConnectionToLocator(locator.getPort())))
+            .hasCauseExactlyInstanceOf(JdbcConnectorException.class).hasStackTraceContaining(
+                "Jdbc mapping for \"employees\" does not match table definition, check logs for more details.");
+  }
+
+  @Test
   public void throwsExceptionWhenNoDataSourceExists() throws Exception {
     createTable();
     createRegionUsingGfsh();
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index 057e367..1a5ab83 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -256,7 +256,7 @@ public abstract class JdbcLoaderIntegrationTest {
   protected <K, V> Region<K, V> createRegionWithJDBCLoader(String regionName, String pdxClassName,
       List<FieldMapping> fieldMappings)
       throws RegionMappingExistsException {
-    return createRegionWithJDBCLoader(regionName, pdxClassName, null, null, null, fieldMappings);
+    return createRegionWithJDBCLoader(regionName, pdxClassName, "id", null, null, fieldMappings);
   }
 
   protected ClassWithSupportedPdxFields createClassWithSupportedPdxFieldsForInsert(String key) {
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
index 86d163f..1a7ca72 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/JdbcWriterIntegrationTest.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.sql.DataSource;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +43,7 @@ import org.apache.geode.connectors.jdbc.internal.TableMetaDataManager;
 import org.apache.geode.connectors.jdbc.internal.TestConfigService;
 import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.FieldType;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.WritablePdxInstance;
 
@@ -108,9 +111,9 @@ public abstract class JdbcWriterIntegrationTest {
   protected void sharedRegionSetup(String ids, String catalog, String schema)
       throws RegionMappingExistsException {
     List<FieldMapping> fieldMappings = Arrays.asList(
-        new FieldMapping("", "", "id", JDBCType.VARCHAR.name(), false),
-        new FieldMapping("", "", "name", JDBCType.VARCHAR.name(), true),
-        new FieldMapping("", "", "age", JDBCType.INTEGER.name(), true));
+        new FieldMapping("id", FieldType.STRING.name(), "id", JDBCType.VARCHAR.name(), false),
+        new FieldMapping("name", FieldType.STRING.name(), "name", JDBCType.VARCHAR.name(), true),
+        new FieldMapping("age", FieldType.OBJECT.name(), "age", JDBCType.INTEGER.name(), true));
     employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME, ids, catalog, schema,
         fieldMappings);
   }
@@ -160,7 +163,9 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canInsertIntoTable() throws Exception {
     createTable();
-    setupRegion(null);
+    DataSource dataSource = testDataSourceFactory.getDataSource("testConnectionConfig");
+    setupRegion("id");
+
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -169,6 +174,8 @@ public abstract class JdbcWriterIntegrationTest {
     assertRecordMatchesEmployee(resultSet, "1", employee1);
     assertRecordMatchesEmployee(resultSet, "2", employee2);
     assertThat(resultSet.next()).isFalse();
+
+    dataSource.getConnection();
   }
 
   protected abstract boolean vendorSupportsSchemas();
@@ -176,7 +183,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canInsertIntoTableWithSchema() throws Exception {
     createTableWithSchema();
-    setupRegionWithSchema(null);
+    setupRegionWithSchema("id");
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -212,7 +219,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canPutAllInsertIntoTable() throws Exception {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     Map<String, PdxInstance> putAllMap = new HashMap<>();
     putAllMap.put("1", pdx1);
     putAllMap.put("2", pdx2);
@@ -228,7 +235,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws Exception {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     PdxInstance pdxInstanceWithId = cache.createPdxInstanceFactory(Employee.class.getName())
         .writeString("name", "Emp1").writeInt("age", 55).writeString("id", "3").create();
     employees.put("1", pdxInstanceWithId);
@@ -242,7 +249,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void putNonPdxInstanceFails() throws Exception {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     Region nonPdxEmployees = this.employees;
     Throwable thrown = catchThrowable(() -> nonPdxEmployees.put("1", "non pdx instance"));
     assertThat(thrown).isInstanceOf(IllegalArgumentException.class);
@@ -252,7 +259,7 @@ public abstract class JdbcWriterIntegrationTest {
   public void putNonPdxInstanceThatIsPdxSerializable()
       throws SQLException, RegionMappingExistsException {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     Region nonPdxEmployees = this.employees;
     Employee value = new Employee("2", "Emp2", 22);
     nonPdxEmployees.put("2", value);
@@ -266,7 +273,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canDestroyFromTable() throws Exception {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -281,7 +288,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canDestroyFromTableWithSchema() throws Exception {
     createTableWithSchema();
-    setupRegionWithSchema(null);
+    setupRegionWithSchema("id");
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -318,7 +325,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canUpdateTable() throws Exception {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdx1);
     employees.put("1", pdx2);
 
@@ -331,7 +338,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canUpdateTableWithSchema() throws Exception {
     createTableWithSchema();
-    setupRegionWithSchema(null);
+    setupRegionWithSchema("id");
     employees.put("1", pdx1);
     employees.put("1", pdx2);
 
@@ -368,7 +375,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canUpdateBecomeInsert() throws Exception {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     employees.put("1", pdx1);
 
     statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
@@ -385,7 +392,7 @@ public abstract class JdbcWriterIntegrationTest {
   @Test
   public void canInsertBecomeUpdate() throws Exception {
     createTable();
-    setupRegion(null);
+    setupRegion("id");
     statement.execute("Insert into " + REGION_TABLE_NAME + " values('1', 'bogus', 11)");
     validateTableRowCount(1);
 
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java
index e404966..e518c80 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/MySqlJdbcLoaderIntegrationTest.java
@@ -69,8 +69,8 @@ public class MySqlJdbcLoaderIntegrationTest extends JdbcLoaderIntegrationTest {
             true),
         new FieldMapping("anint", FieldType.INT.name(), "anint", JDBCType.INTEGER.name(), true),
         new FieldMapping("along", FieldType.LONG.name(), "along", JDBCType.BIGINT.name(), true),
-        new FieldMapping("afloat", FieldType.FLOAT.name(), "afloat", JDBCType.FLOAT.name(), true),
-        new FieldMapping("adouble", FieldType.DOUBLE.name(), "adouble", JDBCType.FLOAT.name(),
+        new FieldMapping("afloat", FieldType.FLOAT.name(), "afloat", JDBCType.REAL.name(), true),
+        new FieldMapping("adouble", FieldType.DOUBLE.name(), "adouble", JDBCType.REAL.name(),
             true),
         new FieldMapping("astring", FieldType.STRING.name(), "astring", JDBCType.VARCHAR.name(),
             true),
@@ -78,7 +78,7 @@ public class MySqlJdbcLoaderIntegrationTest extends JdbcLoaderIntegrationTest {
         new FieldMapping("anobject", FieldType.OBJECT.name(), "anobject", JDBCType.VARCHAR.name(),
             true),
         new FieldMapping("abytearray", FieldType.BYTE_ARRAY.name(), "abytearray",
-            JDBCType.BLOB.name(), true),
+            JDBCType.BINARY.name(), true),
         new FieldMapping("achar", FieldType.CHAR.name(), "achar", JDBCType.CHAR.name(), true));
     return fieldMappings;
   }
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java
index ecd6e6a..e3174c3 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcLoaderIntegrationTest.java
@@ -66,15 +66,15 @@ public class PostgresJdbcLoaderIntegrationTest extends JdbcLoaderIntegrationTest
   protected List<FieldMapping> getSupportedPdxFieldsTableFieldMappings() {
     List<FieldMapping> fieldMappings = Arrays.asList(
         new FieldMapping("id", FieldType.STRING.name(), "id", JDBCType.VARCHAR.name(), false),
-        new FieldMapping("aboolean", FieldType.BOOLEAN.name(), "aboolean", JDBCType.SMALLINT.name(),
+        new FieldMapping("aboolean", FieldType.BOOLEAN.name(), "aboolean", JDBCType.BIT.name(),
             true),
         new FieldMapping("aByte", FieldType.BYTE.name(), "abyte", JDBCType.SMALLINT.name(), true),
         new FieldMapping("ASHORT", FieldType.SHORT.name(), "ashort", JDBCType.SMALLINT.name(),
             true),
         new FieldMapping("anint", FieldType.INT.name(), "anint", JDBCType.INTEGER.name(), true),
         new FieldMapping("along", FieldType.LONG.name(), "along", JDBCType.BIGINT.name(), true),
-        new FieldMapping("afloat", FieldType.FLOAT.name(), "afloat", JDBCType.FLOAT.name(), true),
-        new FieldMapping("adouble", FieldType.DOUBLE.name(), "adouble", JDBCType.FLOAT.name(),
+        new FieldMapping("afloat", FieldType.FLOAT.name(), "afloat", JDBCType.DOUBLE.name(), true),
+        new FieldMapping("adouble", FieldType.DOUBLE.name(), "adouble", JDBCType.DOUBLE.name(),
             true),
         new FieldMapping("astring", FieldType.STRING.name(), "astring", JDBCType.VARCHAR.name(),
             true),
diff --git a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java
index 8f4a4af..a1fb491 100644
--- a/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java
+++ b/geode-connectors/src/acceptanceTest/java/org/apache/geode/connectors/jdbc/PostgresJdbcWriterIntegrationTest.java
@@ -61,7 +61,7 @@ public class PostgresJdbcWriterIntegrationTest extends JdbcWriterIntegrationTest
   @Test
   public void canDestroyFromTableWithCatalogAndSchema() throws Exception {
     createTableWithCatalogAndSchema();
-    sharedRegionSetup(null, DB_NAME, SCHEMA_NAME);
+    sharedRegionSetup("id", DB_NAME, SCHEMA_NAME);
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -77,7 +77,7 @@ public class PostgresJdbcWriterIntegrationTest extends JdbcWriterIntegrationTest
   @Test
   public void canInsertIntoTableWithCatalogAndSchema() throws Exception {
     createTableWithCatalogAndSchema();
-    sharedRegionSetup(null, DB_NAME, SCHEMA_NAME);
+    sharedRegionSetup("id", DB_NAME, SCHEMA_NAME);
     employees.put("1", pdx1);
     employees.put("2", pdx2);
 
@@ -92,7 +92,7 @@ public class PostgresJdbcWriterIntegrationTest extends JdbcWriterIntegrationTest
   @Test
   public void canUpdateTableWithCatalogAndSchema() throws Exception {
     createTableWithCatalogAndSchema();
-    sharedRegionSetup(null, DB_NAME, SCHEMA_NAME);
+    sharedRegionSetup("id", DB_NAME, SCHEMA_NAME);
     employees.put("1", pdx1);
     employees.put("1", pdx2);
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorService.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorService.java
index c355ed9..4ae915a 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorService.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorService.java
@@ -16,6 +16,8 @@ package org.apache.geode.connectors.jdbc.internal;
 
 import java.util.Set;
 
+import javax.sql.DataSource;
+
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
@@ -38,4 +40,8 @@ public interface JdbcConnectorService extends CacheService {
   RegionMapping getMappingForRegion(String regionName);
 
   Set<RegionMapping> getRegionMappings();
+
+  void validateMapping(RegionMapping regionMapping, DataSource dataSource);
+
+  void validateMapping(RegionMapping regionMapping);
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceImpl.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceImpl.java
index 5630d54..59105f2 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceImpl.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceImpl.java
@@ -14,22 +14,33 @@
  */
 package org.apache.geode.connectors.jdbc.internal;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.sql.DataSource;
+
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
 import org.apache.geode.connectors.jdbc.JdbcWriter;
+import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.jndi.JNDIInvoker;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
 
 @Experimental
 public class JdbcConnectorServiceImpl implements JdbcConnectorService {
 
+  private static final Logger logger = LogService.getLogger();
   private final Map<String, RegionMapping> mappingsByRegion =
       new ConcurrentHashMap<>();
 
@@ -77,6 +88,87 @@ public class JdbcConnectorServiceImpl implements JdbcConnectorService {
   }
 
   @Override
+  public void validateMapping(RegionMapping regionMapping) {
+
+    DataSource dataSource = getDataSource(regionMapping.getDataSourceName());
+    if (dataSource == null) {
+      throw new JdbcConnectorException("No datasource \"" + regionMapping.getDataSourceName()
+          + "\" found when creating mapping \"" + regionMapping.getRegionName() + "\"");
+    }
+    validateMapping(regionMapping, dataSource);
+  }
+
+  @Override
+  public void validateMapping(RegionMapping regionMapping, DataSource dataSource) {
+    TableMetaDataView metaDataView = getTableMetaDataView(regionMapping, dataSource);
+    boolean foundDifference = false;
+
+    if (regionMapping.getFieldMappings().size() != metaDataView.getColumnNames().size()) {
+      foundDifference = true;
+    } else {
+      for (FieldMapping fieldMapping : regionMapping.getFieldMappings()) {
+        String jdbcName = fieldMapping.getJdbcName();
+        if (!metaDataView.getColumnNames().contains(jdbcName)) {
+          foundDifference = true;
+          break;
+        }
+        if (!metaDataView.getColumnDataType(jdbcName).getName()
+            .equals(fieldMapping.getJdbcType())) {
+          foundDifference = true;
+          break;
+        }
+        if (metaDataView.isColumnNullable(jdbcName) != fieldMapping.isJdbcNullable()) {
+          foundDifference = true;
+          break;
+        }
+      }
+    }
+
+    if (!foundDifference) {
+      if (!regionMapping.getSpecifiedIds()
+          && !regionMapping.getIds().equals(String.join(",", metaDataView.getKeyColumnNames()))) {
+        foundDifference = true;
+      }
+    }
+
+    if (foundDifference) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(
+          "Error detected when comparing mapping for region \"" + regionMapping.getRegionName()
+              + "\" with table definition: \n");
+
+      if (!regionMapping.getSpecifiedIds()) {
+        sb.append("\nId fields in Field Mappings: " + regionMapping.getIds());
+        sb.append(
+            "\nId fields in Table MetaData: " + String.join(",", metaDataView.getKeyColumnNames()));
+      }
+
+      sb.append("\n\nDefinition from Field Mappings (" + regionMapping.getFieldMappings().size()
+          + " field mappings found):");
+
+      for (FieldMapping fieldMapping : regionMapping.getFieldMappings()) {
+        sb.append("\n" + fieldMapping.getJdbcName() + " - " + fieldMapping.getJdbcType());
+      }
+
+      sb.append("\n\nDefinition from Table Metadata (" + metaDataView.getColumnNames().size()
+          + " columns found):");
+
+      for (String name : metaDataView.getColumnNames()) {
+        sb.append("\n" + name + " - " + metaDataView.getColumnDataType(name));
+      }
+
+      sb.append("\n\nDestroy and recreate the JDBC mapping for \"" + regionMapping.getRegionName()
+          + "\" to resolve this error.");
+
+      logger.error(sb.toString());
+
+      throw new JdbcConnectorException("Jdbc mapping for \"" + regionMapping.getRegionName()
+          + "\" does not match table definition, check logs for more details.");
+    }
+  }
+
+
+  @Override
   public RegionMapping getMappingForRegion(String regionName) {
     return mappingsByRegion.get(regionName);
   }
@@ -99,4 +191,27 @@ public class JdbcConnectorServiceImpl implements JdbcConnectorService {
   public CacheServiceMBeanBase getMBean() {
     return null;
   }
+
+
+  // The following helper method is to allow for proper mocking in unit tests
+  DataSource getDataSource(String dataSourceName) {
+    return JNDIInvoker.getDataSource(dataSourceName);
+  }
+
+  // The following helper method is to allow for proper mocking in unit tests
+  TableMetaDataManager getTableMetaDataManager() {
+    return new TableMetaDataManager();
+  }
+
+  private TableMetaDataView getTableMetaDataView(RegionMapping regionMapping,
+      DataSource dataSource) {
+    TableMetaDataManager manager = getTableMetaDataManager();
+    try (Connection connection = dataSource.getConnection()) {
+      return manager.getTableMetaDataView(connection, regionMapping);
+    } catch (SQLException ex) {
+      throw JdbcConnectorException
+          .createException("Exception thrown while connecting to datasource \""
+              + regionMapping.getDataSourceName() + "\": ", ex);
+    }
+  }
 }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 423622c..ff8bcfe 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -40,20 +40,20 @@ import org.apache.geode.pdx.PdxInstance;
 @Experimental
 public class SqlHandler {
   private final InternalCache cache;
-  private final TableMetaDataManager tableMetaDataManager;
   private final RegionMapping regionMapping;
   private final DataSource dataSource;
+  private final TableMetaDataView tableMetaData;
   private final Map<String, FieldMapping> pdxToFieldMappings = new HashMap<>();
-  private final Map<String, FieldMapping> jdbcToFieldMappings = new HashMap<>();
   private volatile SqlToPdxInstance sqlToPdxInstance;
 
   public SqlHandler(InternalCache cache, String regionName,
       TableMetaDataManager tableMetaDataManager, JdbcConnectorService configService,
       DataSourceFactory dataSourceFactory) {
     this.cache = cache;
-    this.tableMetaDataManager = tableMetaDataManager;
     this.regionMapping = getMappingForRegion(configService, regionName);
     this.dataSource = getDataSource(dataSourceFactory, this.regionMapping.getDataSourceName());
+    this.tableMetaData = getTableMetaDataView(tableMetaDataManager);
+    cache.getService(JdbcConnectorService.class).validateMapping(regionMapping, dataSource);
     initializeFieldMappingMaps();
   }
 
@@ -63,6 +63,15 @@ public class SqlHandler {
         dataSourceName -> JNDIInvoker.getDataSource(dataSourceName));
   }
 
+  private TableMetaDataView getTableMetaDataView(TableMetaDataManager tableMetaDataManager) {
+    try (Connection connection = getConnection()) {
+      return tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
+    } catch (SQLException ex) {
+      throw new JdbcConnectorException("Could not connect to datasource \""
+          + regionMapping.getDataSourceName() + "\" because: " + ex);
+    }
+  }
+
   private static RegionMapping getMappingForRegion(JdbcConnectorService configService,
       String regionName) {
     RegionMapping regionMapping = configService.getMappingForRegion(regionName);
@@ -86,38 +95,16 @@ public class SqlHandler {
 
   private void initializeFieldMappingMaps() {
     for (FieldMapping fieldMapping : regionMapping.getFieldMappings()) {
-      this.jdbcToFieldMappings.put(fieldMapping.getJdbcName(), fieldMapping);
-      if (!fieldMapping.getPdxName().isEmpty()) {
-        this.pdxToFieldMappings.put(fieldMapping.getPdxName(), fieldMapping);
-      }
+      this.pdxToFieldMappings.put(fieldMapping.getPdxName(), fieldMapping);
     }
   }
 
   private String getColumnNameForField(String fieldName) {
-    FieldMapping exactMatch = this.pdxToFieldMappings.get(fieldName);
-    if (exactMatch != null) {
-      return exactMatch.getJdbcName();
-    }
-    exactMatch = this.jdbcToFieldMappings.get(fieldName);
-    if (exactMatch != null) {
-      this.pdxToFieldMappings.put(fieldName, exactMatch);
-      return exactMatch.getJdbcName();
+    FieldMapping match = this.pdxToFieldMappings.get(fieldName);
+    if (match != null) {
+      return match.getJdbcName();
     }
-    FieldMapping inexactMatch = null;
-    for (FieldMapping fieldMapping : regionMapping.getFieldMappings()) {
-      if (fieldMapping.getJdbcName().equalsIgnoreCase(fieldName)) {
-        if (inexactMatch != null) {
-          throw new JdbcConnectorException(
-              "Multiple columns matched the pdx field \"" + fieldName + "\".");
-        }
-        inexactMatch = fieldMapping;
-      }
-    }
-    if (inexactMatch == null) {
-      throw new JdbcConnectorException("No column matched the pdx field \"" + fieldName + "\".");
-    }
-    this.pdxToFieldMappings.put(fieldName, inexactMatch);
-    return inexactMatch.getJdbcName();
+    return null;
   }
 
   Connection getConnection() throws SQLException {
@@ -131,8 +118,6 @@ public class SqlHandler {
 
     PdxInstance result;
     try (Connection connection = getConnection()) {
-      TableMetaDataView tableMetaData =
-          this.tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
       EntryColumnData entryColumnData =
           getEntryColumnData(tableMetaData, key, null, Operation.GET);
       try (PreparedStatement statement =
@@ -226,8 +211,6 @@ public class SqlHandler {
     }
 
     try (Connection connection = getConnection()) {
-      TableMetaDataView tableMetaData =
-          this.tableMetaDataManager.getTableMetaDataView(connection, regionMapping);
       EntryColumnData entryColumnData =
           getEntryColumnData(tableMetaData, key, value, operation);
       int updateCount = 0;
@@ -332,7 +315,7 @@ public class SqlHandler {
       }
       for (String fieldName : fieldNames) {
         String columnName = getColumnNameForField(fieldName);
-        if (!keyColumnNames.contains(columnName)) {
+        if (columnName == null || !keyColumnNames.contains(columnName)) {
           throw new JdbcConnectorException("The key \"" + key + "\" has the field \"" + fieldName
               + "\" which does not match any of the key columns: " + keyColumnNames);
         }
@@ -349,6 +332,11 @@ public class SqlHandler {
     List<ColumnData> result = new ArrayList<>();
     for (String fieldName : value.getFieldNames()) {
       String columnName = getColumnNameForField(fieldName);
+      if (columnName == null) {
+        // The user must have added a new field to their pdx domain class.
+        // To support PDX class versioning we will ignore this field.
+        continue;
+      }
       if (tableMetaData.getKeyColumnNames().contains(columnName)) {
         continue;
       }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstance.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstance.java
index 535e24e..40ec68a 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstance.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstance.java
@@ -80,9 +80,9 @@ public class SqlToPdxInstance {
       String columnName = metaData.getColumnName(i);
       PdxFieldInfo fieldInfo = this.columnToPdxFieldMap.get(columnName);
       if (fieldInfo == null) {
-        // TODO: this column was added since create jdbc-mapping was done.
-        // Log a warning, once, and just ignore this column
-        continue;
+        throw new JdbcConnectorException(
+            "The jdbc-mapping does not contain the column name \"" + columnName + "\"."
+                + " This is probably caused by a column being added to the table after the jdbc-mapping was created.");
       }
       Object fieldValue = getFieldValue(resultSet, i, fieldInfo.getType(), metaData);
       result.setField(fieldInfo.getName(), fieldValue);
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java
index fb64000..1b44f59 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommand.java
@@ -20,6 +20,7 @@ import static org.apache.geode.connectors.util.internal.MappingConstants.ID_NAME
 import static org.apache.geode.connectors.util.internal.MappingConstants.PDX_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.REGION_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.SCHEMA_NAME;
+import static org.apache.geode.connectors.util.internal.MappingConstants.SPECIFIED_ID_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.SYNCHRONOUS_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.TABLE_NAME;
 
@@ -117,6 +118,7 @@ public class DescribeMappingCommand extends GfshCommand {
     attributes.put(DATA_SOURCE_NAME, regionMapping.getDataSourceName());
     attributes.put(SYNCHRONOUS_NAME, Boolean.toString(synchronous));
     attributes.put(ID_NAME, regionMapping.getIds());
+    attributes.put(SPECIFIED_ID_NAME, Boolean.toString(regionMapping.getSpecifiedIds()));
     if (regionMapping.getCatalog() != null) {
       attributes.put(CATALOG_NAME, regionMapping.getCatalog());
     }
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java
index 9f781f8..6d8e3d1 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping.java
@@ -25,46 +25,14 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.configuration.CacheElement;
 import org.apache.geode.cache.configuration.XSDRootElement;
 
 /**
- * <p>
- * Java class for anonymous complex type.
- *
- * <p>
- * The following schema fragment specifies the expected content contained within this class.
- *
- * <pre>
- * &lt;complexType>
- *   &lt;complexContent>
- *     &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
- *       &lt;sequence>
- *         &lt;element name="field-mapping" maxOccurs="unbounded" minOccurs="0">
- *           &lt;complexType>
- *             &lt;simpleContent>
- *               &lt;extension base="&lt;http://www.w3.org/2001/XMLSchema>string">
- *                 &lt;attribute name="pdx-name" type="{http://www.w3.org/2001/XMLSchema}string" />
- *                 &lt;attribute name="pdx-type" type="{http://www.w3.org/2001/XMLSchema}string" />
- *                 &lt;attribute name="jdbc-name" type="{http://www.w3.org/2001/XMLSchema}string" />
- *                 &lt;attribute name="jdbc-type" type="{http://www.w3.org/2001/XMLSchema}string" />
- *               &lt;/extension>
- *             &lt;/simpleContent>
- *           &lt;/complexType>
- *         &lt;/element>
- *       &lt;/sequence>
- *       &lt;attribute name="data-source" type="{http://www.w3.org/2001/XMLSchema}string" />
- *       &lt;attribute name="table" type="{http://www.w3.org/2001/XMLSchema}string" />
- *       &lt;attribute name="pdx-name" type="{http://www.w3.org/2001/XMLSchema}string" />
- *       &lt;attribute name="ids" type="{http://www.w3.org/2001/XMLSchema}string" />
- *       &lt;attribute name="catalog" type="{http://www.w3.org/2001/XMLSchema}string" />
- *     &lt;/restriction>
- *   &lt;/complexContent>
- * &lt;/complexType>
- * </pre>
- *
- *
+ * Java class for xsd mapping element.
  */
 @Experimental
 @XmlAccessorType(XmlAccessType.FIELD)
@@ -84,6 +52,8 @@ public class RegionMapping implements CacheElement {
   protected String pdxName;
   @XmlAttribute(name = "ids")
   protected String ids;
+  @XmlAttribute(name = "specified-ids")
+  protected boolean specifiedIds;
   @XmlAttribute(name = "catalog")
   protected String catalog;
   @XmlAttribute(name = "schema")
@@ -103,6 +73,7 @@ public class RegionMapping implements CacheElement {
     this.tableName = tableName;
     this.dataSourceName = dataSourceName;
     this.ids = ids;
+    this.specifiedIds = !StringUtils.isEmpty(ids);
     this.catalog = catalog;
     this.schema = schema;
   }
@@ -135,6 +106,10 @@ public class RegionMapping implements CacheElement {
     this.schema = schema;
   }
 
+  public void setSpecifiedIds(boolean specifiedIds) {
+    this.specifiedIds = specifiedIds;
+  }
+
   public String getDataSourceName() {
     return dataSourceName;
   }
@@ -151,6 +126,10 @@ public class RegionMapping implements CacheElement {
     return ids;
   }
 
+  public boolean getSpecifiedIds() {
+    return specifiedIds;
+  }
+
   public String getCatalog() {
     return catalog;
   }
@@ -187,6 +166,7 @@ public class RegionMapping implements CacheElement {
         && isEqual(tableName, that.tableName)
         && isEqual(dataSourceName, that.dataSourceName)
         && isEqual(ids, that.ids)
+        && specifiedIds == that.specifiedIds
         && isEqual(catalog, that.catalog)
         && isEqual(schema, that.schema)
         && isEqual(fieldMappings, that.fieldMappings);
@@ -216,6 +196,7 @@ public class RegionMapping implements CacheElement {
         + ", tableName='" + tableName + '\''
         + ", dataSourceName='" + dataSourceName + '\''
         + ", ids='" + ids + '\''
+        + ", specifiedIds='" + specifiedIds + '\''
         + ", catalog='" + catalog + '\''
         + ", schema='" + schema + '\''
         + ", fieldMapping='" + fieldMappings + '\''
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingConfiguration.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingConfiguration.java
index 1f13e64..475a55d 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingConfiguration.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/xml/RegionMappingConfiguration.java
@@ -50,6 +50,7 @@ public class RegionMappingConfiguration implements Extension<Region<?, ?>> {
     final Region<?, ?> region = extensionPoint.getTarget();
     InternalCache internalCache = (InternalCache) region.getRegionService();
     JdbcConnectorService service = internalCache.getService(JdbcConnectorService.class);
+    service.validateMapping(mapping);
     createRegionMapping(service, mapping);
   }
 
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/util/internal/MappingConstants.java b/geode-connectors/src/main/java/org/apache/geode/connectors/util/internal/MappingConstants.java
index b22f99e..9417731 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/util/internal/MappingConstants.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/util/internal/MappingConstants.java
@@ -21,6 +21,7 @@ public final class MappingConstants {
   public static final String DATA_SOURCE_NAME = "data-source";
   public static final String SYNCHRONOUS_NAME = "synchronous";
   public static final String ID_NAME = "id";
+  public static final String SPECIFIED_ID_NAME = "id-user-specified";
   public static final String SCHEMA_NAME = "schema";
   public static final String CATALOG_NAME = "catalog";
   public static final String GROUP_NAME = "groups";
diff --git a/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd b/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd
index 72701b3..afec0da 100644
--- a/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd
+++ b/geode-connectors/src/main/resources/META-INF/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd
@@ -59,6 +59,7 @@ XML schema for JDBC Connector Service in Geode.
                 <xsd:attribute type="xsd:string" name="table" use="optional"/>
                 <xsd:attribute type="xsd:string" name="pdx-name" use="required"/>
                 <xsd:attribute type="xsd:string" name="ids" use="optional"/>
+                <xsd:attribute type="xsd:boolean" name="specified-ids" use="optional"/>
                 <xsd:attribute type="xsd:string" name="catalog" use="optional"/>
                 <xsd:attribute type="xsd:string" name="schema" use="optional"/>
             </xsd:complexType>
diff --git a/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt b/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
index 9257999..5551ab4 100755
--- a/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
+++ b/geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
@@ -5,4 +5,4 @@ org/apache/geode/connectors/jdbc/internal/cli/CreateMappingFunction,false
 org/apache/geode/connectors/jdbc/internal/cli/CreateMappingPreconditionCheckFunction,false
 org/apache/geode/connectors/jdbc/internal/cli/DestroyMappingFunction,false
 org/apache/geode/connectors/jdbc/internal/configuration/FieldMapping,false,jdbcName:java/lang/String,jdbcNullable:boolean,jdbcType:java/lang/String,pdxName:java/lang/String,pdxType:java/lang/String
-org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping,false,catalog:java/lang/String,dataSourceName:java/lang/String,fieldMappings:java/util/List,ids:java/lang/String,pdxName:java/lang/String,regionName:java/lang/String,schema:java/lang/String,tableName:java/lang/String
+org/apache/geode/connectors/jdbc/internal/configuration/RegionMapping,false,catalog:java/lang/String,dataSourceName:java/lang/String,fieldMappings:java/util/List,ids:java/lang/String,pdxName:java/lang/String,regionName:java/lang/String,schema:java/lang/String,specifiedIds:boolean,tableName:java/lang/String
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceTest.java
index 01cfc32..1d80bf6 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/JdbcConnectorServiceTest.java
@@ -15,12 +15,30 @@
 package org.apache.geode.connectors.jdbc.internal;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.sql.Connection;
+import java.sql.JDBCType;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.connectors.jdbc.JdbcConnectorException;
+import org.apache.geode.connectors.jdbc.internal.configuration.FieldMapping;
 import org.apache.geode.connectors.jdbc.internal.configuration.RegionMapping;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.extension.ExtensionPoint;
@@ -28,22 +46,61 @@ import org.apache.geode.internal.cache.extension.ExtensionPoint;
 public class JdbcConnectorServiceTest {
 
   private static final String TEST_REGION_NAME = "testRegion";
+  private static final String DATA_SOURCE_NAME = "dataSource";
+
+  private static final String KEY_COLUMN_NAME = "id";
+  private static final String COMPOSITE_KEY_COLUMN_NAME = "Key2";
+  private static final String VALUE_COLUMN_NAME = "name";
+  private static final String EXTRA_COLUMN_NAME = "extraColumn";
+
+  private final List<String> keyColumns = new ArrayList<>();
+  private final Set<String> allColumns = new HashSet<>();
+  private final List<FieldMapping> fieldMappings = new ArrayList<>();
 
   private RegionMapping mapping;
 
   private JdbcConnectorServiceImpl service;
 
+  TableMetaDataView view = mock(TableMetaDataView.class);
+  TableMetaDataManager manager = mock(TableMetaDataManager.class);
+  InternalCache cache = mock(InternalCache.class);
+  DataSource dataSource = mock(DataSource.class);
+  Connection connection = mock(Connection.class);
+
   @Before
   public void setUp() throws Exception {
-    InternalCache cache = mock(InternalCache.class);
     mapping = mock(RegionMapping.class);
-    String[] parameters = new String[] {"key1:value1", "key2:value2"};
 
     when(cache.getExtensionPoint()).thenReturn(mock(ExtensionPoint.class));
     when(mapping.getRegionName()).thenReturn(TEST_REGION_NAME);
+    when(mapping.getDataSourceName()).thenReturn(DATA_SOURCE_NAME);
+    when(mapping.getFieldMappings()).thenReturn(fieldMappings);
+    when(mapping.getIds()).thenReturn(KEY_COLUMN_NAME);
+    when(mapping.getSpecifiedIds()).thenReturn(true);
+
+    when(dataSource.getConnection()).thenReturn(connection);
+    when(manager.getTableMetaDataView(connection, mapping)).thenReturn(view);
+    when(view.getKeyColumnNames()).thenReturn(keyColumns);
+    when(view.getColumnNames()).thenReturn(allColumns);
+    when(view.getColumnDataType(KEY_COLUMN_NAME)).thenReturn(JDBCType.INTEGER);
+    when(view.getColumnDataType(VALUE_COLUMN_NAME)).thenReturn(JDBCType.VARCHAR);
+    when(view.isColumnNullable(KEY_COLUMN_NAME)).thenReturn(false);
+    when(view.isColumnNullable(VALUE_COLUMN_NAME)).thenReturn(true);
 
-    service = new JdbcConnectorServiceImpl();
+    service = spy(JdbcConnectorServiceImpl.class);
     service.init(cache);
+
+    keyColumns.add(KEY_COLUMN_NAME);
+    allColumns.add(KEY_COLUMN_NAME);
+    allColumns.add(VALUE_COLUMN_NAME);
+
+    fieldMappings
+        .add(new FieldMapping("id", "integer", KEY_COLUMN_NAME, JDBCType.INTEGER.getName(), false));
+    fieldMappings.add(
+        new FieldMapping("name", "string", VALUE_COLUMN_NAME, JDBCType.VARCHAR.getName(), true));
+
+    doReturn(dataSource).when(service).getDataSource(DATA_SOURCE_NAME);
+    doReturn(manager).when(service).getTableMetaDataManager();
   }
 
   @Test
@@ -65,4 +122,97 @@ public class JdbcConnectorServiceTest {
 
     assertThat(service.getMappingForRegion(TEST_REGION_NAME)).isNull();
   }
+
+  @Test
+  public void validateMappingSucceedsWithMatchingMapping() {
+    service.validateMapping(mapping);
+  }
+
+  @Test
+  public void validateMappingSucceedsWithMatchingMappingAndUnspecifiedIds() {
+    when(mapping.getSpecifiedIds()).thenReturn(false);
+    service.validateMapping(mapping);
+  }
+
+  @Test
+  public void validateMappingThrowsExceptionWhenGetConnectionHasSqlException() throws SQLException {
+    when(dataSource.getConnection()).thenThrow(SQLException.class);
+    Throwable throwable = catchThrowable(() -> service.validateMapping(mapping));
+    assertThat(throwable).isInstanceOf(JdbcConnectorException.class).hasMessageContaining(
+        "Exception thrown while connecting to datasource \"dataSource\": null");
+    verify(connection, never()).close();
+  }
+
+  @Test
+  public void validateMappingClosesConnectionWhenGetTableMetaDataViewThrows() throws SQLException {
+    when(manager.getTableMetaDataView(connection, mapping)).thenThrow(JdbcConnectorException.class);
+    Throwable throwable = catchThrowable(() -> service.validateMapping(mapping));
+    assertThat(throwable).isInstanceOf(JdbcConnectorException.class);
+    verify(connection).close();
+  }
+
+  @Test(expected = JdbcConnectorException.class)
+  public void validateMappingThrowsExceptionWithNullDataSource() {
+    doReturn(null).when(service).getDataSource(DATA_SOURCE_NAME);
+    service.validateMapping(mapping);
+  }
+
+  @Test(expected = JdbcConnectorException.class)
+  public void validateMappingThrowsExceptionWithAddedColumn() {
+    allColumns.add(EXTRA_COLUMN_NAME);
+    when(view.getColumnDataType(EXTRA_COLUMN_NAME)).thenReturn(JDBCType.VARCHAR);
+    service.validateMapping(mapping);
+  }
+
+  @Test(expected = JdbcConnectorException.class)
+  public void validateMappingThrowsExceptionWithRemovedColumn() {
+    allColumns.remove(VALUE_COLUMN_NAME);
+    service.validateMapping(mapping);
+  }
+
+  @Test(expected = JdbcConnectorException.class)
+  public void validateMappingThrowsExceptionWithColumnNameChanged() {
+    allColumns.remove(VALUE_COLUMN_NAME);
+    allColumns.add(VALUE_COLUMN_NAME.toUpperCase());
+    when(view.getColumnDataType(VALUE_COLUMN_NAME.toUpperCase())).thenReturn(JDBCType.VARCHAR);
+    service.validateMapping(mapping);
+  }
+
+  @Test(expected = JdbcConnectorException.class)
+  public void validateMappingThrowsExceptionWithModifiedColumn() {
+    when(view.getColumnDataType(VALUE_COLUMN_NAME)).thenReturn(JDBCType.INTEGER);
+    service.validateMapping(mapping);
+  }
+
+  @Test(expected = JdbcConnectorException.class)
+  public void validateMappingThrowsExceptionWithModifiedColumnIsNullable() {
+    when(view.isColumnNullable(VALUE_COLUMN_NAME)).thenReturn(false);
+    service.validateMapping(mapping);
+  }
+
+  @Test(expected = JdbcConnectorException.class)
+  public void validateMappingThrowsExceptionWithModifiedIdColumns() {
+    when(view.getKeyColumnNames()).thenReturn(Arrays.asList(VALUE_COLUMN_NAME.toUpperCase()));
+    when(mapping.getSpecifiedIds()).thenReturn(false);
+    service.validateMapping(mapping);
+  }
+
+  @Test
+  public void validateMappingSucceedsWithModifiedIdColumnsWithSpecifiedIds() {
+    when(view.getKeyColumnNames()).thenReturn(Arrays.asList(VALUE_COLUMN_NAME.toUpperCase()));
+    service.validateMapping(mapping);
+  }
+
+  @Test
+  public void validateMappingSucceedsWithCompositeKeys() {
+    keyColumns.add(COMPOSITE_KEY_COLUMN_NAME);
+    allColumns.add(COMPOSITE_KEY_COLUMN_NAME);
+    when(view.getColumnDataType(COMPOSITE_KEY_COLUMN_NAME)).thenReturn(JDBCType.INTEGER);
+    when(view.isColumnNullable(COMPOSITE_KEY_COLUMN_NAME)).thenReturn(false);
+    fieldMappings.add(new FieldMapping("key2", "integer", COMPOSITE_KEY_COLUMN_NAME,
+        JDBCType.INTEGER.getName(), false));
+    when(mapping.getSpecifiedIds()).thenReturn(false);
+    when(mapping.getIds()).thenReturn(KEY_COLUMN_NAME + "," + COMPOSITE_KEY_COLUMN_NAME);
+    service.validateMapping(mapping);
+  }
 }
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
index 6c62b79..4e7c8dc 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/RegionMappingTest.java
@@ -134,7 +134,7 @@ public class RegionMappingTest {
     String result = rm.toString();
 
     assertThat(result).isEqualTo(
-        "RegionMapping{regionName='regionName', pdxName='pdxClassName', tableName='tableName', dataSourceName='dataSourceName', ids='ids', catalog='catalog', schema='schema', fieldMapping='[FieldMapping [pdxName=pdxName, pdxType=pdxType, jdbcName=jdbcName, jdbcType=jdbcType, jdbcNullable=true]]'}");
+        "RegionMapping{regionName='regionName', pdxName='pdxClassName', tableName='tableName', dataSourceName='dataSourceName', ids='ids', specifiedIds='true', catalog='catalog', schema='schema', fieldMapping='[FieldMapping [pdxName=pdxName, pdxType=pdxType, jdbcName=jdbcName, jdbcType=jdbcType, jdbcNullable=true]]'}");
   }
 
   @Test
@@ -145,7 +145,7 @@ public class RegionMappingTest {
     String result = rm.toString();
 
     assertThat(result).isEqualTo(
-        "RegionMapping{regionName='regionName', pdxName='pdxClassName', tableName='null', dataSourceName='dataSourceName', ids='null', catalog='null', schema='null', fieldMapping='[]'}");
+        "RegionMapping{regionName='regionName', pdxName='pdxClassName', tableName='null', dataSourceName='dataSourceName', ids='null', specifiedIds='false', catalog='null', schema='null', fieldMapping='[]'}");
   }
 
   @Test
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index f6c8c10..4ca24b0 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -32,10 +32,13 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.sql.DataSource;
 
 import junitparams.JUnitParamsRunner;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -80,6 +83,7 @@ public class SqlHandlerTest {
   private PdxInstanceImpl value;
   private Object key;
   private final String fieldName = "fieldName";
+  private Set<String> columnNames;
 
   @SuppressWarnings("unchecked")
   @Before
@@ -98,7 +102,7 @@ public class SqlHandlerTest {
     tableMetaDataView = mock(TableMetaDataView.class);
     when(tableMetaDataView.getQuotedTablePath()).thenReturn(TABLE_NAME);
     when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList(KEY_COLUMN));
-    final String IDS = "ids";
+    final String IDS = KEY_COLUMN;
     when(tableMetaDataManager.getTableMetaDataView(any(), any()))
         .thenReturn(tableMetaDataView);
     connectorService = mock(JdbcConnectorService.class);
@@ -108,7 +112,10 @@ public class SqlHandlerTest {
     value = mock(PdxInstanceImpl.class);
     when(value.getPdxType()).thenReturn(mock(PdxType.class));
     when(value.getFieldNames()).thenReturn(Arrays.asList(fieldName));
-
+    columnNames = new HashSet<>();
+    columnNames.add(fieldName);
+    when(tableMetaDataView.getColumnNames()).thenReturn(columnNames);
+    when(tableMetaDataView.getColumnDataType(fieldName)).thenReturn(JDBCType.VARCHAR);
     regionMapping = mock(RegionMapping.class);
     when(regionMapping.getDataSourceName()).thenReturn(DATA_SOURCE_NAME);
     when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
@@ -121,6 +128,8 @@ public class SqlHandlerTest {
     when(fieldMapping.getPdxType()).thenReturn(FieldType.OBJECT.name());
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping));
     when(connectorService.getMappingForRegion(REGION_NAME)).thenReturn(regionMapping);
+    JdbcConnectorService connectorService = mock(JdbcConnectorService.class);
+    when(cache.getService(JdbcConnectorService.class)).thenReturn(connectorService);
 
     when(dataSource.getConnection()).thenReturn(this.connection);
 
@@ -129,19 +138,38 @@ public class SqlHandlerTest {
     createSqlHandler();
   }
 
+  @After
+  public void cleanUp() {
+    columnNames.clear();
+  }
+
   private void createSqlHandler() {
     handler = new SqlHandler(cache, REGION_NAME, tableMetaDataManager, connectorService,
         dataSourceFactory);
   }
 
   @Test
+  public void createSqlHandlerThrowsNoExceptionWithMatchingMapping() {
+    createSqlHandler();
+  }
+
+  @Test
+  public void createSqlHandlerHandlesSqlExceptionFromGetConnection() throws SQLException {
+    doThrow(new SQLException("test exception")).when(dataSource).getConnection();
+
+    assertThatThrownBy(() -> createSqlHandler())
+        .isInstanceOf(JdbcConnectorException.class).hasMessage(
+            "Could not connect to datasource \"dataSourceName\" because: java.sql.SQLException: test exception");
+  }
+
+  @Test
   public void readThrowsIfNoKeyProvided() throws Exception {
     thrown.expect(IllegalArgumentException.class);
     handler.read(region, null);
   }
 
   @Test
-  public void constructorThrowsIfNoMapping() throws Exception {
+  public void constructorThrowsIfNoMapping() {
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage(
         "JDBC mapping for region regionWithNoMapping not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
@@ -151,7 +179,7 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void constructorThrowsIfNoConnectionConfig() throws Exception {
+  public void constructorThrowsIfNoConnectionConfig() {
     when(regionMapping.getDataSourceName()).thenReturn("bogus data source name");
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage(
@@ -370,11 +398,19 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
     when(value.getFieldNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     createSqlHandler();
 
     handler.write(region, Operation.CREATE, compositeKey, value);
@@ -413,11 +449,19 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
     when(value.getFieldNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     createSqlHandler();
 
     handler.write(region, Operation.UPDATE, compositeKey, value);
@@ -453,10 +497,18 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     createSqlHandler();
 
     handler.write(region, Operation.DESTROY, destroyKey, value);
@@ -616,10 +668,18 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     createSqlHandler();
 
     EntryColumnData entryColumnData =
@@ -635,7 +695,7 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void getEntryColumnDataGivenWrongNumberOfCompositeKeyFieldsFails() throws Exception {
+  public void getEntryColumnDataGivenWrongNumberOfCompositeKeyFieldsFails() {
     PdxInstance compositeKey = mock(PdxInstance.class);
     when(compositeKey.isDeserializable()).thenReturn(false);
     when(compositeKey.getFieldNames()).thenReturn(Arrays.asList("fieldOne"));
@@ -643,10 +703,18 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     createSqlHandler();
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage(
@@ -656,7 +724,7 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void getEntryColumnDataGivenWrongFieldNameInCompositeKeyFails() throws Exception {
+  public void getEntryColumnDataGivenWrongFieldNameInCompositeKeyFails() {
     Object compositeKeyFieldValueOne = "fieldValueOne";
     Object compositeKeyFieldValueTwo = "fieldValueTwo";
     PdxInstance compositeKey = mock(PdxInstance.class);
@@ -668,15 +736,26 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping3 = mock(FieldMapping.class);
     String nonKeyColumn = "fieldTwoWrong";
     when(fieldMapping3.getJdbcName()).thenReturn(nonKeyColumn);
     when(fieldMapping3.getPdxName()).thenReturn(nonKeyColumn);
+    when(fieldMapping3.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     when(regionMapping.getFieldMappings())
         .thenReturn(Arrays.asList(fieldMapping1, fieldMapping2, fieldMapping3));
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    columnNames.add("fieldTwoWrong");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwoWrong")).thenReturn(JDBCType.VARCHAR);
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     createSqlHandler();
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage("The key \"" + compositeKey
@@ -686,12 +765,53 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void returnsCorrectColumnsForUpdate() throws Exception {
+  public void getEntryColumnDataGivenUnknownFieldNameInCompositeKeyFails() {
+    Object compositeKeyFieldValueOne = "fieldValueOne";
+    Object compositeKeyFieldValueTwo = "fieldValueTwo";
+    PdxInstance compositeKey = mock(PdxInstance.class);
+    when(compositeKey.isDeserializable()).thenReturn(false);
+    when(compositeKey.getFieldNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwoUnknown"));
+    when(compositeKey.getField("fieldOne")).thenReturn(compositeKeyFieldValueOne);
+    when(compositeKey.getField("fieldTwoUnknown")).thenReturn(compositeKeyFieldValueTwo);
+    when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
+    FieldMapping fieldMapping1 = mock(FieldMapping.class);
+    when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
+    when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    FieldMapping fieldMapping2 = mock(FieldMapping.class);
+    when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    FieldMapping fieldMapping3 = mock(FieldMapping.class);
+    String nonKeyColumn = "fieldTwoWrong";
+    when(fieldMapping3.getJdbcName()).thenReturn(nonKeyColumn);
+    when(fieldMapping3.getPdxName()).thenReturn(nonKeyColumn);
+    when(fieldMapping3.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    when(regionMapping.getFieldMappings())
+        .thenReturn(Arrays.asList(fieldMapping1, fieldMapping2, fieldMapping3));
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    columnNames.add("fieldTwoWrong");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwoWrong")).thenReturn(JDBCType.VARCHAR);
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
+    createSqlHandler();
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage("The key \"" + compositeKey
+        + "\" has the field \"fieldTwoUnknown\" which does not match any of the key columns: [fieldOne, fieldTwo]");
+
+    handler.getEntryColumnData(tableMetaDataView, compositeKey, value, Operation.GET);
+  }
+
+  @Test
+  public void returnsCorrectColumnsForUpdate() {
     testGetEntryColumnDataForCreateOrUpdate(Operation.UPDATE);
   }
 
   @Test
-  public void returnsCorrectColumnsForCreate() throws Exception {
+  public void returnsCorrectColumnsForCreate() {
     testGetEntryColumnDataForCreateOrUpdate(Operation.CREATE);
   }
 
@@ -701,10 +821,17 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn(KEY_COLUMN);
     when(fieldMapping1.getPdxName()).thenReturn(KEY_COLUMN);
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn(nonKeyColumn);
     when(fieldMapping2.getPdxName()).thenReturn(nonKeyColumn);
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
+    columnNames.clear();
+    columnNames.add(KEY_COLUMN);
+    columnNames.add("otherColumn");
+    when(tableMetaDataView.getColumnDataType("otherColumn")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType(KEY_COLUMN)).thenReturn(JDBCType.VARCHAR);
     createSqlHandler();
 
     EntryColumnData entryColumnData =
@@ -719,107 +846,55 @@ public class SqlHandlerTest {
         .isEqualTo(KEY_COLUMN);
   }
 
-  @Test
-  public void getEntryColumnDataReturnsCorrectColumnNameWhenPdxNameIsEmpty() {
-    String nonKeyColumn = "otherColumn";
-    when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn));
-    FieldMapping fieldMapping1 = mock(FieldMapping.class);
-    when(fieldMapping1.getJdbcName()).thenReturn(KEY_COLUMN);
-    when(fieldMapping1.getPdxName()).thenReturn(KEY_COLUMN);
-    FieldMapping fieldMapping2 = mock(FieldMapping.class);
-    when(fieldMapping2.getJdbcName()).thenReturn(nonKeyColumn);
-    when(fieldMapping2.getPdxName()).thenReturn("");
-    when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
-    createSqlHandler();
-
-    EntryColumnData entryColumnData =
-        handler.getEntryColumnData(tableMetaDataView, key, value, Operation.CREATE);
 
-    assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
-    assertThat(entryColumnData.getEntryValueColumnData()).hasSize(1);
-    assertThat(entryColumnData.getEntryValueColumnData().get(0).getColumnName())
-        .isEqualTo(nonKeyColumn);
-    assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(1);
-    assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
-        .isEqualTo(KEY_COLUMN);
+  @Test
+  public void returnsCorrectColumnsForUpdateWithExtraPdxField() {
+    testGetEntryColumnDataForCreateOrUpdateWithExtraPdxField(Operation.UPDATE);
   }
 
   @Test
-  public void getEntryColumnDataReturnsCorrectColumnNameWhenPdxNameIsEmptyAndJdbcNameMatchesInexactly() {
+  public void returnsCorrectColumnsForCreateWithExtraPdxField() {
+    testGetEntryColumnDataForCreateOrUpdateWithExtraPdxField(Operation.CREATE);
+  }
+
+  private void testGetEntryColumnDataForCreateOrUpdateWithExtraPdxField(Operation operation) {
     String nonKeyColumn = "otherColumn";
-    when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn));
+    when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn, "extraField"));
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn(KEY_COLUMN);
     when(fieldMapping1.getPdxName()).thenReturn(KEY_COLUMN);
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
-    when(fieldMapping2.getJdbcName()).thenReturn(nonKeyColumn.toUpperCase());
-    when(fieldMapping2.getPdxName()).thenReturn("");
+    when(fieldMapping2.getJdbcName()).thenReturn(nonKeyColumn);
+    when(fieldMapping2.getPdxName()).thenReturn(nonKeyColumn);
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
+    columnNames.clear();
+    columnNames.add(KEY_COLUMN);
+    columnNames.add("otherColumn");
+    when(tableMetaDataView.getColumnDataType("otherColumn")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType(KEY_COLUMN)).thenReturn(JDBCType.VARCHAR);
     createSqlHandler();
 
     EntryColumnData entryColumnData =
-        handler.getEntryColumnData(tableMetaDataView, key, value, Operation.CREATE);
+        handler.getEntryColumnData(tableMetaDataView, key, value, operation);
 
     assertThat(entryColumnData.getEntryKeyColumnData()).isNotNull();
     assertThat(entryColumnData.getEntryValueColumnData()).hasSize(1);
     assertThat(entryColumnData.getEntryValueColumnData().get(0).getColumnName())
-        .isEqualTo(nonKeyColumn.toUpperCase());
+        .isEqualTo(nonKeyColumn);
     assertThat(entryColumnData.getEntryKeyColumnData()).hasSize(1);
     assertThat(entryColumnData.getEntryKeyColumnData().get(0).getColumnName())
         .isEqualTo(KEY_COLUMN);
   }
 
   @Test
-  public void getEntryColumnDataThrowsWhenPdxNameIsEmptyAndJdbcNameMatchesInexactlyMultipleTimes() {
-    String nonKeyColumn = "otherColumn";
-    when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn));
-    FieldMapping fieldMapping1 = mock(FieldMapping.class);
-    when(fieldMapping1.getJdbcName()).thenReturn(KEY_COLUMN);
-    when(fieldMapping1.getPdxName()).thenReturn(KEY_COLUMN);
-    FieldMapping fieldMapping2 = mock(FieldMapping.class);
-    when(fieldMapping2.getJdbcName()).thenReturn(nonKeyColumn.toUpperCase());
-    when(fieldMapping2.getPdxName()).thenReturn("");
-    FieldMapping fieldMapping3 = mock(FieldMapping.class);
-    when(fieldMapping3.getJdbcName()).thenReturn(nonKeyColumn.toLowerCase());
-    when(fieldMapping3.getPdxName()).thenReturn("");
-    when(regionMapping.getFieldMappings())
-        .thenReturn(Arrays.asList(fieldMapping1, fieldMapping2, fieldMapping3));
-    createSqlHandler();
-    thrown.expect(JdbcConnectorException.class);
-    thrown.expectMessage("Multiple columns matched the pdx field \"" + nonKeyColumn + "\".");
-
-    handler.getEntryColumnData(tableMetaDataView, key, value, Operation.CREATE);
-  }
-
-  @Test
-  public void getEntryColumnDataThrowsWhenPdxNameIsEmptyAndJdbcNameDoesNotMatch() {
-    String nonKeyColumn = "otherColumn";
-    when(value.getFieldNames()).thenReturn(Arrays.asList(KEY_COLUMN, nonKeyColumn));
-    FieldMapping fieldMapping1 = mock(FieldMapping.class);
-    when(fieldMapping1.getJdbcName()).thenReturn(KEY_COLUMN);
-    when(fieldMapping1.getPdxName()).thenReturn(KEY_COLUMN);
-    FieldMapping fieldMapping2 = mock(FieldMapping.class);
-    when(fieldMapping2.getJdbcName()).thenReturn(nonKeyColumn.toUpperCase() + "noMatch");
-    when(fieldMapping2.getPdxName()).thenReturn("");
-    FieldMapping fieldMapping3 = mock(FieldMapping.class);
-    when(fieldMapping3.getJdbcName()).thenReturn("noMatch" + nonKeyColumn.toLowerCase());
-    when(fieldMapping3.getPdxName()).thenReturn("");
-    when(regionMapping.getFieldMappings())
-        .thenReturn(Arrays.asList(fieldMapping1, fieldMapping2, fieldMapping3));
-    createSqlHandler();
-    thrown.expect(JdbcConnectorException.class);
-    thrown.expectMessage("No column matched the pdx field \"" + nonKeyColumn + "\".");
-
-    handler.getEntryColumnData(tableMetaDataView, key, value, Operation.CREATE);
-  }
-
-  @Test
-  public void returnsCorrectColumnsForUpdateWithCompositeKey() throws Exception {
+  public void returnsCorrectColumnsForUpdateWithCompositeKey() {
     testGetEntryColumnDataForCreateOrUpdateWithCompositeKey(Operation.UPDATE);
   }
 
   @Test
-  public void returnsCorrectColumnsForCreateWithCompositeKey() throws Exception {
+  public void returnsCorrectColumnsForCreateWithCompositeKey() {
     testGetEntryColumnDataForCreateOrUpdateWithCompositeKey(Operation.CREATE);
   }
 
@@ -835,18 +910,28 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping3 = mock(FieldMapping.class);
     String nonKeyColumn = "otherColumn";
     when(fieldMapping3.getJdbcName()).thenReturn(nonKeyColumn);
     when(fieldMapping3.getPdxName()).thenReturn(nonKeyColumn);
+    when(fieldMapping3.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    columnNames.add("otherColumn");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("otherColumn")).thenReturn(JDBCType.VARCHAR);
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     when(regionMapping.getFieldMappings())
         .thenReturn(Arrays.asList(fieldMapping1, fieldMapping2, fieldMapping3));
     createSqlHandler();
     when(value.getFieldNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo", nonKeyColumn));
-
     EntryColumnData entryColumnData =
         handler.getEntryColumnData(tableMetaDataView, compositeKey, value, operation);
 
@@ -862,7 +947,7 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void returnsCorrectColumnForDestroyWithCompositeKey() throws Exception {
+  public void returnsCorrectColumnForDestroyWithCompositeKey() {
     Object compositeKeyFieldValueOne = "fieldValueOne";
     Object compositeKeyFieldValueTwo = "fieldValueTwo";
     PdxInstance compositeKey = mock(PdxInstance.class);
@@ -874,10 +959,18 @@ public class SqlHandlerTest {
     FieldMapping fieldMapping1 = mock(FieldMapping.class);
     when(fieldMapping1.getJdbcName()).thenReturn("fieldOne");
     when(fieldMapping1.getPdxName()).thenReturn("fieldOne");
+    when(fieldMapping1.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
     FieldMapping fieldMapping2 = mock(FieldMapping.class);
     when(fieldMapping2.getJdbcName()).thenReturn("fieldTwo");
     when(fieldMapping2.getPdxName()).thenReturn("fieldTwo");
+    when(fieldMapping2.getJdbcType()).thenReturn(JDBCType.VARCHAR.getName());
+    columnNames.clear();
+    columnNames.add("fieldOne");
+    columnNames.add("fieldTwo");
+    when(tableMetaDataView.getColumnDataType("fieldOne")).thenReturn(JDBCType.VARCHAR);
+    when(tableMetaDataView.getColumnDataType("fieldTwo")).thenReturn(JDBCType.VARCHAR);
     when(regionMapping.getFieldMappings()).thenReturn(Arrays.asList(fieldMapping1, fieldMapping2));
+    when(regionMapping.getIds()).thenReturn("fieldOne,fieldTwo");
     createSqlHandler();
 
     EntryColumnData entryColumnData =
@@ -897,7 +990,7 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void returnsCorrectColumnForDestroy() throws Exception {
+  public void returnsCorrectColumnForDestroy() {
     EntryColumnData entryColumnData =
         handler.getEntryColumnData(tableMetaDataView, key, value, Operation.DESTROY);
 
@@ -909,7 +1002,7 @@ public class SqlHandlerTest {
   }
 
   @Test
-  public void getEntryColumnDataWhenMultipleIdColumnsGivenNonPdxInstanceFails() throws Exception {
+  public void getEntryColumnDataWhenMultipleIdColumnsGivenNonPdxInstanceFails() {
     when(tableMetaDataView.getKeyColumnNames()).thenReturn(Arrays.asList("fieldOne", "fieldTwo"));
     Object nonCompositeKey = Integer.valueOf(123);
     thrown.expect(JdbcConnectorException.class);
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceTest.java
index 1321984..9c97fc1 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceTest.java
@@ -118,12 +118,12 @@ public class SqlToPdxInstanceTest {
     when(resultSet.getString(1)).thenReturn("column1");
     when(resultSet.getString(2)).thenReturn("column2");
     sqlToPdxInstance.addMapping(COLUMN_NAME_2, PDX_FIELD_NAME_2, FieldType.STRING);
+    thrown.expect(JdbcConnectorException.class);
+    thrown.expectMessage("The jdbc-mapping does not contain the column name \""
+        + COLUMN_NAME_1
+        + "\". This is probably caused by a column being added to the table after the jdbc-mapping was created.");
 
-    PdxInstance result = createPdxInstance();
-
-    assertThat(result).isSameAs(writablePdxInstance);
-    verify((WritablePdxInstance) result).setField(PDX_FIELD_NAME_2, "column2");
-    verifyNoMoreInteractions(result);
+    createPdxInstance();
   }
 
   @Test
@@ -318,6 +318,10 @@ public class SqlToPdxInstanceTest {
 
   @Test
   public void throwsExceptionIfMoreThanOneResultReturned() throws Exception {
+    when(metaData.getColumnCount()).thenReturn(2);
+    when(metaData.getColumnName(2)).thenReturn(COLUMN_NAME_2);
+    sqlToPdxInstance.addMapping(COLUMN_NAME_1, PDX_FIELD_NAME_1, FieldType.STRING);
+    sqlToPdxInstance.addMapping(COLUMN_NAME_2, PDX_FIELD_NAME_2, FieldType.STRING);
     when(resultSet.next()).thenReturn(true);
     thrown.expect(JdbcConnectorException.class);
     thrown.expectMessage("Multiple rows returned for query: ");
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java
index a744fcf..fa8ed14 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/DescribeMappingCommandTest.java
@@ -20,6 +20,7 @@ import static org.apache.geode.connectors.util.internal.MappingConstants.ID_NAME
 import static org.apache.geode.connectors.util.internal.MappingConstants.PDX_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.REGION_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.SCHEMA_NAME;
+import static org.apache.geode.connectors.util.internal.MappingConstants.SPECIFIED_ID_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.SYNCHRONOUS_NAME;
 import static org.apache.geode.connectors.util.internal.MappingConstants.TABLE_NAME;
 import static org.mockito.Mockito.mock;
@@ -88,6 +89,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(true);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     ArrayList<CacheElement> elements = new ArrayList<>();
@@ -106,6 +108,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(true);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     ArrayList<CacheElement> elements = new ArrayList<>();
@@ -125,6 +128,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(true);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     ArrayList<CacheElement> elements = new ArrayList<>();
@@ -143,6 +147,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(true);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     when(clusterConfig.getRegions()).thenReturn(new ArrayList<>());
@@ -159,6 +164,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(true);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     when(clusterConfig.getRegions()).thenReturn(new ArrayList<>());
@@ -176,6 +182,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(true);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     ArrayList<CacheElement> elements = new ArrayList<>();
@@ -185,10 +192,40 @@ public class DescribeMappingCommandTest {
     gfsh.executeAndAssertThat(command, COMMAND).statusIsSuccess()
         .containsOrderedOutput(DescribeMappingCommand.RESULT_SECTION_NAME + "0", REGION_NAME,
             PDX_NAME,
-            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, CATALOG_NAME, SCHEMA_NAME)
+            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, SPECIFIED_ID_NAME,
+            CATALOG_NAME, SCHEMA_NAME)
         .containsOutput(REGION_NAME, "region1")
         .containsOutput(DATA_SOURCE_NAME, "name1").containsOutput(TABLE_NAME, "table1")
         .containsOutput(PDX_NAME, "class1").containsOutput(ID_NAME, "myId")
+        .containsOutput(SPECIFIED_ID_NAME, "true")
+        .containsOutput(SCHEMA_NAME, "mySchema").containsOutput(CATALOG_NAME, "myCatalog")
+        .containsOutput("true");
+  }
+
+  @Test
+  public void commandSuccessWhenClusterConfigFoundAndRegionConfigFoundIdNotSpecified() {
+    RegionMapping regionMapping = new RegionMapping();
+    regionMapping.setRegionName("region1");
+    regionMapping.setPdxName("class1");
+    regionMapping.setTableName("table1");
+    regionMapping.setDataSourceName("name1");
+    regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(false);
+    regionMapping.setCatalog("myCatalog");
+    regionMapping.setSchema("mySchema");
+    ArrayList<CacheElement> elements = new ArrayList<>();
+    elements.add(regionMapping);
+    when(regionConfig.getCustomRegionElements()).thenReturn(elements);
+
+    gfsh.executeAndAssertThat(command, COMMAND).statusIsSuccess()
+        .containsOrderedOutput(DescribeMappingCommand.RESULT_SECTION_NAME + "0", REGION_NAME,
+            PDX_NAME,
+            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, SPECIFIED_ID_NAME,
+            CATALOG_NAME, SCHEMA_NAME)
+        .containsOutput(REGION_NAME, "region1")
+        .containsOutput(DATA_SOURCE_NAME, "name1").containsOutput(TABLE_NAME, "table1")
+        .containsOutput(PDX_NAME, "class1").containsOutput(ID_NAME, "myId")
+        .containsOutput(SPECIFIED_ID_NAME, "false")
         .containsOutput(SCHEMA_NAME, "mySchema").containsOutput(CATALOG_NAME, "myCatalog")
         .containsOutput("true");
   }
@@ -210,6 +247,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(true);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     ArrayList<CacheElement> elements = new ArrayList<>();
@@ -224,10 +262,12 @@ public class DescribeMappingCommandTest {
     gfsh.executeAndAssertThat(command, COMMAND).statusIsSuccess()
         .containsOrderedOutput(DescribeMappingCommand.RESULT_SECTION_NAME + "0", REGION_NAME,
             PDX_NAME,
-            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, CATALOG_NAME, SCHEMA_NAME)
+            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, SPECIFIED_ID_NAME,
+            CATALOG_NAME, SCHEMA_NAME)
         .containsOutput(REGION_NAME, "region1")
         .containsOutput(DATA_SOURCE_NAME, "name1").containsOutput(TABLE_NAME, "table1")
         .containsOutput(PDX_NAME, "class1").containsOutput(ID_NAME, "myId")
+        .containsOutput(SPECIFIED_ID_NAME, "true")
         .containsOutput(SCHEMA_NAME, "mySchema").containsOutput(CATALOG_NAME, "myCatalog")
         .containsOutput("synchronous", "false");
   }
@@ -240,6 +280,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(false);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     FieldMapping fieldMapping =
@@ -255,10 +296,12 @@ public class DescribeMappingCommandTest {
     gfsh.executeAndAssertThat(command, COMMAND).statusIsSuccess()
         .containsOrderedOutput(DescribeMappingCommand.RESULT_SECTION_NAME + "0", REGION_NAME,
             PDX_NAME,
-            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, CATALOG_NAME, SCHEMA_NAME)
+            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, SPECIFIED_ID_NAME,
+            CATALOG_NAME, SCHEMA_NAME)
         .containsOutput(REGION_NAME, "region1")
         .containsOutput(DATA_SOURCE_NAME, "name1").containsOutput(TABLE_NAME, "table1")
         .containsOutput(PDX_NAME, "class1").containsOutput(ID_NAME, "myId")
+        .containsOutput(SPECIFIED_ID_NAME, "false")
         .containsOutput(SCHEMA_NAME, "mySchema").containsOutput(CATALOG_NAME, "myCatalog")
         .containsOutput("synchronous", "false")
         .containsOutput("pdxName1", "pdxType1", "jdbcName1", "jdbcType1", "true")
@@ -273,6 +316,7 @@ public class DescribeMappingCommandTest {
     regionMapping.setTableName("table1");
     regionMapping.setDataSourceName("name1");
     regionMapping.setIds("myId");
+    regionMapping.setSpecifiedIds(false);
     regionMapping.setCatalog("myCatalog");
     regionMapping.setSchema("mySchema");
     ArrayList<CacheElement> elements = new ArrayList<>();
@@ -285,10 +329,12 @@ public class DescribeMappingCommandTest {
     gfsh.executeAndAssertThat(command, COMMAND + " --groups=group1").statusIsSuccess()
         .containsOrderedOutput(DescribeMappingCommand.RESULT_SECTION_NAME + "0", REGION_NAME,
             PDX_NAME,
-            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, CATALOG_NAME, SCHEMA_NAME)
+            TABLE_NAME, DATA_SOURCE_NAME, SYNCHRONOUS_NAME, ID_NAME, SPECIFIED_ID_NAME,
+            CATALOG_NAME, SCHEMA_NAME)
         .containsOutput(REGION_NAME, "region1")
         .containsOutput(DATA_SOURCE_NAME, "name1").containsOutput(TABLE_NAME, "table1")
         .containsOutput(PDX_NAME, "class1").containsOutput(ID_NAME, "myId")
+        .containsOutput(SPECIFIED_ID_NAME, "false")
         .containsOutput(SCHEMA_NAME, "mySchema").containsOutput(CATALOG_NAME, "myCatalog")
         .containsOutput("true");
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 713cf5e..741d86e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -4266,7 +4266,6 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
          * Now replace all replaceable system properties here using {@code PropertyResolver}
          */
         String replacedXmlString = this.resolver.processUnresolvableString(stringWriter.toString());
-
         /*
          * Turn the string back into the default encoding so that the XML parser can work correctly
          * in the presence of an "encoding" attribute in the XML prolog.


Mime
View raw message