apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhup...@apache.org
Subject apex-malhar git commit: APEXMALHAR-1966: Update casandra output opreator
Date Fri, 01 Jul 2016 10:49:05 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master ddd5bcf1a -> 32840a2ce


APEXMALHAR-1966: Update casandra output opreator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/32840a2c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/32840a2c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/32840a2c

Branch: refs/heads/master
Commit: 32840a2cee5b4c9fde1c80c14df86679e47621eb
Parents: ddd5bcf
Author: Priyanka Gugale <priyanka@datatorrent.com>
Authored: Fri Apr 1 14:50:21 2016 +0530
Committer: Priyanka Gugale <priyanka@datatorrent.com>
Committed: Fri Jul 1 15:28:36 2016 +0530

----------------------------------------------------------------------
 .../cassandra/CassandraOutputOperator.java      |   4 +-
 ...tCassandraTransactionableOutputOperator.java |  35 ++++-
 ...assandraTransactionableOutputOperatorPS.java |  84 -----------
 .../cassandra/CassandraPOJOOutputOperator.java  | 150 +++++++++++++++----
 .../cassandra/CassandraOperatorTest.java        | 134 +++++++++++++----
 5 files changed, 256 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
index 7d6f08c..666746b 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java
@@ -23,7 +23,7 @@ import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperatorPS;
+import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator;
 
 
 /**
@@ -31,7 +31,7 @@ import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputO
  *
  * @since 1.0.3
  */
