apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-malhar git commit: APEXMALHAR-1942 #comment Added GeodePOJOOutput operator, GeodeStore implementation & unit tests
Date Mon, 08 Feb 2016 20:47:18 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master eaf74392c -> cec33da88


APEXMALHAR-1942 #comment Added GeodePOJOOutput operator, GeodeStore implementation & unit
tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/cec33da8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/cec33da8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/cec33da8

Branch: refs/heads/master
Commit: cec33da888a33e60bd62221c3c34e0dc4523d9f1
Parents: eaf7439
Author: prasi-in <indulkarprasad@gmail.com>
Authored: Tue Feb 9 01:40:25 2016 +0530
Committer: prasi-in <indulkarprasad@gmail.com>
Committed: Tue Feb 9 01:40:25 2016 +0530

----------------------------------------------------------------------
 contrib/pom.xml                                 |   8 +-
 .../geode/AbstractGeodeInputOperator.java       |  43 +++
 .../geode/AbstractGeodeOutputOperator.java      |  44 +++
 .../contrib/geode/GeodePOJOOutputOperator.java  |  70 +++++
 .../datatorrent/contrib/geode/GeodeStore.java   | 298 +++++++++++++++++++
 .../contrib/geode/GeodeOperatorTest.java        |  69 +++++
 .../contrib/geode/GeodePOJOOperatorTest.java    | 102 +++++++
 .../contrib/geode/GeodeStoreTest.java           | 110 +++++++
 8 files changed, 743 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 17b6008..6145fac 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -616,6 +616,12 @@
       <artifactId>apex-common</artifactId>
       <version>${apex.core.version}</version>
       <type>jar</type>
