gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-62 Implement Crdt PN-Counter
Date Sun, 25 Jun 2017 19:23:06 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master c009b77d2 -> 49cdac62a


GOSSIP-62 Implement Crdt PN-Counter

restored layout of pom.xml for minimal changes in PR

Snapshot - I think I have the basic framework in place. No tests are passing and nothing works,
but most of the calls and the builder are in place.

capture working code

starting working on example code

Working examples

GOSSIP-65 Implement crdt LWW-Element-Set

LWWSet implemented + se/de + unit tests + jackson tests + DataTests

GOSSIP-55 Added event handlers to notify share data and per node data changes

Reformat code to match apache standard

Fixed DataTest errors WRT PNCounter


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/49cdac62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/49cdac62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/49cdac62

Branch: refs/heads/master
Commit: 49cdac62a2fe7b26b5a070c62793d3ec4e2c42b5
Parents: c009b77
Author: Terry Weymouth <weymouth@umich.edu>
Authored: Sat Jun 10 17:07:33 2017 -0400
Committer: Terry Weymouth <weymouth@umich.edu>
Committed: Sun Jun 25 11:04:49 2017 -0400

----------------------------------------------------------------------
 gossip-base/pom.xml                             |   7 +
 .../java/org/apache/gossip/crdt/CrdtModule.java |   8 +
 .../java/org/apache/gossip/crdt/PNCounter.java  | 139 +++++++++++++++
 .../org/apache/gossip/crdt/PNCounterTest.java   | 137 +++++++++++++++
 .../gossip/examples/StandAlonePNCounter.java    | 170 ++++++++++++++++++
 .../test/java/org/apache/gossip/DataTest.java   | 172 ++++++++++++++-----
 pom.xml                                         |  11 +-
 7 files changed, 596 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/49cdac62/gossip-base/pom.xml
----------------------------------------------------------------------
diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml
index 34f346c..e72a455 100644
--- a/gossip-base/pom.xml
+++ b/gossip-base/pom.xml
@@ -73,6 +73,13 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>${mockito.version}</version>
+			<scope>test</scope>
+		</dependency>
+		
 	</dependencies>
 	
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/49cdac62/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index bb1a052..1c95b28 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -61,6 +61,13 @@ abstract class GrowOnlyCounterMixin {
   @JsonProperty("counters") abstract Map<String, Long> getCounters();
 }
 
