apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csi...@apache.org
Subject incubator-apex-malhar git commit: MLHR-1836 #comment Schema integration with JDBC operators
Date Thu, 03 Sep 2015 23:23:52 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/jdbc-schemaIntegration [created] af203f269


MLHR-1836 #comment Schema integration with JDBC operators


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/af203f26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/af203f26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/af203f26

Branch: refs/heads/jdbc-schemaIntegration
Commit: af203f2694ed7dc893f4671695337b7937d27749
Parents: a9bcce1
Author: Chandni Singh <chandni@datatorrent.com>
Authored: Fri Jul 31 17:43:45 2015 -0700
Committer: Chandni Singh <csingh@apache.org>
Committed: Thu Sep 3 16:21:34 2015 -0700

----------------------------------------------------------------------
 .../lib/db/jdbc/JdbcPOJOInputOperator.java      | 582 +++++++++++++++++++
 .../lib/db/jdbc/JdbcPOJOOutputOperator.java     | 315 ++++++----
 .../com/datatorrent/lib/db/jdbc/JdbcStore.java  |  21 +-
 .../lib/db/jdbc/JdbcOperatorTest.java           | 140 +++--
 .../datatorrent/lib/helper/TestPortContext.java |  59 ++
 pom.xml                                         |   3 +
 6 files changed, 940 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
