apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sand...@apache.org
Subject [1/3] apex-malhar git commit: APEXMALHAR-1953: Added JdbcPOJOInsertOutputOperator for insert queries. Added support for automatic mapping of fields from POJO to DB columns. Added unit tests.
Date Fri, 01 Jul 2016 08:04:09 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 13883da68 -> ddd5bcf1a


APEXMALHAR-1953: Added JdbcPOJOInsertOutputOperator for insert queries. Added support for
automatic mapping of fields from POJO to DB columns. Added unit tests.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f54ba320
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f54ba320
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f54ba320

Branch: refs/heads/master
Commit: f54ba3205d4177f63363d5f00e9a3548e5f89a96
Parents: f6ba2d0
Author: bhupesh <bhupeshchawda@gmail.com>
Authored: Tue Mar 15 18:58:30 2016 +0530
Committer: bhupesh <bhupeshchawda@gmail.com>
Committed: Fri Jul 1 11:53:02 2016 +0530

----------------------------------------------------------------------
 .../db/jdbc/AbstractJdbcPOJOOutputOperator.java |  79 +-------
 ...stractJdbcTransactionableOutputOperator.java |  15 +-
 .../db/jdbc/JdbcPOJOInsertOutputOperator.java   | 182 +++++++++++++++++++
 .../lib/db/jdbc/JdbcOperatorTest.java           | 127 ++++++++++++-
 4 files changed, 324 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
index da491aa..c310a40 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java
@@ -21,10 +21,7 @@ package com.datatorrent.lib.db.jdbc;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
@@ -40,7 +37,6 @@ import com.google.common.collect.Lists;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.lib.util.PojoUtils;
@@ -63,22 +59,18 @@ import com.datatorrent.lib.util.PojoUtils.GetterShort;
  * @since 2.1.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
-    implements Operator.ActivationListener<OperatorContext>
+public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
 {
-  @NotNull
   private List<FieldInfo> fieldInfos;
 
-  private List<Integer> columnDataTypes;
+  protected List<Integer> columnDataTypes;
 
   @NotNull
   private String tablename;
 
-  private final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters;
-
-  private String insertStatement;
+  protected final transient List<JdbcPOJOInputOperator.ActiveFieldInfo> columnFieldGetters;
 
-  private transient Class<?> pojoClass;
+  protected transient Class<?> pojoClass;
 
   @InputPortFieldAnnotation(optional = true, schemaRequired = true)
   public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
@@ -97,57 +89,6 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO
 
   };
 