+abstract class PNCounterMixin {
+  @JsonCreator
+  PNCounterMixin(@JsonProperty("p-counters") Map<String, Long> up, @JsonProperty("n-counters") Map<String,Long> down) { }
+  @JsonProperty("p-counters") abstract Map<String, Long> getPCounters();
+  @JsonProperty("n-counters") abstract Map<String, Long> getNCounters();
+}
+
 //If anyone wants to take a stab at this. please have at it
 //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
 public class CrdtModule extends SimpleModule {
@@ -76,6 +83,7 @@ public class CrdtModule extends SimpleModule {
     context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
     context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
     context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class);
+    context.setMixInAnnotations(PNCounter.class, PNCounterMixin.class);
     context.setMixInAnnotations(LWWSet.class, LWWSetMixin.class);
     context.setMixInAnnotations(LWWSet.Timestamps.class, LWWSetTimestampsMixin.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/49cdac62/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java b/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java
new file mode 100644
index 0000000..f00a5f1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.util.Map;
+
+import org.apache.gossip.manager.GossipManager;
+
+public class PNCounter implements CrdtCounter<Long, PNCounter> {
+
+  private final GrowOnlyCounter pCount;
+
+  private final GrowOnlyCounter nCount;
+
+  PNCounter(Map<String, Long> pCounters, Map<String, Long> nCounters) {
+    pCount = new GrowOnlyCounter(pCounters);
+    nCount = new GrowOnlyCounter(nCounters);
+  }
+
+  public PNCounter(PNCounter starter, Builder builder) {
+    GrowOnlyCounter.Builder pBuilder = builder.makeGrowOnlyCounterBuilder(builder.pCount());
+    pCount = new GrowOnlyCounter(starter.pCount, pBuilder);
+    GrowOnlyCounter.Builder nBuilder = builder.makeGrowOnlyCounterBuilder(builder.nCount());
+    nCount = new GrowOnlyCounter(starter.nCount, nBuilder);
+  }
+
+  public PNCounter(Builder builder) {
+    GrowOnlyCounter.Builder pBuilder = builder.makeGrowOnlyCounterBuilder(builder.pCount());
+    pCount = new GrowOnlyCounter(pBuilder);
+    GrowOnlyCounter.Builder nBuilder = builder.makeGrowOnlyCounterBuilder(builder.nCount());
+    nCount = new GrowOnlyCounter(nBuilder);
+  }
+
+  public PNCounter(GossipManager manager) {
+    pCount = new GrowOnlyCounter(manager);
+    nCount = new GrowOnlyCounter(manager);
+  }
+
+  public PNCounter(PNCounter starter, PNCounter other) {
+    pCount = new GrowOnlyCounter(starter.pCount, other.pCount);
+    nCount = new GrowOnlyCounter(starter.nCount, other.nCount);
+  }
+
+  @Override
+  public PNCounter merge(PNCounter other) {
+    return new PNCounter(this, other);
+  }
+
+  @Override
+  public Long value() {
+    long pValue = (long) pCount.value();
+    long nValue = (long) nCount.value();
+    return pValue - nValue;
+  }
+
+  @Override
+  public PNCounter optimize() {
+    return new PNCounter(pCount.getCounters(), nCount.getCounters());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (getClass() != obj.getClass())
+      return false;
+    PNCounter other = (PNCounter) obj;
+    return value().longValue() == other.value().longValue();
+  }
+
+  @Override
+  public String toString() {
+    return "PnCounter [pCount=" + pCount + ", nCount=" + nCount + ", value=" + value() + "]";
+  }
+
+  Map<String, Long> getPCounters() {
+    return pCount.getCounters();
+  }
+
+  Map<String, Long> getNCounters() {
+    return nCount.getCounters();
+  }
+
+  public static class Builder {
+
+    private final GossipManager myManager;
+
+    private long value = 0L;
+
+    public Builder(GossipManager gossipManager) {
+      myManager = gossipManager;
+    }
+
+    public long pCount() {
+      if (value > 0) {
+        return value;
+      }
+      return 0;
+    }
+
+    public long nCount() {
+      if (value < 0) {
+        return -value;
+      }
+      return 0;
+    }
+
+    public org.apache.gossip.crdt.GrowOnlyCounter.Builder makeGrowOnlyCounterBuilder(long value) {
+      org.apache.gossip.crdt.GrowOnlyCounter.Builder ret = new org.apache.gossip.crdt.GrowOnlyCounter.Builder(
+              myManager);
+      ret.increment(value);
+      return ret;
+    }
+
+    public PNCounter.Builder increment(long delta) {
+      value += delta;
+      return this;
+    }
+
+    public PNCounter.Builder decrement(long delta) {
+      value -= delta;
+      return this;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/49cdac62/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java
new file mode 100644
index 0000000..8128cde
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.manager.GossipManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PNCounterTest {
+
+  private List<GossipManager> mockManagers;
+
+  @Before
+  public void setupMocks() {
+    GossipManager manager1 = mock(GossipManager.class);
+    LocalMember mockMember1 = mock(LocalMember.class);
+    when(mockMember1.getId()).thenReturn("x");
+    when(manager1.getMyself()).thenReturn(mockMember1);
+
+    GossipManager manager2 = mock(GossipManager.class);
+    LocalMember mockMember2 = mock(LocalMember.class);
+    when(mockMember2.getId()).thenReturn("y");
+    when(manager2.getMyself()).thenReturn(mockMember2);
+
+    GossipManager manager3 = mock(GossipManager.class);
+    LocalMember mockMember3 = mock(LocalMember.class);
+    when(mockMember3.getId()).thenReturn("z");
+    when(manager3.getMyself()).thenReturn(mockMember3);
+
+    mockManagers = new ArrayList<GossipManager>();
+    mockManagers.add(manager1);
+    mockManagers.add(manager2);
+    mockManagers.add(manager3);
+  }
+
+  @Test
+  public void existanceTest() {
+    PNCounter counter = new PNCounter(mockManagers.get(0));
+    Assert.assertEquals(0, (long) counter.value());
+  }
+
+  @Test
+  public void localOperationTest() {
+    PNCounter counter = new PNCounter(mockManagers.get(0));
+    Assert.assertEquals(0, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(5L));
+    Assert.assertEquals(5, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(4L));
+    Assert.assertEquals(9, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(3L));
+    Assert.assertEquals(6, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(12L));
+    Assert.assertEquals(-6, (long) counter.value());
+  }
+
+  @Test
+  public void oddballLocalOperationTest() {
+    PNCounter counter = new PNCounter(mockManagers.get(0));
+    Assert.assertEquals(0, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(-5L));
+    Assert.assertEquals(-5, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(4L));
+    Assert.assertEquals(-1, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(-3L));
+    Assert.assertEquals(2, (long) counter.value());
+
+    counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(-12L));
+    Assert.assertEquals(14, (long) counter.value());
+  }
+
+  @Test
+  public void networkLikeOperations() {
+    PNCounter counter1 = new PNCounter(mockManagers.get(0));
+    PNCounter counter2 = new PNCounter(mockManagers.get(1));
+    PNCounter counter3 = new PNCounter(mockManagers.get(2));
+
+    Assert.assertEquals(0, (long) counter1.value());
+    Assert.assertEquals(0, (long) counter2.value());
+    Assert.assertEquals(0, (long) counter3.value());
+
+    counter1 = new PNCounter(counter1, new PNCounter.Builder(mockManagers.get(0)).increment(3L));
+    Assert.assertEquals(3, (long) counter1.value());
+
+    counter2 = new PNCounter(counter2, new PNCounter.Builder(mockManagers.get(1)).increment(5L));
+    Assert.assertEquals(5, (long) counter2.value());
+
+    counter3 = new PNCounter(counter3, new PNCounter.Builder(mockManagers.get(2)).decrement(7L));
+    Assert.assertEquals(-7, (long) counter3.value());
+
+    // 2 becomes 2 and 1
+    counter2 = counter2.merge(counter1);
+    Assert.assertEquals(8, (long) counter2.value());
+
+    // 3 becomes 3 and 1
+    counter3 = counter3.merge(counter1);
+    Assert.assertEquals(-4, (long) counter3.value());
+
+    // 3 becomes all
+    counter3 = counter3.merge(counter2);
+    Assert.assertEquals(1, (long) counter3.value());
+
+    // 2 becomes all - different order
+    counter2 = counter2.merge(counter3);
+    Assert.assertEquals(1, (long) counter3.value());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/49cdac62/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
----------------------------------------------------------------------
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
new file mode 100644
index 0000000..b0015be
--- /dev/null
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.RemoteMember;
+import org.apache.gossip.crdt.PNCounter;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+
+public class StandAlonePNCounter {
+  private static ExampleCommon common = new ExampleCommon();
+  private static String lastInput = "{None}";
+
+  public static void main(String[] args) throws InterruptedException, IOException {
+    args = common.checkArgsForClearFlag(args);
+    GossipSettings s = new GossipSettings();
+    s.setWindowSize(1000);
+    s.setGossipInterval(100);
+    GossipManager gossipService = GossipManagerBuilder
+            .newBuilder()
+            .cluster("mycluster")
+            .uri(URI.create(args[0])).id(args[1])
+            .gossipMembers(
+                    Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
+            .gossipSettings(s)
+            .build();
+    gossipService.init();
+
+    new Thread(() -> {
+      while (true) {
+        common.optionallyClearTerminal();
+        printLiveMembers(gossipService);
+        printDeadMambers(gossipService);
+        printValues(gossipService);
+        try {
+          Thread.sleep(2000);
+        } catch (Exception ignore) {
+        }
+      }
+    }).start();
+
+    String line = null;
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
+      while ((line = br.readLine()) != null) {
+        System.out.println(line);
+        char op = line.charAt(0);
+        char blank = line.charAt(1);
+        String val = line.substring(2);
+        Long l = null;
+        boolean valid = true;
+        try {
+           l = Long.valueOf(val);
+        } catch (NumberFormatException ex) {
+          valid = false;
+        }
+        valid = valid &&
+          (
+            (blank == ' ') &&
+            ((op == 'i') || (op == 'd'))
+          );
+        if (valid) {
+          if (op == 'i') {
+            increment(l, gossipService);
+          } else if (op == 'd') {
+            decrement(l, gossipService);
+          }
+        }
+        setLastInput(line,valid);
+      }
+    }
+  }
+
+  private static void printValues(GossipManager gossipService) {
+    System.out.println("Last Input: " + getLastInput());
+    System.out.println("---------- " + (gossipService.findCrdt("myPNCounter") == null ? ""
+            : gossipService.findCrdt("myPNCounter").value()));
+    System.out.println("********** " + gossipService.findCrdt("myPNCounter"));
+  }
+
+  private static void printDeadMambers(GossipManager gossipService) {
+    List<LocalMember> members = gossipService.getDeadMembers();
+    if (members.isEmpty()) {
+       System.out.println("Dead: (none)");
+       return;
+    }
+    System.out.println("Dead: " + members.get(0));
+    for (int i = 1; i < members.size(); i++) {
+      System.out.println("    : " + members.get(i)); 
+    }
+  }
+
+  private static void printLiveMembers(GossipManager gossipService) {
+    List<LocalMember> members = gossipService.getLiveMembers();
+    if (members.isEmpty()) {
+       System.out.println("Live: (none)");
+       return;
+    }
+    System.out.println("Live: " + members.get(0));
+    for (int i = 1; i < members.size(); i++) {
+      System.out.println("    : " + members.get(i)); 
+    }
+  }
+
+  private static void increment(Long l, GossipManager gossipManager) {
+    PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
+    if (c == null) {
+      c = new PNCounter(new PNCounter.Builder(gossipManager).increment((l)));
+    } else {
+      c = new PNCounter(c, new PNCounter.Builder(gossipManager).increment((l)));
+    }
+    SharedDataMessage m = new SharedDataMessage();
+    m.setExpireAt(Long.MAX_VALUE);
+    m.setKey("myPNCounter");
+    m.setPayload(c);
+    m.setTimestamp(System.currentTimeMillis());
+    gossipManager.merge(m);
+  }
+
+  private static void decrement(Long l, GossipManager gossipManager) {
+    PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
+    if (c == null) {
+      c = new PNCounter(new PNCounter.Builder(gossipManager).decrement((l)));
+    } else {
+      c = new PNCounter(c, new PNCounter.Builder(gossipManager).decrement((l)));
+    }
+    SharedDataMessage m = new SharedDataMessage();
+    m.setExpireAt(Long.MAX_VALUE);
+    m.setKey("myPNCounter");
+    m.setPayload(c);
+    m.setTimestamp(System.currentTimeMillis());
+    gossipManager.merge(m);
+  }
+  
+  private static void setLastInput(String input, boolean valid) {
+    lastInput = input;
+    if (! valid) {
+      lastInput += " (invalid)";
+    }
+  }
+
+  private static String getLastInput() {
+    return lastInput;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/49cdac62/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
index 53408f8..e91426c 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
@@ -17,55 +17,65 @@
  */
 package org.apache.gossip;
 
-import io.teknek.tunit.TUnit;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.gossip.crdt.GrowOnlyCounter;
 import org.apache.gossip.crdt.GrowOnlySet;
 import org.apache.gossip.crdt.LWWSet;
 import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.crdt.PNCounter;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.GossipManagerBuilder;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import io.teknek.tunit.TUnit;
 
 public class DataTest extends AbstractIntegrationBase {
 
   private String orSetKey = "cror";
   private String lwwSetKey = "crlww";
   private String gCounterKey = "crdtgc";
+  private String pnCounterKey = "crdtpn";
 
   @Test
-  public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
+  public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException {
     GossipSettings settings = new GossipSettings();
     settings.setPersistRingState(false);
     settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
     int seedNodes = 1;
     List<Member> startupMembers = new ArrayList<>();
-    for (int i = 1; i < seedNodes + 1; ++i){
+    for (int i = 1; i < seedNodes + 1; ++i) {
       URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
       startupMembers.add(new RemoteMember(cluster, uri, i + ""));
     }
     final List<GossipManager> clients = new ArrayList<>();
     final int clusterMembers = 2;
-    for (int i = 1; i < clusterMembers + 1; ++i){
+    for (int i = 1; i < clusterMembers + 1; ++i) {
       URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
-      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
-          .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+      GossipManager gossipService = GossipManagerBuilder
+              .newBuilder()
+              .cluster(cluster).uri(uri)
+              .id(i + "")
+              .gossipMembers(startupMembers)
+              .gossipSettings(settings).build();
       clients.add(gossipService);
       gossipService.init();
       register(gossipService);
     }
     TUnit.assertThat(() -> {
       int total = 0;
-      for (int i = 0; i < clusterMembers; ++i){
+      for (int i = 0; i < clusterMembers; ++i) {
         total += clients.get(i).getLiveMembers().size();
       }
       return total;
@@ -89,105 +99,173 @@ public class DataTest extends AbstractIntegrationBase {
         return x.getPayload();
     }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
 
-
     givenDifferentDatumsInSet(clients);
     assertThatListIsMerged(clients);
 
     testOrSet(clients);
     testLWWSet(clients);
 
-    // test g counter
-    givenDifferentIncrement(clients);
-    assertThatCountIsUpdated(clients, 3);
-    givenIncreaseOther(clients);
-    assertThatCountIsUpdated(clients, 7);
+    testGrowOnlyCounter(clients);
+    testPNCounter(clients);
 
-    for (int i = 0; i < clusterMembers; ++i){
+    for (int i = 0; i < clusterMembers; ++i) {
       clients.get(i).shutdown();
     }
   }
 
-  private void testOrSet(final List<GossipManager> clients){
-    //populate
+  private void testOrSet(final List<GossipManager> clients) {
+    // populate
     clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2")));
     clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4")));
 
-    //assert merge
+    // assert merge
     assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value());
     assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value());
 
-    //drop element
+    // drop element
     @SuppressWarnings("unchecked")
     OrSet<String> o = (OrSet<String>) clients.get(0).findCrdt(orSetKey);
     OrSet<String> o2 = new OrSet<>(o, new OrSet.Builder<String>().remove("3"));
     clients.get(0).merge(generateSharedMsg(orSetKey, o2));
 
-    //assert deletion
+    // assert deletion
     assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value());
     assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value());
   }
 
-  private void testLWWSet(final List<GossipManager> clients){
-    //populate
+  private void testLWWSet(final List<GossipManager> clients) {
+    // populate
     clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2")));
     clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4")));
 
-    //assert merge
+    // assert merge
     assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
     assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
 
-    //drop element
+    // drop element
     @SuppressWarnings("unchecked")
     LWWSet<String> lww = (LWWSet<String>) clients.get(0).findCrdt(lwwSetKey);
     clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3")));
 
-    //assert deletion
+    // assert deletion
     assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value());
     assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value());
   }
 
-  private void givenDifferentIncrement(final List<GossipManager> clients){
+  private void testGrowOnlyCounter(List<GossipManager> clients) {
+    givenDifferentIncrement(clients);
+    assertThatCountIsUpdated(clients, 3);
+    givenIncreaseOther(clients);
+    assertThatCountIsUpdated(clients, 7);
+  }
+
+  private void testPNCounter(List<GossipManager> clients) {
+    givenPNCounter(clients);
+    assertThatPNCounterSettlesAt(clients, 0);
+    int[] delta1 = { 2, 3 };
+    givenPNCounterUpdate(clients, delta1);
+    assertThatPNCounterSettlesAt(clients, 5);
+    int[] delta2 = { -3, 5 };
+    givenPNCounterUpdate(clients, delta2);
+    assertThatPNCounterSettlesAt(clients, 7);
+    int[] delta3 = { 1, 1 };
+    givenPNCounterUpdate(clients, delta3);
+    assertThatPNCounterSettlesAt(clients, 9);
+    int[] delta4 = { 1, -7 };
+    givenPNCounterUpdate(clients, delta4);
+    assertThatPNCounterSettlesAt(clients, 3);
+  }
+
+  private void givenDifferentIncrement(final List<GossipManager> clients) {
     Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L));
     clients.get(0).merge(generateSharedMsg(gCounterKey, payload));
     payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L));
     clients.get(1).merge(generateSharedMsg(gCounterKey, payload));
   }
 
-  private void givenIncreaseOther(final List<GossipManager> clients){
+  private void givenIncreaseOther(final List<GossipManager> clients) {
     GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
     GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
-        new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
+            new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
 
     clients.get(1).merge(generateSharedMsg(gCounterKey, gc2));
   }
 
-  private void assertMerged(final GossipManager client, String key, final Set<String> expected){
-    TUnit.assertThat(() -> client.findCrdt(key).value())
-        .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected);
+  private void assertMerged(final GossipManager client, String key, final Set<String> expected) {
+    TUnit.assertThat(() -> client.findCrdt(key).value()).afterWaitingAtMost(10, TimeUnit.SECONDS)
+            .isEqualTo(expected);
   }
 
-  private void givenDifferentDatumsInSet(final List<GossipManager> clients){
+  private void givenDifferentDatumsInSet(final List<GossipManager> clients) {
     clients.get(0).merge(CrdtMessage("1"));
     clients.get(1).merge(CrdtMessage("2"));
   }
 
-
-  private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount){
+  private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount) {
     TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey))
-        .afterWaitingAtMost(10, TimeUnit.SECONDS)
-        .isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
+            .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
+                    new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
   }
 
-  private void assertThatListIsMerged(final List<GossipManager> clients){
-    TUnit.assertThat(() -> clients.get(0).findCrdt("cr"))
-        .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2")));
+  private void assertThatListIsMerged(final List<GossipManager> clients) {
+    TUnit.assertThat(() -> clients.get(0).findCrdt("cr")).afterWaitingAtMost(10, TimeUnit.SECONDS)
+            .isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2")));
+  }
+  
+  private void givenPNCounter(List<GossipManager> clients) {
+    {
+      SharedDataMessage d = new SharedDataMessage();
+      d.setKey(pnCounterKey);
+      d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(0))));
+      d.setExpireAt(Long.MAX_VALUE);
+      d.setTimestamp(System.currentTimeMillis());
+      clients.get(0).merge(d);
+    }
+    {
+      SharedDataMessage d = new SharedDataMessage();
+      d.setKey(pnCounterKey);
+      d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(1))));
+      d.setExpireAt(Long.MAX_VALUE);
+      d.setTimestamp(System.currentTimeMillis());
+      clients.get(1).merge(d);
+    }
+  }
+
+  private void givenPNCounterUpdate(List<GossipManager> clients, int[] deltaArray) {
+    int clientIndex = 0;
+    for (int delta: deltaArray) {
+      PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
+      c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long)delta)));
+      SharedDataMessage d = new SharedDataMessage();
+      d.setKey(pnCounterKey);
+      d.setPayload(c);
+      d.setExpireAt(Long.MAX_VALUE);
+      d.setTimestamp(System.currentTimeMillis());
+      clients.get(clientIndex).merge(d);
+      clientIndex = (clientIndex + 1) % clients.size();
+    }
+  }
+
+  private void assertThatPNCounterSettlesAt(List<GossipManager> clients, long expectedValue) {
+    for (GossipManager client: clients) {
+      TUnit.assertThat(() -> {
+        long value = 0;
+        Object o = client.findCrdt(pnCounterKey);
+        if (o != null) {
+          PNCounter c = (PNCounter)o;
+          value = c.value();
+        }
+        return value;
+      }).afterWaitingAtMost(10, TimeUnit.SECONDS)
+              .isEqualTo(expectedValue);
+    }
   }
 
