apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-1966) Cassandra output operator improvements
Date Thu, 09 Jun 2016 22:26:21 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323504#comment-15323504
] 

ASF GitHub Bot commented on APEXMALHAR-1966:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/295#discussion_r66533084
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
---
    @@ -162,35 +160,75 @@ public void activate(Context.OperatorContext context)
           }
           getters.add(getter);
         }
    +    super.activate(context);
    +  }
    +
    +  private void populateFieldInfosFromPojo(ColumnDefinitions rsMetaData)
    +  {
    +    fieldInfos = Lists.newArrayList();
    +    Field[] fields = pojoClass.getDeclaredFields();
    +    for (int i = 0; i < rsMetaData.size(); i++) {
    +      String columnName = rsMetaData.getName(i);
    +      String pojoField = getMatchingField(fields, columnName);
    +      if (pojoField != null && pojoField.length() != 0) {
    +        fieldInfos.add(new FieldInfo(columnName, pojoField, null));
    +      } else {
    +        LOG.error("Couldn't find corrosponding pojo field for column: " + columnName);
    +      }
    +    }
    +  }
    +
    +  private String getMatchingField(Field[] fields, String columnName)
    +  {
    +    for (Field f : fields) {
    +      if (f.getName().equalsIgnoreCase(columnName)) {
    +        return f.getName();
    +      }
    +    }
    +    return null;
       }
     
       @Override
       public void deactivate()
       {
       }
     
    +  /**
    +   * {@inheritDoc} <br/>
    +   * If statement/query is not specified by user, insert query is constructed from fileInfo
object and table name.
    +   */
       @Override
       protected PreparedStatement getUpdateCommand()
       {
    +    PreparedStatement statement;
    +    if (query == null) {
    +      statement = prepareStatementFromFieldsAndTableName();
    +    } else {
    +      statement = store.getSession().prepare(query);
    +    }
    +    LOG.debug("Statement is: " + statement.getQueryString());
    +    return statement;
    +  }
    +
    +  private PreparedStatement prepareStatementFromFieldsAndTableName()
    +  {
    +    if (tablename == null || tablename.length() == 0) {
    +      throw new RuntimeException("Please sepcify query or table name.");
    +    }
         StringBuilder queryfields = new StringBuilder();
         StringBuilder values = new StringBuilder();
    -    for (FieldInfo fieldInfo: fieldInfos) {
    +    for (FieldInfo fieldInfo : fieldInfos) {
           if (queryfields.length() == 0) {
             queryfields.append(fieldInfo.getColumnName());
             values.append("?");
    -      }
    -      else {
    +      } else {
             queryfields.append(",").append(fieldInfo.getColumnName());
             values.append(",").append("?");
           }
         }
    -    String statement
    -            = "INSERT INTO " + store.keyspace + "."
    -            + tablename
    -            + " (" + queryfields.toString() + ") "
    -            + "VALUES (" + values.toString() + ");";
    -    LOG.debug("statement is {}", statement);
    +    String statement = "INSERT INTO " + store.keyspace + "." + tablename + " (" + queryfields.toString()
+ ") " + "VALUES (" + values.toString() + ");";
    --- End diff --
    
    Checkstyle.
    Also why remove logging?


> Cassandra output operator improvements
> --------------------------------------
>
>                 Key: APEXMALHAR-1966
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1966
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update existing Cassandra output operator to:
> 1. Accept use defined parameterized queries, the queries could be for update, insert
or delete.
> 2. Add error port to emit tuples which couldn't be written to database.
> 3. Add metrics
> 4. Provide a way to restrict batch size



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message