-  @Override
-  public void setup(OperatorContext context)
-  {
-    StringBuilder columns = new StringBuilder();
-    StringBuilder values = new StringBuilder();
-    for (int i = 0; i < fieldInfos.size(); i++) {
-      columns.append(fieldInfos.get(i).getColumnName());
-      values.append("?");
-      if (i < fieldInfos.size() - 1) {
-        columns.append(",");
-        values.append(",");
-      }
-    }
-    insertStatement = "INSERT INTO "
-            + tablename
-            + " (" + columns.toString() + ")"
-            + " VALUES (" + values.toString() + ")";
-    LOG.debug("insert statement is {}", insertStatement);
-
-    super.setup(context);
-
-    if (columnDataTypes == null) {
-      try {
-        populateColumnDataTypes(columns.toString());
-      } catch (SQLException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    for (FieldInfo fi : fieldInfos) {
-      columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
-    }
-  }
-
-  protected void populateColumnDataTypes(String columns) throws SQLException
-  {
-    columnDataTypes = Lists.newArrayList();
-    try (Statement st = store.getConnection().createStatement()) {
-      ResultSet rs = st.executeQuery("select " + columns + " from " + tablename);
-
-      ResultSetMetaData rsMetaData = rs.getMetaData();
-      LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
-
-      for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
-        int type = rsMetaData.getColumnType(i);
-        columnDataTypes.add(type);
-        LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
-      }
-    }
-  }
-
   public AbstractJdbcPOJOOutputOperator()
   {
     super();
@@ -155,13 +96,6 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO
   }
 
   @Override
-  protected String getUpdateCommand()
-  {
-    LOG.debug("insert statement is {}", insertStatement);
-    return insertStatement;
-  }
-
-  @Override
   @SuppressWarnings("unchecked")
   protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
   {
@@ -271,6 +205,7 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO
   @Override
   public void activate(OperatorContext context)
   {
+    super.activate(context);
     final int size = columnDataTypes.size();
     for (int i = 0; i < size; i++) {
       final int type = columnDataTypes.get(i);
@@ -345,8 +280,4 @@ public class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransactionableO
     }
   }
 
-  @Override
-  public void deactivate()
-  {
-  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index fb29233..d3300fc 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 
 import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
 import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
 
 /**
@@ -56,6 +58,7 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator
  */
 public abstract class AbstractJdbcTransactionableOutputOperator<T>
     extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
+    implements Operator.ActivationListener<Context.OperatorContext>
 {
   protected static int DEFAULT_BATCH_SIZE = 1000;
 
@@ -78,12 +81,17 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
   public void setup(Context.OperatorContext context)
   {
     super.setup(context);
+
+  }
+
+  @Override
+  public void activate(OperatorContext context)
+  {
     try {
       updateCommand = store.connection.prepareStatement(getUpdateCommand());
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
-
   }
 
   @Override
@@ -98,6 +106,11 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
   }
 
   @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
   public void processTuple(T tuple)
   {
     tuples.add(tuple);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
new file mode 100644
index 0000000..09bab2f
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import java.lang.reflect.Field;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.FieldInfo;
+
+/**
+ * <p>
+ * JdbcPOJOInsertOutputOperator class.</p>
+ * An implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
+ *
+ * @displayName Jdbc Output Operator
+ * @category Output
+ * @tags database, sql, pojo, jdbc
+ * @since 2.1.0
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class JdbcPOJOInsertOutputOperator extends AbstractJdbcPOJOOutputOperator
+{
+  String insertStatement;
+  List<String> columnNames;
+  List<Integer> columnNullabilities;
+  String columnString;
+  String valueString;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+
+    // Populate columnNames and columnDataTypes
+    try {
+      if (getFieldInfos() == null) { // then assume direct mapping
+        LOG.info("Assuming direct mapping between POJO fields and DB columns");
+        populateColumnDataTypes(null);
+      } else {
+        // FieldInfo supplied by user
+        StringBuilder columns = new StringBuilder();
+        StringBuilder values = new StringBuilder();
+        for (int i = 0; i < getFieldInfos().size(); i++) {
+          columns.append(getFieldInfos().get(i).getColumnName());
+          values.append("?");
+          if (i < getFieldInfos().size() - 1) {
+            columns.append(",");
+            values.append(",");
+          }
+        }
+        populateColumnDataTypes(columns.toString());
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void activate(OperatorContext context)
+  {
+    if(getFieldInfos() == null) {
+      Field[] fields = pojoClass.getDeclaredFields();
+      // Create fieldInfos in case of direct mapping
+      List<FieldInfo> fieldInfos = Lists.newArrayList();
+      for (int i = 0; i < columnNames.size(); i++) {
+        String columnName = columnNames.get(i);
+        String pojoField = getMatchingField(fields, columnName);
+
+        if(columnNullabilities.get(i) == ResultSetMetaData.columnNoNulls &&
+                (pojoField == null || pojoField.length() == 0)) {
+          throw new RuntimeException("Data for a non-nullable field not found in POJO");
+        } else {
+          if(pojoField != null && pojoField.length() != 0) {
+            FieldInfo fi = new FieldInfo(columnName, pojoField, null);
+            fieldInfos.add(fi);
+          } else {
+            columnDataTypes.remove(i);
+            columnNames.remove(i);
+            columnNullabilities.remove(i);
+            i--;
+          }
+        }
+      }
+      setFieldInfos(fieldInfos);
+    }
+
+    for (FieldInfo fi : getFieldInfos()) {
+      columnFieldGetters.add(new JdbcPOJOInputOperator.ActiveFieldInfo(fi));
+    }
+
+    StringBuilder columns = new StringBuilder();
+    StringBuilder values = new StringBuilder();
+
+    for (int i = 0; i < columnNames.size(); i++) {
+      columns.append(columnNames.get(i));
+      values.append("?");
+      if (i < columnNames.size() - 1) {
+        columns.append(",");
+        values.append(",");
+      }
+    }
+
+    insertStatement = "INSERT INTO "
+            + getTablename()
+            + " (" + columns.toString() + ")"
+            + " VALUES (" + values.toString() + ")";
+    LOG.debug("insert statement is {}", insertStatement);
+
+    super.activate(context);
+  }
+
+  private String getMatchingField(Field[] fields, String columnName)
+  {
+    for (Field f: fields) {
+      if(f.getName().equalsIgnoreCase(columnName)) {
+        return f.getName();
+      }
+    }
+    return null;
+  }
+
+  protected void populateColumnDataTypes(String columns) throws SQLException
+  {
+    columnNames = Lists.newArrayList();
+    columnDataTypes = Lists.newArrayList();
+    columnNullabilities = Lists.newArrayList();
+
+    try (Statement st = store.getConnection().createStatement()) {
+      if (columns == null || columns.length() == 0) {
+        columns = "*";
+      }
+      ResultSet rs = st.executeQuery("select " + columns + " from " + getTablename());
+
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+      LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
+
+      for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
+        int type = rsMetaData.getColumnType(i);
+        String columnName = rsMetaData.getColumnName(i);
+        columnNames.add(columnName);
+        columnDataTypes.add(type);
+        columnNullabilities.add(rsMetaData.isNullable(i));
+        LOG.debug("column name {} type {}", rsMetaData.getColumnName(i), type);
+      }
+    }
+  }
+
+
+  @Override
+  protected String getUpdateCommand()
+  {
+    return insertStatement;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOInsertOutputOperator.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f54ba320/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index ad7e676..26196f5 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -56,6 +56,8 @@ public class JdbcOperatorTest
 
   private static final String TABLE_NAME = "test_event_table";
   private static final String TABLE_POJO_NAME = "test_pojo_event_table";
+  private static final String TABLE_POJO_NAME_ID_DIFF = "test_pojo_event_table_id_diff";
+  private static final String TABLE_POJO_NAME_NAME_DIFF = "test_pojo_event_table_name_diff";
   private static String APP_ID = "JdbcOperatorTest";
   private static int OPERATOR_ID = 0;
 
@@ -162,6 +164,12 @@ public class JdbcOperatorTest
       String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME
           + "(id INTEGER not NULL,name VARCHAR(255),startDate DATE,startTime TIME,startTimestamp TIMESTAMP, PRIMARY KEY ( id ))";
       stmt.executeUpdate(createPOJOTable);
+      String createPOJOTableIdDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_ID_DIFF
+              + "(id1 INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id1 ))";
+      stmt.executeUpdate(createPOJOTableIdDiff);
+      String createPOJOTableNameDiff = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME_NAME_DIFF
+              + "(id INTEGER not NULL,name1 VARCHAR(255), PRIMARY KEY ( id ))";
+      stmt.executeUpdate(createPOJOTableNameDiff);
     } catch (Throwable e) {
       DTThrowable.rethrow(e);
     }
@@ -176,6 +184,9 @@ public class JdbcOperatorTest
       String cleanTable = "delete from " + TABLE_NAME;
       stmt.executeUpdate(cleanTable);
 
+      cleanTable = "delete from " + TABLE_POJO_NAME;
+      stmt.executeUpdate(cleanTable);
+
       cleanTable = "delete from " + JdbcTransactionalStore.DEFAULT_META_TABLE;
       stmt.executeUpdate(cleanTable);
     } catch (SQLException e) {
@@ -238,21 +249,37 @@ public class JdbcOperatorTest
     }
   }
 
-  private static class TestPOJOOutputOperator extends AbstractJdbcPOJOOutputOperator
+  private static class TestPOJOOutputOperator extends JdbcPOJOInsertOutputOperator
   {
     TestPOJOOutputOperator()
     {
       cleanTable();
     }
 
-    public int getNumOfEventsInStore()
+    public int getNumOfEventsInStore(String tableName)
     {
       Connection con;
       try {
         con = DriverManager.getConnection(URL);
         Statement stmt = con.createStatement();
 
-        String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME;
+        String countQuery = "SELECT count(*) from " + tableName;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      } catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+
+    public int getNumOfNullEventsInStore(String tableName)
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(*) from " + tableName + " where name1 is null";
         ResultSet resultSet = stmt.executeQuery(countQuery);
         resultSet.next();
         return resultSet.getInt(1);
@@ -309,6 +336,7 @@ public class JdbcOperatorTest
 
     outputOperator.setup(context);
 
+    outputOperator.activate(context);
     List<TestEvent> events = Lists.newArrayList();
     for (int i = 0; i < 10; i++) {
       events.add(new TestEvent(i));
@@ -368,7 +396,98 @@ public class JdbcOperatorTest
     }
     outputOperator.endWindow();
 
-    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
+    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+  }
+
+  /**
+   * This test will assume direct mapping for POJO fields to DB columns
+   */
+  @Test
+  public void testJdbcPojoInsertOutputOperator()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    outputOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestPOJOEvent event : events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+  }
+
+  /**
+   * This test will assume direct mapping for POJO fields to DB columns
+   */
+  @Test
+  public void testJdbcPojoInsertOutputOperatorNullName()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    outputOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestPOJOEvent event : events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
+    Assert.assertEquals("null name rows in db", 10, outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
   }
 
   @Test


Mime
View raw message