-  private SharedDataMessage CrdtMessage(String item){
+  private SharedDataMessage CrdtMessage(String item) {
     return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item)));
   }
 
-  private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){
+  private PerNodeDataMessage generatePerNodeMsg(String key, Object payload) {
     PerNodeDataMessage g = new PerNodeDataMessage();
     g.setExpireAt(Long.MAX_VALUE);
     g.setKey(key);
@@ -196,7 +274,7 @@ public class DataTest extends AbstractIntegrationBase {
     return g;
   }
 
-  private SharedDataMessage generateSharedMsg(String key, Object payload){
+  private SharedDataMessage generateSharedMsg(String key, Object payload) {
     SharedDataMessage d = new SharedDataMessage();
     d.setKey(key);
     d.setPayload(payload);

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/49cdac62/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8740319..75a54a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,8 @@
 		<junit.vintage.version>4.12.0-M2</junit.vintage.version>
 		<log4j.version>1.2.17</log4j.version>
 		<tunit.version>0.0.0</tunit.version>
-		
+		<mockito.version>2.8.9</mockito.version>
+
 		<!-- plugins versions -->
 		<maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
 		<maven-eclipse-plugin.version>2.10</maven-eclipse-plugin.version>
@@ -116,6 +117,12 @@
 			<version>${tunit.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>${mockito.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 	
 	<build>
@@ -200,6 +207,8 @@
 						<exclude>eclipse_template.xml</exclude>
 						<!--  files generated by running the examples in gossip-examples  -->
 						<exclued>**/*.mycluster.*.json</exclued>
+						<!-- e.g.npm-debug.log -->
+						<exclued>**/*.log</exclued>
 					</excludes>
 				</configuration>
 				<executions>


Mime
View raw message