new file mode 100644
index 0000000..d5a8e1f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java
@@ -0,0 +1,582 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * A concrete input operator that reads from a database through the JDBC API.<br/>
+ * This operator by default uses the {@link #fieldInfos} and the table name to construct the sql query.<br/>
+ *
+ * A user can provide their own query by setting the {@link #query} property, which takes precedence.<br/>
+ *
+ * For eg. user can set the query property to a complex one : "select x1, x2 from t1, t2 where t1.x3 = t2.x3 ;"<br/>
+ *
+ * @displayName Jdbc Input Operator
+ * @category Input
+ * @tags database, sql, pojo, jdbc
+ * @since 2.1.0
+ */
+public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> implements Operator.ActivationListener<Context.OperatorContext>
+{
+  private static int DEF_BATCH_SIZE = 100;
+
+  //table and query definition
+  protected String tableName;
+  protected String whereCondition;
+  protected String groupByClause;
+  protected String havingCondition;
+  protected String orderByExpr;
+  protected String query;
+  protected boolean limitSupported;
+
+  @NotNull
+  protected List<FieldInfo> fieldInfos;
+
+  private long offset;
+
+  @Min(1)
+  private int batchSize;
+
+  private final transient List<ActiveFieldInfo> columnFieldSetters;
+  private transient boolean windowDone;
+
+  protected String columnsExpression;
+  protected List<Integer> columnDataTypes;
+
+  private transient PreparedStatement preparedStatement;
+  protected transient Class<?> pojoClass;
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  public JdbcPOJOInputOperator()
+  {
+    super();
+    batchSize = DEF_BATCH_SIZE;
+    limitSupported = true;
+    columnFieldSetters = Lists.newArrayList();
+  }
+
+  /**
+   * Prepares the operator: validates that either a query or a table name is configured, lets the super class
+   * set up, derives the column list from the field infos when needed, prepares the paginated statement and
+   * wraps each {@link FieldInfo} in an {@link ActiveFieldInfo}.
+   *
+   * @param context operator context
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    Preconditions.checkArgument(query != null || tableName != null, "both query and table name are not set");
+    super.setup(context);
+
+    try {
+      //the statement created by the super class is never used by this operator, so release it right away
+      queryStatement.close();
+
+      if (query == null && columnsExpression == null) {
+        //no custom query and no explicit column expression: select exactly the mapped columns
+        StringBuilder joined = new StringBuilder();
+        String separator = "";
+        for (FieldInfo info : fieldInfos) {
+          joined.append(separator).append(info.getColumnName());
+          separator = ",";
+        }
+        columnsExpression = joined.toString();
+        LOG.debug("select expr {}", columnsExpression);
+      }
+
+      preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
+      if (columnDataTypes == null) {
+        populateColumnDataTypes();
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+
+    for (FieldInfo info : fieldInfos) {
+      columnFieldSetters.add(new ActiveFieldInfo(info));
+    }
+  }
+
+  /**
+   * Fills {@link #columnDataTypes} with the {@link Types} constant of every mapped column by running the
+   * prepared statement once and inspecting the result set meta data.<br/>
+   * Without a custom query the types are taken in result-set order; with a custom query they are looked up by
+   * column name so that they line up with {@link #fieldInfos}.
+   *
+   * @throws SQLException             if the statement cannot be executed or the meta data cannot be read.
+   * @throws IllegalArgumentException if a custom query is set and a column named in {@link #fieldInfos} is not
+   *                                  part of the query result.
+   */
+  protected void populateColumnDataTypes() throws SQLException
+  {
+    columnDataTypes = Lists.newArrayList();
+    //only the meta data is needed, so ask for a single row starting at offset 0
+    preparedStatement.setLong(1, 0);
+    preparedStatement.setLong(2, 1);
+    try (ResultSet rs = preparedStatement.executeQuery()) {
+      Map<String, Integer> nameToType = Maps.newHashMap();
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+      LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
+
+      for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
+        int type = rsMetaData.getColumnType(i);
+        String name = rsMetaData.getColumnName(i);
+        LOG.debug("column name {} type {}", name, type);
+        if (query == null) {
+          columnDataTypes.add(type);
+        } else {
+          //when it is a custom query we need to ensure the types are in the same order as field infos
+          nameToType.put(name, type);
+        }
+      }
+
+      if (query != null) {
+        for (FieldInfo fieldInfo : fieldInfos) {
+          Integer type = nameToType.get(fieldInfo.getColumnName());
+          if (type == null) {
+            //a null entry would only surface later as an unexplained NullPointerException while unboxing
+            throw new IllegalArgumentException("column " + fieldInfo.getColumnName()
+              + " is not present in the result of the query");
+          }
+          columnDataTypes.add(type);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+    windowDone = false;
+  }
+
+  /**
+   * Fetches the next batch of rows starting at the current offset and emits one tuple per row.<br/>
+   * When the query returns no row, nothing more is fetched until the next {@link #beginWindow(long)}.
+   * The result set is closed even when creating or emitting a tuple fails.
+   */
+  @Override
+  public void emitTuples()
+  {
+    if (windowDone) {
+      return;
+    }
+    try {
+      preparedStatement.setLong(1, offset);
+      preparedStatement.setLong(2, batchSize);
+
+      //try-with-resources guarantees the result set is released if getTuple() or emit() throws
+      try (ResultSet resultSet = preparedStatement.executeQuery()) {
+        boolean emitted = false;
+        while (resultSet.next()) {
+          Object tuple = getTuple(resultSet);
+          outputPort.emit(tuple);
+          offset++;
+          emitted = true;
+        }
+        if (!emitted) {
+          windowDone = true;
+        }
+      }
+    } catch (SQLException ex) {
+      store.disconnect();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Creates a new instance of the pojo class and populates it from the current row of the result set using
+   * the setters created in {@link #activate(Context.OperatorContext)}.<br/>
+   * TIMESTAMP, TIME and DATE columns are written as epoch milliseconds; when such a column is SQL NULL the
+   * corresponding pojo field is left untouched because a primitive long cannot represent null.
+   *
+   * @param result result set positioned on the row to convert.
+   * @return the populated pojo.
+   * @throws RuntimeException if the pojo cannot be instantiated or a column cannot be read; the store is
+   *                          disconnected before the exception is thrown.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object getTuple(ResultSet result)
+  {
+    Object obj;
+    try {
+      obj = pojoClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException ex) {
+      store.disconnect();
+      throw new RuntimeException(ex);
+    }
+
+    try {
+      for (int i = 0; i < fieldInfos.size(); i++) {
+        int type = columnDataTypes.get(i);
+        ActiveFieldInfo afi = columnFieldSetters.get(i);
+        //JDBC columns are 1-based
+        final int column = i + 1;
+
+        switch (type) {
+          case Types.CHAR:
+          case Types.VARCHAR:
+            String strVal = result.getString(column);
+            ((PojoUtils.Setter<Object, String>) afi.setterOrGetter).set(obj, strVal);
+            break;
+
+          case Types.BOOLEAN:
+            boolean boolVal = result.getBoolean(column);
+            ((PojoUtils.SetterBoolean<Object>) afi.setterOrGetter).set(obj, boolVal);
+            break;
+
+          case Types.TINYINT:
+            byte byteVal = result.getByte(column);
+            ((PojoUtils.SetterByte<Object>) afi.setterOrGetter).set(obj, byteVal);
+            break;
+
+          case Types.SMALLINT:
+            short shortVal = result.getShort(column);
+            ((PojoUtils.SetterShort<Object>) afi.setterOrGetter).set(obj, shortVal);
+            break;
+
+          case Types.INTEGER:
+            int intVal = result.getInt(column);
+            ((PojoUtils.SetterInt<Object>) afi.setterOrGetter).set(obj, intVal);
+            break;
+
+          case Types.BIGINT:
+            long longVal = result.getLong(column);
+            ((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, longVal);
+            break;
+
+          case Types.FLOAT:
+            float floatVal = result.getFloat(column);
+            ((PojoUtils.SetterFloat<Object>) afi.setterOrGetter).set(obj, floatVal);
+            break;
+
+          case Types.DOUBLE:
+            double doubleVal = result.getDouble(column);
+            ((PojoUtils.SetterDouble<Object>) afi.setterOrGetter).set(obj, doubleVal);
+            break;
+
+          case Types.DECIMAL:
+            BigDecimal bdVal = result.getBigDecimal(column);
+            ((PojoUtils.Setter<Object, BigDecimal>) afi.setterOrGetter).set(obj, bdVal);
+            break;
+
+          case Types.TIMESTAMP:
+            Timestamp tsVal = result.getTimestamp(column);
+            //getTimestamp returns null for SQL NULL
+            if (tsVal != null) {
+              ((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, tsVal.getTime());
+            }
+            break;
+
+          case Types.TIME:
+            Time timeVal = result.getTime(column);
+            //getTime returns null for SQL NULL
+            if (timeVal != null) {
+              ((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, timeVal.getTime());
+            }
+            break;
+
+          case Types.DATE:
+            Date dateVal = result.getDate(column);
+            //getDate returns null for SQL NULL
+            if (dateVal != null) {
+              ((PojoUtils.SetterLong<Object>) afi.setterOrGetter).set(obj, dateVal.getTime());
+            }
+            break;
+
+          default:
+            handleUnknownDataType(type, obj, afi);
+            break;
+        }
+      }
+      return obj;
+    } catch (SQLException e) {
+      store.disconnect();
+      throw new RuntimeException("reading column values of a row", e);
+    }
+  }
+
+  @SuppressWarnings("UnusedParameters")
+  protected void handleUnknownDataType(int type, Object tuple, ActiveFieldInfo activeFieldInfo)
+  {
+    throw new RuntimeException("unsupported data type " + type);
+  }
+
+  /**
+   * Builds the paginated sql statement executed by this operator.<br/>
+   * A custom {@link #query} is used as is (minus one trailing semicolon); otherwise the statement is assembled
+   * from the column expression, table name and the optional where, group-by, having and order-by parts.
+   * Two positional parameters, offset and number of rows, are always appended.
+   *
+   * @return the sql statement with two placeholders for offset and number of rows.
+   */
+  @Override
+  public String queryToRetrieveData()
+  {
+    final StringBuilder sql = new StringBuilder();
+
+    if (query == null) {
+      sql.append("SELECT ").append(columnsExpression).append(" FROM ").append(tableName);
+      if (whereCondition != null) {
+        sql.append(" WHERE ").append(whereCondition);
+      }
+      //a having condition is only honoured together with a group-by clause
+      if (groupByClause != null) {
+        sql.append(" GROUP BY ").append(groupByClause);
+        if (havingCondition != null) {
+          sql.append(" HAVING ").append(havingCondition);
+        }
+      }
+      if (orderByExpr != null) {
+        sql.append(" ORDER BY ").append(orderByExpr);
+      }
+    } else {
+      sql.append(query.trim());
+      //drop the terminating semicolon so that the pagination clause can be appended
+      final int lastIndex = sql.length() - 1;
+      if (sql.charAt(lastIndex) == ';') {
+        sql.setLength(lastIndex);
+      }
+    }
+
+    //Starting from Oracle 12c R1 there is a row limiting clause which is used when limit is not supported.
+    final String pagination = limitSupported ? " LIMIT ? , ? " : " OFFSET ? ROWS FETCH NEXT ? ROWS ONLY ";
+    sql.append(pagination).append(";");
+
+    final String queryStr = sql.toString();
+    LOG.debug("built query {}", queryStr);
+    return queryStr;
+  }
+
+  /**
+   * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
+   */
+  public List<FieldInfo> getFieldInfos()
+  {
+    return fieldInfos;
+  }
+
+  /**
+   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+   * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
+   *
+   * @description $[].columnName name of the database column name
+   * @description $[].pojoFieldExpression pojo field name or expression
+   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
+   */
+  public void setFieldInfos(List<FieldInfo> fieldInfos)
+  {
+    this.fieldInfos = fieldInfos;
+  }
+
+  /**
+   * @return table name
+   */
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  /**
+   * Sets the table name.
+   *
+   * @param tableName table name
+   */
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return where condition
+   */
+  public String getWhereCondition()
+  {
+    return whereCondition;
+  }
+
+  /**
+   * Sets the where condition.
+   *
+   * @param whereCondition where condition.
+   */
+  public void setWhereCondition(String whereCondition)
+  {
+    this.whereCondition = whereCondition;
+  }
+
+  /**
+   * @return group-by clause
+   */
+  public String getGroupByClause()
+  {
+    return groupByClause;
+  }
+
+  /**
+   * Sets the group by clause.
+   *
+   * @param groupByClause group-by clause.
+   */
+  public void setGroupByClause(String groupByClause)
+  {
+    this.groupByClause = groupByClause;
+  }
+
+  /**
+   * @return having condition
+   */
+  public String getHavingCondition()
+  {
+    return havingCondition;
+  }
+
+  /**
+   * Sets the having condition.
+   *
+   * @param havingCondition having condition
+   */
+  public void setHavingCondition(String havingCondition)
+  {
+    this.havingCondition = havingCondition;
+  }
+
+  /**
+   * @return order by expression.
+   */
+  public String getOrderByExpr()
+  {
+    return orderByExpr;
+  }
+
+  /**
+   * Sets the order by expression.
+   *
+   * @param orderByExpr order by expression.
+   */
+  public void setOrderByExpr(String orderByExpr)
+  {
+    this.orderByExpr = orderByExpr;
+  }
+
+  /**
+   * @return query
+   */
+  public String getQuery()
+  {
+    return query;
+  }
+
+  /**
+   * Sets the query
+   *
+   * @param query query
+   */
+  public void setQuery(String query)
+  {
+    this.query = query;
+  }
+
+  /**
+   * @return true if database support limit clause; false otherwise
+   */
+  public boolean isLimitSupported()
+  {
+    return limitSupported;
+  }
+
+  /**
+   * Sets whether database supports limit clause. If it doesn't then - offset {lastRow} rows fetch next {numRows} rows only
+   * syntax is used.
+   *
+   * @param limitSupported true if limit is supported by database; false otherwise
+   */
+  public void setLimitSupported(boolean limitSupported)
+  {
+    this.limitSupported = limitSupported;
+  }
+
+  /**
+   * Creates, for every mapped column, the pojo setter matching the column's sql type and stores it in the
+   * corresponding {@link ActiveFieldInfo}. {@link #getTuple(ResultSet)} casts these objects to the setter
+   * type of the same sql type, so the two switch statements must stay in sync.<br/>
+   * TIMESTAMP, TIME and DATE columns use a long setter because they are written as epoch milliseconds.
+   *
+   * @param context operator context
+   */
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+    for (int i = 0; i < columnDataTypes.size(); i++) {
+      final int type = columnDataTypes.get(i);
+      JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i);
+      final String fieldExpression = activeFieldInfo.fieldInfo.getPojoFieldExpression();
+      switch (type) {
+        case (Types.CHAR):
+        case (Types.VARCHAR):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, fieldExpression, String.class);
+          break;
+
+        case (Types.BOOLEAN):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, fieldExpression);
+          break;
+
+        case (Types.TINYINT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, fieldExpression);
+          break;
+
+        case (Types.SMALLINT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, fieldExpression);
+          break;
+
+        case (Types.INTEGER):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, fieldExpression);
+          break;
+
+        case (Types.BIGINT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, fieldExpression);
+          break;
+
+        case (Types.FLOAT):
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, fieldExpression);
+          break;
+
+        case (Types.DOUBLE):
+          //must be a setter: getTuple() casts this object to PojoUtils.SetterDouble
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(pojoClass, fieldExpression);
+          break;
+
+        case Types.DECIMAL:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, fieldExpression, BigDecimal.class);
+          break;
+
+        case Types.TIMESTAMP:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, fieldExpression);
+          break;
+
+        case Types.TIME:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, fieldExpression);
+          break;
+
+        case Types.DATE:
+          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, fieldExpression);
+          break;
+
+        default:
+          handleUnknownDataType(type, null, activeFieldInfo);
+          break;
+      }
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  protected static class ActiveFieldInfo
+  {
+    final FieldInfo fieldInfo;
+    Object setterOrGetter;
+
+    ActiveFieldInfo(FieldInfo fieldInfo)
+    {
+      this.fieldInfo = fieldInfo;
+    }
+  }
+
+  /**
+   * @return batch size which is the number of rows retrieved from the database in a window.
+   */
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
+  /**
+   * Sets the batch size which is the number of rows retrieved from the database in a window.
+   *
+   * @param batchSize number of rows retrieved from db in a window.
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  public static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOInputOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
index 7451a50..35fcf0f 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
@@ -15,19 +15,25 @@
  */
 package com.datatorrent.lib.db.jdbc;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+
+import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.lib.util.PojoUtils;
 import com.datatorrent.lib.util.PojoUtils.Getter;
 import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.GetterChar;
 import com.datatorrent.lib.util.PojoUtils.GetterDouble;
 import com.datatorrent.lib.util.PojoUtils.GetterFloat;
 import com.datatorrent.lib.util.PojoUtils.GetterInt;
 import com.datatorrent.lib.util.PojoUtils.GetterLong;
 import com.datatorrent.lib.util.PojoUtils.GetterShort;
 
+import java.math.BigDecimal;
 import java.sql.*;
-import java.util.ArrayList;
+import java.util.List;
 
 import javax.validation.constraints.NotNull;
 
@@ -35,6 +41,8 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * <p>
  * JdbcPOJOOutputOperator class.</p>
@@ -46,71 +54,48 @@ import org.slf4j.LoggerFactory;
  * @since 2.1.0
  */
 @Evolving
-public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> implements Operator.ActivationListener<OperatorContext>
 {
   @NotNull
-  private ArrayList<String> dataColumns;
-  //These are extracted from table metadata
-  private ArrayList<Integer> columnDataTypes;
-
-  /*
-   * An arraylist of data column names to be set in database.
-   * Gets column names.
-   */
-  public ArrayList<String> getDataColumns()
-  {
-    return dataColumns;
-  }
+  private List<FieldInfo> fieldInfos;
 
-  public void setDataColumns(ArrayList<String> dataColumns)
-  {
-    this.dataColumns = dataColumns;
-  }
+  private List<Integer> columnDataTypes;
 
   @NotNull
   private String tablename;
 
-  /*
-   * Gets the Tablename in database.
-   */
-  public String getTablename()
-  {
-    return tablename;
-  }
+  private final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters;
 
-  public void setTablename(String tablename)
-  {
-    this.tablename = tablename;
-  }
+  private String insertStatement;
 
-  /*
-   * An ArrayList of Java expressions that will yield the field value from the POJO.
-   * Each expression corresponds to one column in the database table.
-   */
-  public ArrayList<String> getExpressions()
-  {
-    return expressions;
-  }
+  private transient Class<?> pojoClass;
 
-  public void setExpressions(ArrayList<String> expressions)
+  @InputPortFieldAnnotation(optional = true, schemaRequired = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
   {
-    this.expressions = expressions;
-  }
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      pojoClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
 
-  @NotNull
-  private ArrayList<String> expressions;
-  private transient ArrayList<Object> getters;
-  private String insertStatement;
+    @Override
+    public void process(Object t)
+    {
+      JdbcPOJOOutputOperator.super.input.process(t);
+    }
+
+  };
 
   @Override
   public void setup(OperatorContext context)
   {
-    StringBuilder columns = new StringBuilder("");
-    StringBuilder values = new StringBuilder("");
-    for (int i = 0; i < dataColumns.size(); i++) {
-      columns.append(dataColumns.get(i));
+    StringBuilder columns = new StringBuilder();
+    StringBuilder values = new StringBuilder();
+    for (int i = 0; i < fieldInfos.size(); i++) {
+      columns.append(fieldInfos.get(i).getColumnName());
       values.append("?");
-      if (i < dataColumns.size() - 1) {
+      if (i < fieldInfos.size() - 1) {
         columns.append(",");
         values.append(",");
       }
@@ -120,155 +105,227 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe
             + " (" + columns.toString() + ")"
             + " VALUES (" + values.toString() + ")";
     LOG.debug("insert statement is {}", insertStatement);
+
     super.setup(context);
-    Connection conn = store.getConnection();
-    LOG.debug("Got Connection.");
-    try {
-      Statement st = conn.createStatement();
-      ResultSet rs = st.executeQuery("select * from " + tablename);
 
-      ResultSetMetaData rsMetaData = rs.getMetaData();
+    if (columnDataTypes == null) {
+      try {
+        populateColumnDataTypes(columns.toString());
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
 
-      int numberOfColumns = 0;
+    for (FieldInfo fi : fieldInfos) {
+      columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
+    }
+  }
 
-      numberOfColumns = rsMetaData.getColumnCount();
+  protected void populateColumnDataTypes(String columns) throws SQLException
+  {
+    columnDataTypes = Lists.newArrayList();
+    try (Statement st = store.getConnection().createStatement()) {
+      ResultSet rs = st.executeQuery("select " + columns + " from " + tablename);
 
-      LOG.debug("resultSet MetaData column Count=" + numberOfColumns);
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+      LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
 
-      for (int i = 1; i <= numberOfColumns; i++) {
-        // get the designated column's SQL type.
+      for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
         int type = rsMetaData.getColumnType(i);
-        LOG.debug("column name {}", rsMetaData.getColumnTypeName(i));
         columnDataTypes.add(type);
-        LOG.debug("sql column type is " + type);
+        LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
       }
     }
-    catch (SQLException ex) {
-      throw new RuntimeException(ex);
-    }
-
   }
 
   public JdbcPOJOOutputOperator()
   {
     super();
-    columnDataTypes = new ArrayList<Integer>();
-    getters = new ArrayList<Object>();
+    columnFieldGetters = Lists.newArrayList();
   }
 
   @Override
-  public void processTuple(Object tuple)
+  protected String getUpdateCommand()
   {
-    if (getters.isEmpty()) {
-      processFirstTuple(tuple);
-    }
-    super.processTuple(tuple);
+    LOG.debug("insert statement is {}", insertStatement);
+    return insertStatement;
   }
 
-  public void processFirstTuple(Object tuple)
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
   {
-    final Class<?> fqcn = tuple.getClass();
     final int size = columnDataTypes.size();
     for (int i = 0; i < size; i++) {
       final int type = columnDataTypes.get(i);
-      final String getterExpression = expressions.get(i);
-      final Object getter;
+      JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
       switch (type) {
-        case Types.CHAR:
-          getter = PojoUtils.createGetterChar(fqcn, getterExpression);
+        case (Types.CHAR):
+        case (Types.VARCHAR):
+          statement.setString(i + 1, ((Getter<Object, String>) activeFieldInfo.setterOrGetter).get(tuple));
           break;
-        case Types.VARCHAR:
-          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+
+        case (Types.BOOLEAN):
+          statement.setBoolean(i + 1, ((GetterBoolean<Object>) activeFieldInfo.setterOrGetter).get(tuple));
           break;
-        case Types.BOOLEAN:
-        case Types.TINYINT:
-          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+
+        case (Types.TINYINT):
+          statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>) activeFieldInfo.setterOrGetter).get(tuple));
           break;
-        case Types.SMALLINT:
-          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+
+        case (Types.SMALLINT):
+          statement.setShort(i + 1, ((GetterShort<Object>) activeFieldInfo.setterOrGetter).get(tuple));
           break;
-        case Types.INTEGER:
-          getter = PojoUtils.createGetterInt(fqcn, getterExpression);
+
+        case (Types.INTEGER):
+          statement.setInt(i + 1, ((GetterInt<Object>) activeFieldInfo.setterOrGetter).get(tuple));
           break;
-        case Types.BIGINT:
-          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+
+        case (Types.BIGINT):
+          statement.setLong(i + 1, ((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple));
           break;
-        case Types.FLOAT:
-          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+
+        case (Types.FLOAT):
+          statement.setFloat(i + 1, ((GetterFloat<Object>) activeFieldInfo.setterOrGetter).get(tuple));
           break;
-        case Types.DOUBLE:
-          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+
+        case (Types.DOUBLE):
+          statement.setDouble(i + 1, ((GetterDouble<Object>) activeFieldInfo.setterOrGetter).get(tuple));
+          break;
+
+        case Types.DECIMAL:
+          statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>) activeFieldInfo.setterOrGetter).get(tuple));
+          break;
+
+        case Types.TIMESTAMP:
+          statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+          break;
+
+        case Types.TIME:
+          statement.setTime(i + 1, new Time(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
           break;
+
+        case Types.DATE:
+          statement.setDate(i + 1, new Date(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)));
+          break;
+
         default:
-          /*
-           Types.DECIMAL
-           Types.DATE
-           Types.TIME
-           Types.ARRAY
-           Types.OTHER
-           */
-          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+          handleUnknownDataType(type, tuple, activeFieldInfo);
           break;
       }
-      getters.add(getter);
     }
+  }
 
+  @SuppressWarnings("UnusedParameters")
+  protected void handleUnknownDataType(int type, Object tuple, JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo)
+  {
+    throw new RuntimeException("unsupported data type " + type);
   }
 
-  @Override
-  protected String getUpdateCommand()
+  /**
+   * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
+   */
+  public List<FieldInfo> getFieldInfos()
   {
-    LOG.debug("insertstatement is {}", insertStatement);
-    return insertStatement;
+    return fieldInfos;
+  }
+
+  /**
+   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a pojo field name.<br/>
+   * The value from fieldInfo.column is assigned to fieldInfo.pojoFieldExpression.
+   *
+   * @description $[].columnName name of the database column name
+   * @description $[].pojoFieldExpression pojo field name or expression
+   * @useSchema $[].pojoFieldExpression input.fields[].name
+   */
+  public void setFieldInfos(List<FieldInfo> fieldInfos)
+  {
+    this.fieldInfos = fieldInfos;
+  }
+
+  /*
+   * Gets the name of the table in database.
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);
+
   @Override
-  @SuppressWarnings("unchecked")
-  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
+  public void activate(OperatorContext context)
   {
     final int size = columnDataTypes.size();
     for (int i = 0; i < size; i++) {
       final int type = columnDataTypes.get(i);
+      JdbcPOJOInputOperator.ActiveFieldInfo activeFieldInfo = columnFieldGetters.get(i);
       switch (type) {
         case (Types.CHAR):
-          statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple));
-          break;
         case (Types.VARCHAR):
-          statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple));
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+            String.class);
           break;
+
         case (Types.BOOLEAN):
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
         case (Types.TINYINT):
-          statement.setBoolean(i + 1, ((GetterBoolean<Object>)getters.get(i)).get(tuple));
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
+
         case (Types.SMALLINT):
-          statement.setShort(i + 1, ((GetterShort<Object>)getters.get(i)).get(tuple));
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
+
         case (Types.INTEGER):
-          statement.setInt(i + 1, ((GetterInt<Object>)getters.get(i)).get(tuple));
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
+
         case (Types.BIGINT):
-          statement.setLong(i + 1, ((GetterLong<Object>)getters.get(i)).get(tuple));
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
+
         case (Types.FLOAT):
-          statement.setFloat(i + 1, ((GetterFloat<Object>)getters.get(i)).get(tuple));
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
+
         case (Types.DOUBLE):
-          statement.setDouble(i + 1, ((GetterDouble<Object>)getters.get(i)).get(tuple));
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case Types.DECIMAL:
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(),
+            BigDecimal.class);
+          break;
+
+        case Types.TIMESTAMP:
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
+        case Types.TIME:
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
           break;
+
+        case Types.DATE:
+          activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression());
+          break;
+
         default:
-          /*
-           Types.DECIMAL
-           Types.DATE
-           Types.TIME
-           Types.ARRAY
-           Types.OTHER
-           */
-          statement.setObject(i + 1, ((Getter<Object, Object>)getters.get(i)).get(tuple));
+          handleUnknownDataType(type, null, activeFieldInfo);
           break;
       }
     }
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);
-
+  @Override
+  public void deactivate()
+  {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
index 82e613a..4c4c004 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java
@@ -44,7 +44,7 @@ public class JdbcStore implements Connectable
   private String databaseUrl;
   @NotNull
   private String databaseDriver;
-  private final Properties connectionProperties;
+  private Properties connectionProperties;
   protected transient Connection connection = null;
 
   /*
@@ -115,10 +115,10 @@ public class JdbcStore implements Connectable
   }
 
   /**
-   * Connection Properties for JDBC Connection.
-   * Sets the properties on the jdbc connection.
+   * Sets the connection properties on JDBC connection. Connection properties are provided as a string.
+   *
    * @param connectionProps Comma separated list of properties. Property key and value are separated by colon.
-   *                             eg. user:xyz,password:ijk
+   *                        eg. user:xyz,password:ijk
    */
   public void setConnectionProperties(String connectionProps)
   {
@@ -131,8 +131,17 @@ public class JdbcStore implements Connectable
   }
 
   /**
-   * Connection Properties for JDBC Connection.
-   * Gets the properties on the jdbc connection.
+   * Sets the connection properties on JDBC connection.
+   *
+   * @param connectionProperties connection properties.
+   */
+  public void setConnectionProperties(Properties connectionProperties)
+  {
+    this.connectionProperties = connectionProperties;
+  }
+
+  /**
+   * Get the connection properties of JDBC connection.
    */
   public Properties getConnectionProperties()
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 4cca31f..c77fb73 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -26,12 +26,15 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.helper.TestPortContext;
 import com.datatorrent.lib.testbench.CollectorTestSink;
-import java.util.ArrayList;
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * Tests for {@link AbstractJdbcTransactionableOutputOperator} and {@link AbstractJdbcInputOperator}
@@ -103,19 +106,18 @@ public class JdbcOperatorTest
       Statement stmt = con.createStatement();
 
       String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
-              + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
-              + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
-              + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
-              + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
-              + ")";
+        + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+        + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + ")";
       stmt.executeUpdate(createMetaTable);
 
       String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)";
       stmt.executeUpdate(createTable);
       String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))";
       stmt.executeUpdate(createPOJOTable);
-    }
-    catch (Throwable e) {
+    } catch (Throwable e) {
       DTThrowable.rethrow(e);
     }
   }