-    </dependency>  
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geode</groupId>
+      <artifactId>gemfire-core</artifactId>
+      <version>1.0.0-incubating.M1</version>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
new file mode 100644
index 0000000..7595e1a
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
+
+/**
+ * This is the base implementation used for geode input adapters.&nbsp; A
+ * concrete operator should be created from this skeleton implementation.
+ * <p>
+ * </p>
+ * 
+ * @displayName Abstract Geode Input
+ * @category Input
+ * @tags geode, key value
+ *
+ * @param <T>
+ *          The tuple type.
+ * 
+ */
+public abstract class AbstractGeodeInputOperator<T> extends AbstractKeyValueStoreInputOperator<T,
GeodeStore>
+{
+  public AbstractGeodeInputOperator()
+  {
+    store = new GeodeStore();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
new file mode 100644
index 0000000..157fbf4
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import com.datatorrent.lib.db.AbstractStoreOutputOperator;
+
+/**
+ * This is the base implementation of geode output operators.&nbsp; A concrete
+ * operator should be created from this skeleton implementation.
+ * <p>
+ * </p>
+ * 
+ * @displayName Abstract Geode Output
+ * @category Output
+ * @tags geode, key value
+ *
+ * @param <T>
+ *          The tuple type.
+ * 
+ */
+public abstract class AbstractGeodeOutputOperator<T> extends AbstractStoreOutputOperator<T,
GeodeStore>
+{
+  public AbstractGeodeOutputOperator()
+  {
+    store = new GeodeStore();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
new file mode 100644
index 0000000..ad7e90c
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.lib.util.FieldInfo;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.TableInfo;
+
+/**
+ *
+ * @displayName Geode Output Operator
+ * @category Output
+ * @tags pojo, geode
+ * 
+ */
+@Evolving
+public class GeodePOJOOutputOperator extends AbstractGeodeOutputOperator<Object>
+{
+
+  private TableInfo<FieldInfo> tableInfo;
+  private transient Getter<Object, String> rowGetter;
+
+  @Override
+  public void processTuple(Object tuple)
+  {
+    if (rowGetter == null) {
+      rowGetter = PojoUtils.createGetter(tuple.getClass(), tableInfo.getRowOrIdExpression(),
String.class);
+    }
+
+    getStore().put(rowGetter.get(tuple), tuple);
+  }
+
+  /**
+   *
+   * the information to convert pojo
+   */
+  public TableInfo<FieldInfo> getTableInfo()
+  {
+    return tableInfo;
+  }
+
+  /**
+   *
+   * the information to convert pojo
+   */
+  public void setTableInfo(TableInfo<FieldInfo> tableInfo)
+  {
+    this.tableInfo = tableInfo;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
new file mode 100644
index 0000000..14cb5a5
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.accumulo.core.client.impl.thrift.ThriftTest.Processor.throwsError;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.query.FunctionDomainException;
+import com.gemstone.gemfire.cache.query.NameResolutionException;
+import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.TypeMismatchException;
+
+import com.datatorrent.lib.db.KeyValueStore;
+
+/**
+ * Provides the implementation of a Geode store.
+ * Geode is a distributed in-memory database
+ *  that provides reliable asynchronous event notifications and guaranteed message delivery.
+ * Geode is a data management platform that provides real-time
+ * , consistent access to data-intensive applications.
+ * 
+ */
+public class GeodeStore implements KeyValueStore, Serializable
+{
+  /**
+   * 
+   */
+  private static final long serialVersionUID = -5076452548893319967L;
+  private static final Logger logger = LoggerFactory.getLogger(GeodeStore.class);
+  private transient ClientCache clientCache = null;
+  private transient Region<Object, Object> region = null;
+  private String locatorHost;
+  private int locatorPort;
+  private String regionName;
+
+  private ClientCache initClient()
+  {
+    try {
+      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
+    } catch (CacheClosedException ex) {
+      throw new RuntimeException("Exception while creating cache", ex);
+
+    }
+
+    return clientCache;
+  }
+
+  /**
+   * @return the regionName
+   */
+  public String getRegionName()
+  {
+    return regionName;
+  }
+
+
+  /**
+   * @return the clientCache
+   */
+  public ClientCache getClientCache()
+  {
+    return clientCache;
+  }
+
+  /**
+   * @return the locatorPort
+   */
+  public int getLocatorPort()
+  {
+    return locatorPort;
+  }
+
+  /**
+   * @param locatorPort
+   *          the locatorPort to set
+   */
+  public void setLocatorPort(int locatorPort)
+  {
+    this.locatorPort = locatorPort;
+  }
+
+  /**
+   * @return the locatorHost
+   */
+  public String getLocatorHost()
+  {
+    return locatorHost;
+  }
+
+  /**
+   * @param locatorHost
+   *          the locatorHost to set
+   */
+  public void setLocatorHost(String locatorHost)
+  {
+    this.locatorHost = locatorHost;
+  }
+
+  /**
+   * @return the region
+   * @throws IOException
+   */
+  public Region<Object, Object> getRegion() throws IOException
+  {
+    // return region;
+    if (clientCache == null || clientCache.isClosed()) {
+      initClient();
+    }
+
+    if (region == null) {
+      region = clientCache.getRegion(regionName);
+      if (region == null) {
+        region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
+      }
+    }
+
+    return region;
+  }
+
+  @Override
+  public void connect() throws IOException
+  {
+    try {
+      clientCache = new ClientCacheFactory().addPoolLocator(getLocatorHost(), getLocatorPort()).create();
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    region = clientCache.getRegion(getRegionName());
+
+    if (region == null) {
+      region = clientCache.<Object, Object> createClientRegionFactory(ClientRegionShortcut.PROXY).create(
+          getRegionName());
+    }
+
+  }
+
+  @Override
+  public void disconnect() throws IOException
+  {
+    clientCache.close();
+
+  }
+
+  @Override
+  public boolean isConnected()
+  {
+    return (clientCache.isClosed());
+
+  }
+
+  /**
+   * Gets the value given the key. Note that it does NOT work with hash values
+   * or list values
+   *
+   * @param key
+   * @return The value.
+   */
+  @Override
+  public Object get(Object key)
+  {
+
+    try {
+      return (getRegion().get(key));
+    } catch (IOException ex) {
+      throw new RuntimeException("Exception while getting the object", ex);      
+
+    }
+
+  }
+
+  /**
+   * Gets all the values given the keys. Note that it does NOT work with hash
+   * values or list values
+   *
+   * @param keys
+   * @return All values for the given keys.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public List<Object> getAll(List<Object> keys)
+  {
+
+    List<Object> values = new ArrayList<Object>();
+
+    try {
+      final Map<Object, Object> entries = getRegion().getAll(keys);
+      for (int i = 0; i < keys.size(); i++) {
+        values.add(entries.get(keys.get(i)));
+      }
+    } catch (IOException ex) {
+      logger.info("error getting region ", ex);
+    }
+
+    return (values);
+
+  }
+
+  /**
+   * @param regionName
+   *          the regionName to set
+   */
+  public void setRegionName(String regionName)
+  {
+    this.regionName = regionName;
+  }
+
+
+  public Map<Object, Object> getAllMap(List<Object> keys)
+  {
+
+    try {
+      final Map<Object, Object> entries = getRegion().getAll(keys);
+      return (entries);
+    } catch (IOException ex) {
+      logger.info("error getting object ", ex);
+      return null;
+    }
+
+  }
+
+  @SuppressWarnings("rawtypes")
+  public SelectResults query(String predicate)
+  {
+    try {
+      return (getRegion().query(predicate));
+    } catch (FunctionDomainException | TypeMismatchException | NameResolutionException |
QueryInvocationTargetException
+        | IOException e) {
+      logger.info("error in querying object ", e);
+      return null;
+    }
+
+  }
+
+  @Override
+  public void put(Object key, Object value)
+  {
+    try {
+      getRegion().put(key, value);
+    } catch (IOException e) {
+      logger.info("while putting in region", e);
+    }
+  }
+
+  @Override
+  public void putAll(Map<Object, Object> map)
+  {
+    try {
+      getRegion().putAll(map);
+    } catch (IOException e) {
+      logger.info("while putting all in region", e);
+    }
+  }
+
+  @Override
+  public void remove(Object key)
+  {
+    try {
+      getRegion().destroy(key);
+    } catch (TimeoutException | CacheWriterException | EntryNotFoundException | IOException
e) {
+      logger.info("while deleting", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java
new file mode 100644
index 0000000..5f06391
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeOperatorTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datatorrent.lib.db.KeyValueStoreOperatorTest;
+
+public class GeodeOperatorTest
+{
+  KeyValueStoreOperatorTest<GeodeStore> testFramework;
+
+  private GeodeStore geodeStore;
+  private GeodeStore testStore;
+
+  @Before
+  public void setup()
+  {
+
+    geodeStore = new GeodeStore();
+    testStore = new GeodeStore();
+
+    geodeStore.setLocatorHost("192.168.1.128");
+    geodeStore.setLocatorPort(50505);
+    geodeStore.setRegionName("operator");
+
+    testStore.setLocatorHost("192.168.1.128");
+    testStore.setLocatorPort(50505);
+    testStore.setRegionName("operator");
+
+    if (System.getProperty("dev.locator.connection") != null) {
+      geodeStore.setLocatorHost(System.getProperty("dev.locator.connection"));
+      testStore.setLocatorHost(System.getProperty("dev.locator.connection"));
+    }
+    testFramework = new KeyValueStoreOperatorTest<GeodeStore>(geodeStore, testStore);
+
+  }
+
+  @Test
+  public void testOutputOperator() throws Exception
+  {
+    testFramework.testOutputOperator();
+  }
+
+  @Test
+  public void testInputOperator() throws Exception
+  {
+    testFramework.testInputOperator();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java
b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java
new file mode 100644
index 0000000..0cacc02
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodePOJOOperatorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datatorrent.contrib.util.TestPOJO;
+import com.datatorrent.contrib.util.TupleGenerator;
+import com.datatorrent.lib.util.TableInfo;
+import com.datatorrent.netlet.util.DTThrowable;
+
+@SuppressWarnings("rawtypes")
+public class GeodePOJOOperatorTest
+{
+  public static final int TUPLE_SIZE = 10;
+
+  private GeodeStore store;
+
+  @Before
+  public void setup()
+  {
+    store = new GeodeStore();
+    store.setLocatorHost("192.168.1.128");
+    if (System.getProperty("dev.locator.connection") != null) {
+      store.setLocatorHost(System.getProperty("dev.locator.connection"));
+    }
+    store.setLocatorPort(10334);
+    store.setRegionName("operator5");
+  }
+
+  public void cleanup()
+  {
+    if (store != null) {
+      try {
+        store.disconnect();
+      } catch (Exception e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testGeodeOutputOperatorInternal() throws Exception
+  {
+    GeodePOJOOutputOperator operator = new GeodePOJOOutputOperator();
+    operator.setStore(store);
+
+    TableInfo tableInfo = new TableInfo();
+    tableInfo.setRowOrIdExpression(TestPOJO.getRowExpression());
+    tableInfo.setFieldsInfo(TestPOJO.getFieldsInfo());
+    tableInfo.setRowOrIdExpression(TestPOJO.getRowExpression());
+    operator.setTableInfo(tableInfo);
+
+    operator.setup(null);
+
+    TupleGenerator<TestPOJO> generator = new TupleGenerator<TestPOJO>(TestPOJO.class);
+
+    for (int i = 0; i < TUPLE_SIZE; ++i) {
+      operator.processTuple(generator.getNextTuple());
+    }
+
+    generator.reset();
+
+    for (int i = 0; i < TUPLE_SIZE; ++i) {
+      operator.processTuple(generator.getNextTuple());
+    }
+
+    //    readDataAndVerify(operator.getStore(), generator);
+  }
+
+  public void readDataAndVerify(GeodeStore store, TupleGenerator<TestPOJO> generator)
+  {
+    generator.reset();
+
+    for (int i = 0; i < TUPLE_SIZE; ++i) {
+      TestPOJO expected = generator.getNextTuple();
+      TestPOJO read = (TestPOJO)store.get(expected.getRow());
+      Assert.assertTrue(String.format("expected={%s}, actually={%s}", expected.toString(),
read.toString()),
+          expected.completeEquals(read));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cec33da8/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java
new file mode 100644
index 0000000..cad6876
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeStoreTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.geode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.gemstone.gemfire.cache.query.SelectResults;
+
+public class GeodeStoreTest
+{
+
+  private GeodeStore geodeStore;
+
+  @Before
+  public void setup() throws IOException
+  {
+    geodeStore = new GeodeStore();
+    geodeStore.setLocatorHost("192.168.1.128");
+    if (System.getProperty("dev.locator.connection") != null) {
+      geodeStore.setLocatorHost(System.getProperty("dev.locator.connection"));
+    }
+    geodeStore.setLocatorPort(10334);
+    geodeStore.setRegionName("operator");
+    geodeStore.connect();
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    geodeStore.disconnect();
+  }
+
+  @Test
+  public void testputAllandget() throws Exception
+  {
+
+    Map<Object, Object> m = new HashMap<Object, Object>();
+    m.put("test1_abc", "123");
+    m.put("test1_def", "456");
+
+    geodeStore.putAll(m);
+
+    Assert.assertEquals("123", geodeStore.get("test1_abc"));
+    Assert.assertEquals("456", geodeStore.get("test1_def"));
+  }
+
+  @Test
+  public void testQuery() throws Exception
+  {
+    Map<Object, Object> m = new HashMap<Object, Object>();
+    m.put("test2_abc", "123");
+    m.put("test2_def", "456");
+    geodeStore.putAll(m);
+    String predicate = "Select key,value from /operator.entries where key like 'test2%'";
+    SelectResults results = geodeStore.query(predicate);
+    Assert.assertEquals(2, results.size());
+  }
+
+  @Test
+  public void testputAllandgetAll() throws Exception
+  {
+
+    Map<Object, Object> m = new HashMap<Object, Object>();
+    m.put("test3_abc", "123");
+    m.put("test3_def", "456");
+    geodeStore.putAll(m);
+
+    List<Object> keys = new ArrayList<Object>();
+    keys.add("test3_abc");
+    keys.add("test3_def");
+    Map<Object, Object> values = geodeStore.getAllMap(keys);
+
+    Assert.assertEquals("123", values.get("test3_abc"));
+    Assert.assertEquals("456", values.get("test3_def"));
+
+  }
+
+  @Test
+  public void testputandget() throws Exception
+  {
+    geodeStore.put("test4_abc", "123");
+    Assert.assertEquals("123", geodeStore.get("test4_abc"));
+  }
+
+}


Mime
View raw message