tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From spmalle...@apache.org
Subject [2/2] tinkerpop git commit: Refactored ResultQueue/Set and added more tests.
Date Fri, 29 Jul 2016 17:36:43 GMT
Refactored ResultQueue/Set and added more tests.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3ca691bb
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3ca691bb
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3ca691bb

Branch: refs/heads/TINKERPOP-1278
Commit: 3ca691bbefecbfb1fdc9994709e45d3571bcda5c
Parents: c875cd9
Author: Stephen Mallette <spmva@genoprime.com>
Authored: Fri Jul 29 13:33:14 2016 -0400
Committer: Stephen Mallette <spmva@genoprime.com>
Committed: Fri Jul 29 13:33:14 2016 -0400

----------------------------------------------------------------------
 .../tinkerpop/gremlin/driver/ResultQueue.java   | 108 ++++++++++----
 .../tinkerpop/gremlin/driver/ResultSet.java     |   2 -
 .../gremlin/driver/ResultQueueTest.java         | 139 +++++++++++++++++++
 .../tinkerpop/gremlin/driver/ResultSetTest.java |  27 ++++
 4 files changed, 244 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ca691bb/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index 94edd82..28d40b6 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.process.remote.traversal.step.util.BulkedResult;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.javatuples.Pair;
 
@@ -33,13 +34,12 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * A queue of incoming {@link ResponseMessage} objects.  The queue is updated by the
- * {@link Handler.GremlinResponseHandler} until a response terminator is identified.
+ * A queue of incoming {@link Result} objects.  The queue is updated by the {@link Handler.GremlinResponseHandler}
+ * until a response terminator is identified.
  *
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
@@ -61,48 +61,92 @@ final class ResultQueue {
         this.readComplete = readComplete;
     }
 
+    /**
+     * Adds a {@link Result} to the queue which will be later read by the {@link ResultSet}.
+     *
+     * @param result a return value from the {@link Traversal} or script submitted for execution
+     */
     public void add(final Result result) {
         this.resultLinkedBlockingQueue.offer(result);
         tryDrainNextWaiting(false);
     }
 