@@ -131,8 +133,23 @@ public class JdbcOperatorTest
 
       cleanTable = "delete from " + JdbcTransactionalStore.DEFAULT_META_TABLE;
       stmt.executeUpdate(cleanTable);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
     }
-    catch (SQLException e) {
+  }
+
+  public static void insertEventsInTable(int numEvents)
+  {
+    try {
+      Connection con = DriverManager.getConnection(URL);
+      String insert = "insert into " + TABLE_NAME + " values (?)";
+      PreparedStatement stmt = con.prepareStatement(insert);
+
+      for (int i = 0; i < numEvents; i++) {
+        stmt.setInt(1, i);
+        stmt.executeUpdate();
+      }
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -170,8 +187,7 @@ public class JdbcOperatorTest
         ResultSet resultSet = stmt.executeQuery(countQuery);
         resultSet.next();
         return resultSet.getInt(1);
-      }
-      catch (SQLException e) {
+      } catch (SQLException e) {
         throw new RuntimeException("fetching count", e);
       }
     }
@@ -195,8 +211,7 @@ public class JdbcOperatorTest
         ResultSet resultSet = stmt.executeQuery(countQuery);
         resultSet.next();
         return resultSet.getInt(1);
-      }
-      catch (SQLException e) {
+      } catch (SQLException e) {
         throw new RuntimeException("fetching count", e);
       }
     }
