apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhup...@apache.org
Subject [1/2] incubator-apex-malhar git commit: APEXMALHAR-1948: CassandraStore Should Allow You To Specify Protocol Version.
Date Mon, 28 Mar 2016 08:12:00 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master d9abee962 -> c829adaf1


APEXMALHAR-1948: CassandraStore Should Allow You To Specify Protocol Version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/84131850
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/84131850
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/84131850

Branch: refs/heads/master
Commit: 84131850c4bea86d5d08ddbaa1254027ccba2492
Parents: 5373a3c
Author: Priyanka Gugale <priyanka@datatorrent.com>
Authored: Wed Feb 10 15:14:08 2016 +0530
Committer: Priyanka Gugale <priyanka@datatorrent.com>
Committed: Fri Mar 18 15:32:20 2016 +0530

----------------------------------------------------------------------
 contrib/pom.xml                                 |  9 +++-
 .../contrib/cassandra/CassandraStore.java       | 49 ++++++++++++++++----
 .../cassandra/CassandraOperatorTest.java        | 29 +++++++++++-
 3 files changed, 74 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 687e9c9..4bbd8f5 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -486,7 +486,14 @@
     <dependency>
       <groupId>com.datastax.cassandra</groupId>
       <artifactId>cassandra-driver-core</artifactId>
-      <version>2.0.2</version>
+      <version>2.1.8</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>14.0.1</version>
+      <scope>provided</scope>
       <optional>true</optional>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
index 49ed20c..5d9178c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
@@ -24,9 +24,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.DriverException;
-
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.db.Connectable;
 
@@ -47,6 +47,7 @@ public class CassandraStore implements Connectable
   private String node;
   protected transient Cluster cluster = null;
   protected transient Session session = null;
+  private String protocolVersion;
 
   @NotNull
   protected String keyspace;
@@ -90,6 +91,20 @@ public class CassandraStore implements Connectable
     this.password = password;
   }
 
+  public String getProtocolVersion()
+  {
+    return protocolVersion;
+  }
+
+  /**
+   * Sets the protocolVersion of Cassandra
+   * @param protocolVersion as V1, V2, V3 etc
+   */
+  public void setProtocolVersion(String protocolVersion)
+  {
+    this.protocolVersion = protocolVersion;
+  }
+
   @NotNull
   public String getNode() {
     return node;
@@ -115,21 +130,35 @@ public class CassandraStore implements Connectable
   /**
    * Creates a cluster object.
    */
-  public void buildCluster(){
-
+  public void buildCluster()
+  {
     try {
-
-      cluster = Cluster.builder()
-          .addContactPoint(node).withCredentials(userName, password).build();
-    }
-    catch (DriverException ex) {
+      if (protocolVersion != null && protocolVersion.length() != 0) {
+        ProtocolVersion version = getCassandraProtocolVersion();
+        cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).withProtocolVersion(version).build();
+      } else {
+        cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).build();
+      }
+    } catch (DriverException ex) {
       throw new RuntimeException("closing database resource", ex);
-    }
-    catch (Throwable t) {
+    } catch (Throwable t) {
       DTThrowable.rethrow(t);
     }
   }
 
+  private ProtocolVersion getCassandraProtocolVersion()
+  {
+    switch (protocolVersion.toUpperCase()) {
+      case "V1":
+        return ProtocolVersion.V1;
+      case "V2":
+        return ProtocolVersion.V2;
+      case "V3":
+        return ProtocolVersion.V3;
+      default:
+        throw new RuntimeException("Unsupported Cassandra Protocol Version.");
+    }
+  }
 
   /**
    * Create connection with database.

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/84131850/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 68a1e5c..56d9857 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -24,19 +24,18 @@ import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
-
 import com.datatorrent.lib.helper.TestPortContext;
 import com.datatorrent.lib.util.FieldInfo;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.google.common.collect.Lists;
+
 import java.util.*;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.junit.AfterClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -219,6 +218,32 @@ public class CassandraOperatorTest
   }
 
   @Test
+  public void testCassandraProtocolVersion()
+  {
+    CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
+    transactionalStore.setNode(NODE);
+    transactionalStore.setKeyspace(KEYSPACE);
+    transactionalStore.setProtocolVersion("v2");
+
+    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID,
attributeMap);
+
+    TestOutputOperator outputOperator = new TestOutputOperator();
+
+    outputOperator.setTablename(TABLE_NAME);
+    List<FieldInfo> fieldInfos = Lists.newArrayList();
+    fieldInfos.add(new FieldInfo("id", "id", null));
+
+    outputOperator.setStore(transactionalStore);
+    outputOperator.setFieldInfos(fieldInfos);
+    outputOperator.setup(context);
+
+    Configuration config = outputOperator.getStore().getCluster().getConfiguration();
+    Assert.assertEquals("Procotol version was not set to V2.", ProtocolVersion.V2, config.getProtocolOptions().getProtocolVersionEnum());
+  }
+
+  @Test
   public void testCassandraOutputOperator()
   {
     CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();


Mime
View raw message