+    /**
+     * Adds a side-effect to the queue which may later be read by the {@link ResultSet}. Note that the side-effect
+     * is only returned when a {@link Traversal} is submitted and refers to the side-effects defined in that traversal.
+     * A "script" will not return side-effects.
+     *
+     * @param k the key of the side-effect
+     * @param aggregateTo the value of the {@link ResponseMessage} metadata for {@link Tokens#ARGS_AGGREGATE_TO}.
+     * @param sideEffectValue the value of the side-effect itself
+     */
     public void addSideEffect(final String k, final String aggregateTo, final Object sideEffectValue) {
-        if (aggregateTo.equals(Tokens.VAL_AGGREGATE_TO_BULKSET)) {
-            if (!sideEffectResult.containsKey(k))
-                putIfAbsent(k, new BulkSet());
-
-            final BulkSet bs = (BulkSet) sideEffectResult.get(k);
-            final BulkedResult bulkedResult = (BulkedResult) sideEffectValue;
-            bs.add(bulkedResult.getResult(), bulkedResult.getBulk());
-        } else if (aggregateTo.equals(Tokens.VAL_AGGREGATE_TO_LIST)) {
-            if (!sideEffectResult.containsKey(k))
-                putIfAbsent(k, new ArrayList());
-
-            ((List) sideEffectResult.get(k)).add(sideEffectValue);
-        } else if (aggregateTo.equals(Tokens.VAL_AGGREGATE_TO_MAP)) {
-            if (!sideEffectResult.containsKey(k))
-                putIfAbsent(k, new HashMap());
-
-            final Map m = (Map) sideEffectResult.get(k);
-            final Map.Entry entry = (Map.Entry) sideEffectValue;
-            m.put(entry.getKey(), entry.getValue());
-        } else if (aggregateTo.equals(Tokens.VAL_AGGREGATE_TO_NONE)) {
-            if (!sideEffectResult.containsKey(k))
-                putIfAbsent(k, sideEffectValue);
-        } else {
-            // TODO: make better
-            throw new IllegalStateException("Invalid aggregatedfjaldjfalkjfaljfalj flkajslf ja");
+        switch (aggregateTo) {
+            case Tokens.VAL_AGGREGATE_TO_BULKSET:
+                if (!(sideEffectValue instanceof BulkedResult))
+                    throw new IllegalStateException(String.format("Side-effect \"%s\" value %s is a %s which does not aggregate to %s",
+                            k, sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
+
+                if (!sideEffectResult.containsKey(k))
+                    putIfAbsent(k, new BulkSet());
+
+                final BulkSet<Object> bs = validateAndGet(k, aggregateTo, BulkSet.class);
+                final BulkedResult bulkedResult = (BulkedResult) sideEffectValue;
+                bs.add(bulkedResult.getResult(), bulkedResult.getBulk());
+                break;
+            case Tokens.VAL_AGGREGATE_TO_LIST:
+                if (!sideEffectResult.containsKey(k))
+                    putIfAbsent(k, new ArrayList());
+
+                final List<Object> list = validateAndGet(k, aggregateTo, List.class);
+                list.add(sideEffectValue);
+                break;
+            case Tokens.VAL_AGGREGATE_TO_MAP:
+                if (!(sideEffectValue instanceof Map.Entry))
+                    throw new IllegalStateException(String.format("Side-effect \"%s\" value %s is a %s which does not aggregate to %s",
+                            k, sideEffectValue, sideEffectValue.getClass().getSimpleName(), aggregateTo));
+
+                if (!sideEffectResult.containsKey(k))
+                    putIfAbsent(k, new HashMap());
+
+                final Map<Object,Object > m = validateAndGet(k, aggregateTo, Map.class);
+                final Map.Entry entry = (Map.Entry) sideEffectValue;
+                m.put(entry.getKey(), entry.getValue());
+                break;
+            case Tokens.VAL_AGGREGATE_TO_NONE:
+                if (!sideEffectResult.containsKey(k))
+                    putIfAbsent(k, sideEffectValue);
+                break;
+            default:
+                throw new IllegalStateException(String.format("%s is an invalid value for %s", aggregateTo, Tokens.ARGS_AGGREGATE_TO));
         }
     }
 
-    private synchronized void putIfAbsent(final String key, final Object o) {
-        sideEffectResult.putIfAbsent(key, o);
+    private <V> V validateAndGet(final String k, final String aggregateTo, final Class<?> expected) {
+        final Object shouldBe = sideEffectResult.get(k);
+        if (!(expected.isAssignableFrom(shouldBe.getClass())))
+            throw new IllegalStateException(String.format("Side-effect \"%s\" contains the type %s that is not acceptable for %s",
+                    k, shouldBe.getClass().getSimpleName(), aggregateTo));
+
+        return (V) shouldBe;
     }
 
+    /**
+     * Gets the keys gather for the side-effect. If the queue is still filling (i.e. the read is not complete) then
+     * there could be inconsistent results depending on when this method is called. It would be best to wait to call
+     * this method on {@link #readComplete}.
+     */
     public Set<String> getSideEffectKeys() {
         return sideEffectResult.keySet();
     }
 
+    /**
+     * Gets the current values for the side-effect. If the queue is still filling (i.e. the read is not complete) then
+     * there could be inconsistent results depending on when this method is called. It would be best to wait to call
+     * this method on {@link #readComplete}.
+     *
+     * @param k the key of the side-effect
+     */
     public <V> V getSideEffect(final String k) {
         return (V) sideEffectResult.get(k);
     }
@@ -142,6 +186,10 @@ final class ResultQueue {
         this.drainAllWaiting();
     }
 
+    private synchronized void putIfAbsent(final String key, final Object o) {
+        sideEffectResult.putIfAbsent(key, o);
+    }
+
     /**
      * Completes the next waiting future if there is one.
      */

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ca691bb/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index c5570d2..81fa49f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -18,8 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.process.remote.traversal.step.util.BulkedResult;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ca691bb/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
index 59be4ea..37190d5 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultQueueTest.java
@@ -19,18 +19,26 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.step.util.BulkedResult;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -286,4 +294,135 @@ public class ResultQueueTest extends AbstractResultQueueTest {
             t.interrupt();
         }
     }
+
+    @Test
+    public void shouldHandleBulkSetSideEffects() {
+        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+
+        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new BulkedResult("stephen", 1));
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
+        assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+
+        resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new BulkedResult("brian", 2));
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
+        assertEquals(2, ((BulkSet) resultQueue.getSideEffect("b")).get("brian"));
+
+        resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new BulkedResult("brian", 2));
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
+        assertEquals(4, ((BulkSet) resultQueue.getSideEffect("b")).get("brian"));
+
+        resultQueue.addSideEffect("b", Tokens.VAL_AGGREGATE_TO_BULKSET, new BulkedResult("belinda", 6));
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("b"));
+        assertEquals(6, ((BulkSet) resultQueue.getSideEffect("b")).get("belinda"));
+
+    }
+
+    @Test
+    public void shouldNotMixAggregatesForBulkSet() {
+        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+
+        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new BulkedResult("stephen", 1));
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
+        assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+
+        try {
+            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, Arrays.asList("stephen", "kathy", "alice"));
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalStateException.class));
+            assertEquals("Side-effect \"a\" value [stephen, kathy, alice] is a ArrayList which does not aggregate to bulkset", ex.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldHandleListSideEffects() {
+        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+
+        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_LIST, "stephen");
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
+        List<String> l = resultQueue.getSideEffect("a");
+        assertEquals(1, l.size());
+        assertEquals("stephen", l.get(0));
+
+        resultQueue.addSideEffect("d", Tokens.VAL_AGGREGATE_TO_LIST, "daniel");
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("d"));
+        l = resultQueue.getSideEffect("d");
+        assertEquals(1, l.size());
+        assertEquals("daniel", l.get(0));
+
+        resultQueue.addSideEffect("d", Tokens.VAL_AGGREGATE_TO_LIST, "dave");
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("d"));
+        l = resultQueue.getSideEffect("d");
+        assertEquals(2, l.size());
+        assertThat(l, contains("daniel","dave"));
+    }
+
+    @Test
+    public void shouldNotMixAggregatesForList() {
+        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+
+        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_BULKSET, new BulkedResult("stephen", 1));
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
+        assertEquals(1, ((BulkSet) resultQueue.getSideEffect("a")).get("stephen"));
+
+        try {
+            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_LIST, Arrays.asList("stephen", "kathy", "alice"));
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalStateException.class));
+            assertEquals("Side-effect \"a\" contains the type BulkSet that is not acceptable for list", ex.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldHandleMapSideEffects() {
+        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+
+        final Map<String,String> m = new HashMap<>();
+        m.put("s", "stephen");
+        m.put("m", "marko");
+        m.put("d", "daniel");
+
+        m.entrySet().forEach(e -> {
+            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, e);
+            assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
+            assertEquals(e.getValue(), ((Map) resultQueue.getSideEffect("a")).get(e.getKey()));
+        });
+
+        assertEquals(3, ((Map) resultQueue.getSideEffect("a")).size());
+    }
+
+    @Test
+    public void shouldNotMixAggregatesForMap() {
+        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+
+        final Map<String,String> m = new HashMap<>();
+        m.put("s", "stephen");
+
+        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, m.entrySet().iterator().next());
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
+        assertEquals("stephen", ((Map) resultQueue.getSideEffect("a")).get("s"));
+
+        try {
+            resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_MAP, Arrays.asList("stephen", "kathy", "alice"));
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalStateException.class));
+            assertEquals("Side-effect \"a\" value [stephen, kathy, alice] is a ArrayList which does not aggregate to map", ex.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldHandleNotAggregateSideEffects() {
+        assertThat(resultQueue.getSideEffectKeys().isEmpty(), is(true));
+
+        final Map<String,String> m = new HashMap<>();
+        m.put("s", "stephen");
+        m.put("m", "marko");
+        m.put("d", "daniel");
+
+        resultQueue.addSideEffect("a", Tokens.VAL_AGGREGATE_TO_NONE, m);
+        assertThat(resultQueue.getSideEffectKeys(), hasItem("a"));
+        assertEquals("stephen", ((Map) resultQueue.getSideEffect("a")).get("s"));
+        assertEquals("marko", ((Map) resultQueue.getSideEffect("a")).get("m"));
+        assertEquals("daniel", ((Map) resultQueue.getSideEffect("a")).get("d"));
+        assertEquals(3, ((Map) resultQueue.getSideEffect("a")).size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ca691bb/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index ea3729f..f5588ab 100644
--- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -193,4 +194,30 @@ public class ResultSetTest extends AbstractResultQueueTest {
 
         assertEquals(100, counter.get());
     }
+
+    @Test
+    public void shouldRetrieveSideEffects() throws Exception {
+        final Iterator itty = resultSet.iterator();
+        final CompletableFuture<Map<String,Object>> sideEffects = resultSet.getSideEffectResults();
+
+        assertThat(sideEffects.isDone(), is(false));
+
+        // queue is not marked finished so the side effect future is still not complete
+        addToQueue(100, 1, true, false);
+
+        for (int i = 0; i < 101; i++) {
+            assertThat(itty.hasNext(), is(true));
+        }
+
+        // now complete the queue
+        addToQueue(0, 1, true, true, 0);
+
+        // addToQueue doesn't block for "read complete" so gotta spin the thread
+        while (!readCompleted.isDone()) {
+            Thread.sleep(10);
+        }
+
+        // side effects are empty in this case, but that's fine for the purpose of this test
+        assertThat(sideEffects.isDone(), is(true));
+    }
 }


Mime
View raw message