@@ -218,8 +233,7 @@ public class JdbcOperatorTest
     {
       try {
         return new TestEvent(result.getInt(1));
-      }
-      catch (SQLException e) {
+      } catch (SQLException e) {
         throw new RuntimeException(e);
       }
     }
@@ -229,23 +243,6 @@ public class JdbcOperatorTest
     {
       return retrieveQuery;
     }
-
-    public void insertEventsInTable(int numEvents)
-    {
-      try {
-        Connection con = DriverManager.getConnection(URL);
-        String insert = "insert into " + TABLE_NAME + " values (?)";
-        PreparedStatement stmt = con.prepareStatement(insert);
-
-        for (int i = 0; i < numEvents; i++) {
-          stmt.setInt(1, i);
-          stmt.executeUpdate();
-        }
-      }
-      catch (SQLException e) {
-        throw new RuntimeException(e);
-      }
-    }
   }
 
   @Test
@@ -281,7 +278,7 @@ public class JdbcOperatorTest
   }
 
   @Test
-  public void testJdbcPOJOOutputOperator()
+  public void testJdbcPojoOutputOperator()
   {
     JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
     transactionalStore.setDatabaseDriver(DB_DRIVER);
@@ -294,25 +291,30 @@ public class JdbcOperatorTest
     TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
     outputOperator.setBatchSize(3);
     outputOperator.setTablename(TABLE_POJO_NAME);
-    ArrayList<String> dataColumns = new ArrayList<String>();
-    dataColumns.add("id");
-    dataColumns.add("name");
-    outputOperator.setDataColumns(dataColumns);
+
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("ID", "id", null));
+    fieldInfos.add(new FieldInfo("NAME", "name", null));
+    outputOperator.setFieldInfos(fieldInfos);
+
     outputOperator.setStore(transactionalStore);
