apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-1953) Add generic (insert, update, delete) support to JDBC Output Operator
Date Mon, 28 Mar 2016 06:55:25 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-1953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213891#comment-15213891
] 

ASF GitHub Bot commented on APEXMALHAR-1953:
--------------------------------------------

Github user sandeepdeshmukh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/215#discussion_r57549460
  
    --- Diff: library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---
    @@ -332,7 +400,154 @@ public void testJdbcPojoOutputOperator()
         }
         outputOperator.endWindow();
     
    -    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
    +    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
    +  }
    +
    +  /**
    +   * This test will assume direct mapping for POJO fields to DB columns
    +   */
    +  @Test
    +  public void testJdbcPojoInsertOutputOperator()
    +  {
    +    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
    +    transactionalStore.setDatabaseDriver(DB_DRIVER);
    +    transactionalStore.setDatabaseUrl(URL);
    +
    +    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
    +        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
    +    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
    +    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
    +        OPERATOR_ID, attributeMap);
    +
    +    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
    +    outputOperator.setBatchSize(3);
    +    outputOperator.setTablename(TABLE_POJO_NAME);
    +
    +    outputOperator.setStore(transactionalStore);
    +
    +    outputOperator.setup(context);
    +
    +    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
    +    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
    +    TestPortContext tpc = new TestPortContext(portAttributes);
    +    outputOperator.input.setup(tpc);
    +
    +    outputOperator.activate(context);
    +
    +    List<TestPOJOEvent> events = Lists.newArrayList();
    +    for (int i = 0; i < 10; i++) {
    +      events.add(new TestPOJOEvent(i, "test" + i));
    +    }
    +
    +    outputOperator.beginWindow(0);
    +    for (TestPOJOEvent event : events) {
    +      outputOperator.input.process(event);
    +    }
    +    outputOperator.endWindow();
    +
    +    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
    +  }
    +
    +  /**
    +   * This test will assume direct mapping for POJO fields to DB columns
    +   */
    +  @Test
    +  public void testJdbcPojoInsertOutputOperatorNullName()
    +  {
    +    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
    +    transactionalStore.setDatabaseDriver(DB_DRIVER);
    +    transactionalStore.setDatabaseUrl(URL);
    +
    +    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
    +        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
    +    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
    +    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
    +        OPERATOR_ID, attributeMap);
    +
    +    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
    +    outputOperator.setBatchSize(3);
    +    outputOperator.setTablename(TABLE_POJO_NAME_NAME_DIFF);
    +
    +    outputOperator.setStore(transactionalStore);
    +
    +    outputOperator.setup(context);
    +
    +    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
    +    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
    +    TestPortContext tpc = new TestPortContext(portAttributes);
    +    outputOperator.input.setup(tpc);
    +
    +    outputOperator.activate(context);
    +
    +    List<TestPOJOEvent> events = Lists.newArrayList();
    +    for (int i = 0; i < 10; i++) {
    +      events.add(new TestPOJOEvent(i, "test" + i));
    +    }
    +
    +    outputOperator.beginWindow(0);
    +    for (TestPOJOEvent event : events) {
    +      outputOperator.input.process(event);
    +    }
    +    outputOperator.endWindow();
    +
    +    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
    +    Assert.assertEquals("null name rows in db", 10,
    +        outputOperator.getNumOfNullEventsInStore(TABLE_POJO_NAME_NAME_DIFF));
    +  }
    +
    +  @Test
    +  public void testJdbcPojoOutputOperatorMerge()
    +  {
    +    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
    +    transactionalStore.setDatabaseDriver(DB_DRIVER);
    +    transactionalStore.setDatabaseUrl(URL);
    +
    +    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap =
    +        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
    +    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
    +    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
    +        OPERATOR_ID, attributeMap);
    +
    +    TestPOJONonInsertOutputOperator updateOperator = new TestPOJONonInsertOutputOperator();
    +    updateOperator.setBatchSize(3);
    +
    +    updateOperator.setStore(transactionalStore);
    +
    +    updateOperator.setSqlStatement("MERGE INTO " + TABLE_POJO_NAME + " AS T USING (VALUES
(?, ?)) AS FOO(id, name) "
    +        + "ON T.id = FOO.id "
    +        + "WHEN MATCHED THEN UPDATE SET name = FOO.name "
    +        + "WHEN NOT MATCHED THEN INSERT( id, name ) VALUES (FOO.id, FOO.name);");
    +
    +    List<JdbcFieldInfo> fieldInfos = Lists.newArrayList();
    +    fieldInfos.add(new JdbcFieldInfo("id", "id", null, "INTEGER"));
    +    fieldInfos.add(new JdbcFieldInfo("name", "name", null, "VARCHAR"));
    +    updateOperator.setFieldInfos(fieldInfos);
    +    updateOperator.setup(context);
    +
    +    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
    +    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
    +    TestPortContext tpc = new TestPortContext(portAttributes);
    +    updateOperator.input.setup(tpc);
    +
    +    updateOperator.activate(context);
    +
    +    List<TestPOJOEvent> events = Lists.newArrayList();
    +    for (int i = 0; i < 10; i++) {
    +      events.add(new TestPOJOEvent(i, "test" + i));
    +    }
    +    for (int i = 0; i < 5; i++) {
    +      events.add(new TestPOJOEvent(i, "test" + 100));
    +    }
    +
    +    updateOperator.getDistinctNonUnique();
    +    updateOperator.beginWindow(0);
    +    for (TestPOJOEvent event : events) {
    +      updateOperator.input.process(event);
    +    }
    +    updateOperator.endWindow();
    +
    +    Assert.assertEquals("rows in db", 10, updateOperator.getNumOfEventsInStore());
    --- End diff --
    
    Add comment on how this is verified.


> Add generic (insert, update, delete) support to JDBC Output Operator
> --------------------------------------------------------------------
>
>                 Key: APEXMALHAR-1953
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1953
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Bhupesh Chawda
>            Assignee: Bhupesh Chawda
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message