-public class CassandraOutputOperator extends  AbstractCassandraTransactionableOutputOperatorPS<Integer>{
+public class CassandraOutputOperator extends  AbstractCassandraTransactionableOutputOperator<Integer>{
 
   private int id = 0;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
index 9694bf0..9048383 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
@@ -20,14 +20,20 @@ package com.datatorrent.contrib.cassandra;
 
 import java.util.Collection;
 
+import javax.annotation.Nonnull;
+
 import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.DriverException;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.lib.db.AbstractBatchTransactionableStoreOutputOperator;
 
 /**
  * <p>
- * Generic base output adaptor which creates a transaction at the start of window.&nbsp; Subclasses should provide implementation for getting the update statement.  <br/>
+ * Generic Cassandra output adaptor which creates a transaction at the start of window.&nbsp; Subclasses should provide implementation for getting the update statement and setting the statement parameters.  <br/>
  * </p>
  *
  * <p>
@@ -48,20 +54,33 @@ import com.datatorrent.lib.db.AbstractBatchTransactionableStoreOutputOperator;
  * @param <T>type of tuple</T>
  * @since 1.0.2
  */
-public abstract class AbstractCassandraTransactionableOutputOperator<T> extends AbstractBatchTransactionableStoreOutputOperator<T, CassandraTransactionalStore> {
+public abstract class AbstractCassandraTransactionableOutputOperator<T> extends AbstractBatchTransactionableStoreOutputOperator<T, CassandraTransactionalStore> implements ActivationListener<Context.OperatorContext>
+{
+  private transient PreparedStatement updateCommand;
 
-  public AbstractCassandraTransactionableOutputOperator(){
-    super();
+  @Override
+  public void activate(OperatorContext context)
+  {
+    updateCommand = getUpdateCommand();
   }
 
   /**
+   * Gets the statement which insert/update the table in the database.
+   *
+   * @return the cql statement to update a tuple in the database.
+   */
+  @Nonnull
+  protected abstract PreparedStatement getUpdateCommand();
+
+  /**
    * Sets the parameter of the insert/update statement with values from the tuple.
    *
    * @param tuple     tuple
    * @return statement The statement to execute
    * @throws DriverException
    */
-  protected abstract Statement getUpdateStatement(T tuple) throws DriverException;
+  protected abstract Statement setStatementParameters(PreparedStatement updateCommand, T tuple) throws DriverException;
+
 
   @Override
   public void processBatch(Collection<T> tuples)
@@ -69,8 +88,12 @@ public abstract class AbstractCassandraTransactionableOutputOperator<T> extends
     BatchStatement batchCommand = store.getBatchCommand();
     for(T tuple: tuples)
     {
-      batchCommand.add(getUpdateStatement(tuple));
+      batchCommand.add(setStatementParameters(updateCommand, tuple));
     }
   }
 
+  @Override
+  public void deactivate()
+  {
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
deleted file mode 100644
index 21f1840..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.api.Context;
-
-import javax.annotation.Nonnull;
-
-
-/**
- * <p>
- * Generic Cassandra Output Adaptor which creates a transaction at the start of window.&nbsp; Subclasses should provide implementation for getting the update statement and setting the statement parameters. <br/>
- * </p>
- *
- * <p>
- * Executes batch of CQL updates and closes the transaction at the end of the window.
- * Each tuple corresponds to an CQL update statement. The operator groups the updates in a batch
- * and submits them with one call to the database. Batch processing improves performance considerably and also provides atomicity.<br/>
- * The size of a batch is equal to the size of the window.
- * </p>
- *
- * <p>
- * The tuples in a window are stored in check-pointed collection which is cleared in the endWindow().
- * This is needed for the recovery. The operator writes a tuple exactly once in the database, which is why
- * only when all the updates are executed, the transaction is committed in the end window call.
- * </p>
- * @displayName Abstract Cassandra Transactionable Output With Prepared Statement
- * @category Output
- * @tags cassandra, batch, transactionable
- * @param <T>type of tuple</T>
- * @since 1.0.2
- */
-public abstract class AbstractCassandraTransactionableOutputOperatorPS<T> extends AbstractCassandraTransactionableOutputOperator<T>{
-
-  private transient PreparedStatement updateCommand;
-
-  /**
-   * Gets the statement which insert/update the table in the database.
-   *
-   * @return the cql statement to update a tuple in the database.
-   */
-  @Nonnull
-  protected abstract PreparedStatement getUpdateCommand();
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-    updateCommand = getUpdateCommand();
-  }
-
-  /**
-   * Sets the parameter of the insert/update statement with values from the tuple.
-   *
-   * @param tuple     tuple
-   * @return statement The statement to execute
-   * @throws DriverException
-   */
-  protected abstract Statement setStatementParameters(PreparedStatement updateCommand, T tuple) throws DriverException;
-
-  @Override
-  protected Statement getUpdateStatement(T tuple){
-    return setStatementParameters(updateCommand, tuple);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 5f3235a..2d1fea3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -18,23 +18,23 @@
  */
 package com.datatorrent.contrib.cassandra;
 
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.util.*;
 
-import javax.validation.constraints.NotNull;
-
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.DriverException;
-
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.Operator;
+import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.lib.util.PojoUtils;
 import com.datatorrent.lib.util.PojoUtils.*;
@@ -50,17 +50,21 @@ import com.datatorrent.lib.util.PojoUtils.*;
  * @since 2.1.0
  */
 @Evolving
-public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object> implements Operator.ActivationListener<Context.OperatorContext>
+public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperator<Object>
 {
-  @NotNull
   private List<FieldInfo> fieldInfos;
-  @NotNull
   private String tablename;
+  private String query;
 
   protected final transient ArrayList<DataType> columnDataTypes;
   protected final transient ArrayList<Object> getters;
   protected transient Class<?> pojoClass;
 
+  @AutoMetric
+  private long successfulRecords;
+  @AutoMetric
+  private long errorRecords;
+
   /**
    * The input port on which tuples are received for writing.
    */
@@ -81,18 +85,8 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
 
   };
 
-  /*
-   * Tablename in cassandra.
-   */
-  public String getTablename()
-  {
-    return tablename;
-  }
-
-  public void setTablename(String tablename)
-  {
-    this.tablename = tablename;
-  }
+  @OutputPortFieldAnnotation(error = true)
+  public final transient DefaultOutputPort<Object> error = new DefaultOutputPort<>();
 
   public CassandraPOJOOutputOperator()
   {
@@ -102,20 +96,29 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
   }
 
   @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    successfulRecords = 0;
+    errorRecords = 0;
+  }
+
+  @Override
   public void activate(Context.OperatorContext context)
   {
     com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
-
     final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
 
-    final int numberOfColumns = rsMetaData.size();
+    if(fieldInfos == null) {
+      populateFieldInfosFromPojo(rsMetaData);
+    }
 
-    for (int i = 0; i < numberOfColumns; i++) {
+    for (FieldInfo fieldInfo : getFieldInfos()) {
       // get the designated column's data type.
-      final DataType type = rsMetaData.getType(i);
+      final DataType type = rsMetaData.getType(fieldInfo.getColumnName());
       columnDataTypes.add(type);
       final Object getter;
-      final String getterExpr = fieldInfos.get(i).getPojoFieldExpression();
+      final String getterExpr = fieldInfo.getPojoFieldExpression();
       switch (type.getName()) {
         case ASCII:
         case TEXT:
@@ -162,24 +165,64 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
       }
       getters.add(getter);
     }
+    super.activate(context);
   }
 
-  @Override
-  public void deactivate()
+  private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData)
+  {
+    fieldInfos = Lists.newArrayList();
+    Field[] fields = pojoClass.getDeclaredFields();
+    for (int i = 0; i < rsMetaData.size(); i++) {
+      String columnName = rsMetaData.getName(i);
+      String pojoField = getMatchingField(fields, columnName);
+      if (pojoField != null && pojoField.length() != 0) {
+        fieldInfos.add(new FieldInfo(columnName, pojoField, null));
+      } else {
+        LOG.warn("Couldn't find corrosponding pojo field for column: " + columnName);
+      }
+    }
+  }
+
+  private String getMatchingField(Field[] fields, String columnName)
   {
+    for (Field f : fields) {
+      if (f.getName().equalsIgnoreCase(columnName)) {
+        return f.getName();
+      }
+    }
+    return null;
   }
 
+
+  /**
+   * {@inheritDoc} <br/>
+   * If statement/query is not specified by user, insert query is constructed from fileInfo object and table name.
+   */
   @Override
   protected PreparedStatement getUpdateCommand()
   {
+    PreparedStatement statement;
+    if (query == null) {
+      statement = prepareStatementFromFieldsAndTableName();
+    } else {
+      statement = store.getSession().prepare(query);
+    }
+    LOG.debug("Statement is: " + statement.getQueryString());
+    return statement;
+  }
+
+  private PreparedStatement prepareStatementFromFieldsAndTableName()
+  {
+    if (tablename == null || tablename.length() == 0) {
+      throw new RuntimeException("Please sepcify query or table name.");
+    }
     StringBuilder queryfields = new StringBuilder();
     StringBuilder values = new StringBuilder();
     for (FieldInfo fieldInfo: fieldInfos) {
       if (queryfields.length() == 0) {
         queryfields.append(fieldInfo.getColumnName());
         values.append("?");
-      }
-      else {
+      } else {
         queryfields.append(",").append(fieldInfo.getColumnName());
         values.append(",").append("?");
       }
@@ -191,6 +234,7 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
             + "VALUES (" + values.toString() + ");";
     LOG.debug("statement is {}", statement);
     return store.getSession().prepare(statement);
+
   }
 
   @Override
@@ -260,6 +304,18 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
     return boundStmnt;
   }
 
+  @Override
+  public void processTuple(Object tuple)
+  {
+    try {
+      super.processTuple(tuple);
+      successfulRecords++;
+    } catch (RuntimeException e) {
+      LOG.error(e.getMessage());
+      error.emit(tuple);
+      errorRecords++;
+    }
+  }
   /**
    * A list of {@link FieldInfo}s where each item maps a column name to a pojo field name.
    */
@@ -281,5 +337,41 @@ public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionabl
     this.fieldInfos = fieldInfos;
   }
 
+  /**
+   * Gets cassandra table name
+   * @return tableName
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  /**
+   * Sets cassandra table name (optional if query is specified)
+   * @param tablename
+   */
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  /**
+   * Gets cql Query
+   * @return query
+   */
+  public String getQuery()
+  {
+    return query;
+  }
+
+  /**
+   * Sets cql Query
+   * @param query
+   */
+  public void setQuery(String query)
+  {
+    this.query = query;
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/32840a2c/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index f4aa29a..74f99a8 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -33,7 +33,9 @@ import com.google.common.collect.Lists;
 
 import java.util.*;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.AfterClass;
@@ -54,6 +56,8 @@ public class CassandraOperatorTest
   private static final int OPERATOR_ID = 0;
   private static Cluster cluster = null;
   private static Session session = null;
+  private OperatorContextTestHelper.TestIdOperatorContext context;
+  private TestPortContext tpc;
 
   @SuppressWarnings("unused")
   private static class TestEvent
@@ -107,6 +111,25 @@ public class CassandraOperatorTest
     }
   }
 
+  @Before
+  public void setupForTest()
+  {
+    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class);
+    tpc = new TestPortContext(portAttributes);
+  }
+
+  @After
+  public void afterTest()
+  {
+    session.execute("TRUNCATE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
+    session.execute("TRUNCATE " + KEYSPACE + "." + TABLE_NAME);
+  }
+
   private static class TestOutputOperator extends CassandraPOJOOutputOperator
   {
     public long getNumOfEventsInStore()
@@ -220,23 +243,9 @@ public class CassandraOperatorTest
   @Test
   public void testCassandraProtocolVersion()
   {
-    CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
-    transactionalStore.setNode(NODE);
-    transactionalStore.setKeyspace(KEYSPACE);
-    transactionalStore.setProtocolVersion("v2");
-
-    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
-
-    TestOutputOperator outputOperator = new TestOutputOperator();
-
-    outputOperator.setTablename(TABLE_NAME);
-    List<FieldInfo> fieldInfos = Lists.newArrayList();
-    fieldInfos.add(new FieldInfo("id", "id", null));
+    TestOutputOperator outputOperator = setupForOutputOperatorTest();
+    outputOperator.getStore().setProtocolVersion("v2");
 
-    outputOperator.setStore(transactionalStore);
-    outputOperator.setFieldInfos(fieldInfos);
     outputOperator.setup(context);
 
     Configuration config = outputOperator.getStore().getCluster().getConfiguration();
@@ -246,17 +255,8 @@ public class CassandraOperatorTest
   @Test
   public void testCassandraOutputOperator()
   {
-    CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
-    transactionalStore.setNode(NODE);
-    transactionalStore.setKeyspace(KEYSPACE);
-
-    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
-    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+    TestOutputOperator outputOperator = setupForOutputOperatorTest();
 
-    TestOutputOperator outputOperator = new TestOutputOperator();
-
-    outputOperator.setTablename(TABLE_NAME);
     List<FieldInfo> fieldInfos = Lists.newArrayList();
     fieldInfos.add(new FieldInfo("id", "id", null));
     fieldInfos.add(new FieldInfo("age", "age", null));
@@ -269,14 +269,37 @@ public class CassandraOperatorTest
     fieldInfos.add(new FieldInfo("set1", "set1", null));
     fieldInfos.add(new FieldInfo("test", "test", null));
 
-    outputOperator.setStore(transactionalStore);
     outputOperator.setFieldInfos(fieldInfos);
     outputOperator.setup(context);
+    outputOperator.input.setup(tpc);
+    outputOperator.activate(context);
 
-    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
-    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class);
-    TestPortContext tpc = new TestPortContext(portAttributes);
+    List<TestPojo> events = Lists.newArrayList();
+    for (int i = 0; i < 3; i++) {
+      Set<Integer> set = new HashSet<Integer>();
+      set.add(i);
+      List<Integer> list = new ArrayList<Integer>();
+      list.add(i);
+      Map<String, Integer> map = new HashMap<String, Integer>();
+      map.put("key" + i, i);
+      events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 2.0, set, list, map, new Date(System.currentTimeMillis())));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestPojo event : events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 3, outputOperator.getNumOfEventsInStore());
+    outputOperator.getEventsInStore();
+  }
 
+  @Test
+  public void testPopulateFieldInfo()
+  {
+    TestOutputOperator outputOperator = setupForOutputOperatorTest();
+    outputOperator.setup(context);
     outputOperator.input.setup(tpc);
     outputOperator.activate(context);
 
@@ -301,6 +324,57 @@ public class CassandraOperatorTest
     outputOperator.getEventsInStore();
   }
 
+  @Test
+  public void testupdateQueryWithParameters() throws InterruptedException
+  {
+    UUID id = UUID.fromString("94ab597c-a5ff-4997-8343-68993d446b14");
+    TestPojo testPojo = new TestPojo(id, 20, "Laura", true, 10, 2.0, new HashSet<Integer>(), new ArrayList<Integer>(), null, new Date(System.currentTimeMillis()));
+    String insert = "INSERT INTO " + KEYSPACE + "." + TABLE_NAME + " (ID, age, lastname, test, floatValue, doubleValue)" + " VALUES (94ab597c-a5ff-4997-8343-68993d446b14, 20, 'Laura', true, 10, 2.0);";
+    session.execute(insert);
+    String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
+    ResultSet resultSetRecords = session.execute(recordsQuery);
+    Row row = resultSetRecords.iterator().next();
+    Assert.assertEquals("Updated last name", "Laura", row.getString("lastname"));
+    Thread.sleep(1000); // wait till cassandra writes the record
+
+    // update record
+    String updateLastName = "Laurel";
+    String updateQuery = "update " + KEYSPACE + "." + TABLE_NAME + " set lastname='" + updateLastName + "' where id=?";
+    // set specific files required by update command in order as per query
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("id", "id", null));
+
+    // reset the operator to run new query
+    TestOutputOperator outputOperator = setupForOutputOperatorTest();
+    outputOperator.setQuery(updateQuery);
+    outputOperator.setFieldInfos(fieldInfos);
+    outputOperator.setup(context);
+    outputOperator.input.setup(tpc);
+    outputOperator.activate(context);
+
+    outputOperator.beginWindow(1);
+    outputOperator.input.process(testPojo);
+    outputOperator.endWindow();
+
+    recordsQuery = "SELECT * from " + TABLE_NAME + ";";
+    resultSetRecords = session.execute(recordsQuery);
+    row = resultSetRecords.iterator().next();
+    Assert.assertEquals("Updated last name", updateLastName, row.getString("lastname"));
+  }
+
+  private TestOutputOperator setupForOutputOperatorTest()
+  {
+    CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
+    transactionalStore.setNode(NODE);
+    transactionalStore.setKeyspace(KEYSPACE);
+
+    TestOutputOperator operator = new TestOutputOperator();
+    operator = new TestOutputOperator();
+    operator.setTablename(TABLE_NAME);
+    operator.setStore(transactionalStore);
+    return operator;
+  }
+
   /*
    * This test can be run on cassandra server installed on node17.
    */


Mime
View raw message