-    ArrayList<String> expressions = new ArrayList<String>();
-    expressions.add("getId()");
-    expressions.add("getName()");
-    outputOperator.setExpressions(expressions);
 
     outputOperator.setup(context);
 
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    outputOperator.activate(context);
+
     List<TestPOJOEvent> events = Lists.newArrayList();
     for (int i = 0; i < 10; i++) {
       events.add(new TestPOJOEvent(i, "test" + i));
     }
 
     outputOperator.beginWindow(0);
-    for (TestPOJOEvent event: events) {
+    for (TestPOJOEvent event : events) {
       outputOperator.input.process(event);
     }
     outputOperator.endWindow();
@@ -321,7 +323,7 @@ public class JdbcOperatorTest
   }
 
   @Test
-  public void TestJdbcInputOperator()
+  public void testJdbcInputOperator()
   {
     JdbcStore store = new JdbcStore();
     store.setDatabaseDriver(DB_DRIVER);
@@ -333,7 +335,7 @@ public class JdbcOperatorTest
 
     TestInputOperator inputOperator = new TestInputOperator();
     inputOperator.setStore(store);
-    inputOperator.insertEventsInTable(10);
+    insertEventsInTable(10);
 
     CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
     inputOperator.outputPort.setSink(sink);
@@ -345,5 +347,53 @@ public class JdbcOperatorTest
 
     Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
   }
+
+  @Test
+  public void testJdbcPojoInputOperator()
+  {
+    JdbcStore store = new JdbcStore();
+    store.setDatabaseDriver(DB_DRIVER);
+    store.setDatabaseUrl(URL);
+
+    Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+
+    insertEventsInTable(10);
+
+    JdbcPOJOInputOperator inputOperator = new JdbcPOJOInputOperator();
+    inputOperator.setStore(store);
+    inputOperator.setTableName(TABLE_NAME);
+
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("ID", "id", null));
+    inputOperator.setFieldInfos(fieldInfos);
+
+    inputOperator.setBatchSize(5);
+
+    CollectorTestSink<Object> sink = new CollectorTestSink<>();
+    inputOperator.outputPort.setSink(sink);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+
+    inputOperator.setup(context);
+    inputOperator.outputPort.setup(tpc);
+
+    inputOperator.activate(context);
+
+    inputOperator.beginWindow(0);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 5, sink.collectedTuples.size());
+
+    inputOperator.beginWindow(1);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+
+    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java b/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java
new file mode 100644
index 0000000..8bd72a6
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/helper/TestPortContext.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.helper;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+
+public class TestPortContext implements Context.PortContext
+{
+  public final Attribute.AttributeMap attributeMap;
+
+  public TestPortContext(@NotNull Attribute.AttributeMap attributeMap)
+  {
+    this.attributeMap = Preconditions.checkNotNull(attributeMap, "attributes");
+  }
+
+  @Override
+  public Attribute.AttributeMap getAttributes()
+  {
+    return attributeMap;
+  }
+
+  @Override
+  public <T> T getValue(Attribute<T> key)
+  {
+    return attributeMap.get(key);
+  }
+
+  @Override
+  public void setCounters(Object counters)
+  {
+
+  }
+
+  @Override
+  public void sendMetrics(Collection<String> metricNames)
+  {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/af203f26/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1468163..c9f5397 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,9 @@
                 <onlyBinaryIncompatible>false</onlyBinaryIncompatible>
                 <includeSynthetic>false</includeSynthetic>
                 <ignoreMissingClasses>true</ignoreMissingClasses>
+                <excludes>
+                  <exclude>*POJO*</exclude>
+                </excludes>
               </parameter>
               <skip>${semver.plugin.skip}</skip>
             </configuration>


Mime
View raw message