gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-59 OrSet implementation
Date Sun, 26 Feb 2017 18:56:58 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master b71be5e16 -> 026b8bb48


GOSSIP-59 OrSet implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/026b8bb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/026b8bb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/026b8bb4

Branch: refs/heads/master
Commit: 026b8bb488890fea8772c56a69b48153242e4ed6
Parents: b71be5e
Author: Edward Capriolo <edlinuxguru@gmail.com>
Authored: Wed Feb 22 23:35:14 2017 -0500
Committer: Edward Capriolo <edlinuxguru@gmail.com>
Committed: Sun Feb 26 13:54:55 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/gossip/crdt/CrdtModule.java |  62 ++++
 .../java/org/apache/gossip/crdt/CrdtSet.java    |   2 +-
 .../org/apache/gossip/crdt/GrowOnlySet.java     |  17 +-
 src/main/java/org/apache/gossip/crdt/OrSet.java | 311 +++++++++++++++++++
 .../org/apache/gossip/manager/GossipCore.java   |  12 +-
 .../manager/random/RandomGossipManager.java     |   4 +
 .../gossip/udp/UdpSharedGossipDataMessage.java  |   4 +-
 src/test/java/org/apache/gossip/DataTest.java   |  56 +++-
 .../java/org/apache/gossip/crdt/OrSetTest.java  | 105 +++++++
 9 files changed, 551 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/main/java/org/apache/gossip/crdt/CrdtModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/src/main/java/org/apache/gossip/crdt/CrdtModule.java
new file mode 100644
index 0000000..0c8a787
--- /dev/null
+++ b/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.Version;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+abstract class OrSetMixin<E> {
+  @JsonCreator
+  OrSetMixin(@JsonProperty("elements") Map<E, Set<UUID>> w, @JsonProperty("tombstones") Map<E, Set<UUID>> h) { }
+  @JsonProperty("elements") abstract Map<E, Set<UUID>> getElements();
+  @JsonProperty("tombstones") abstract Map<E, Set<UUID>> getTombstones();
+  @JsonIgnore abstract boolean isEmpty();
+}
+
+abstract class GrowOnlySetMixin<E>{
+  @JsonCreator
+  GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ }
+  @JsonProperty("elements") abstract Set<E> getElements();
+  @JsonIgnore abstract boolean isEmpty();
+}
+
+//If anyone wants to take a stab at this. please have at it
+//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
+public class CrdtModule extends SimpleModule {
+
+  private static final long serialVersionUID = 6134836523275023418L;
+
+  public CrdtModule() {
+    super("CrdtModule", new Version(0, 0, 0, "0.0.0", "org.apache.gossip", "gossip"));
+  }
+
+  @Override
+  public void setupModule(SetupContext context) {
+    context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
+    context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/main/java/org/apache/gossip/crdt/CrdtSet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/CrdtSet.java b/src/main/java/org/apache/gossip/crdt/CrdtSet.java
index 3a5fbca..21b41da 100644
--- a/src/main/java/org/apache/gossip/crdt/CrdtSet.java
+++ b/src/main/java/org/apache/gossip/crdt/CrdtSet.java
@@ -20,7 +20,7 @@ package org.apache.gossip.crdt;
 import java.util.Set;
 
 public interface CrdtSet<ElementType, SetType extends Set<ElementType>, R extends CrdtSet<ElementType, SetType, R>>
-extends Crdt<SetType, R>, Set<ElementType> {
+extends Crdt<SetType, R> {
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java b/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java
index 0b1771b..9e2dd49 100644
--- a/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java
+++ b/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java
@@ -66,72 +66,58 @@ public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, Set<Elemen
     return new GrowOnlySet<>(hidden);
   }
 
-  @Override
   public int size() {
     return hidden.size();
   }
 
-  @Override
   public boolean isEmpty() {
     return hidden.isEmpty();
   }
 
-  @Override
   public boolean contains(Object o) {
     return hidden.contains(o);
   }
 
-  @Override
   public Iterator<ElementType> iterator() {
     Set<ElementType> copy = new HashSet<>();
     copy.addAll(hidden);
     return copy.iterator();
   }
 
-  @Override
   public Object[] toArray() {
     return hidden.toArray();
   }
 
-  @Override
   public <T> T[] toArray(T[] a) {
     return hidden.toArray(a);
   }
 
-  @Override
   public boolean add(ElementType e) {
     throw new UnsupportedOperationException();
   }
 
-  @Override
   public boolean remove(Object o) {
     throw new UnsupportedOperationException();
   }
 
-  @Override
   public boolean containsAll(Collection<?> c) {
     return hidden.containsAll(c);
   }
 
-  @Override
   public boolean addAll(Collection<? extends ElementType> c) {
     throw new UnsupportedOperationException();
   }
 
-  @Override
   public boolean retainAll(Collection<?> c) {
     throw new UnsupportedOperationException();
   }
 
-  @Override
   public boolean removeAll(Collection<?> c) {
     throw new UnsupportedOperationException();
   }
 
-  @Override
   public void clear() {
     throw new UnsupportedOperationException();
-    
   }
 
   @Override
@@ -165,4 +151,7 @@ public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, Set<Elemen
     return true;
   }
 
+  Set<ElementType> getElements(){
+    return hidden;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/main/java/org/apache/gossip/crdt/OrSet.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/crdt/OrSet.java b/src/main/java/org/apache/gossip/crdt/OrSet.java
new file mode 100644
index 0000000..972377f
--- /dev/null
+++ b/src/main/java/org/apache/gossip/crdt/OrSet.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.gossip.crdt.OrSet.Builder.Operation;
+
+/*
+ * A immutable set 
+ */
+public class OrSet<E>  implements Crdt<Set<E>, OrSet<E>> {
+  
+  private final Map<E, Set<UUID>> elements = new HashMap<>();
+  private final Map<E, Set<UUID>> tombstones = new HashMap<>();
+  private final transient Set<E> val;
+  
+  public OrSet(){
+    val = computeValue();
+  }
+  
+  OrSet(Map<E, Set<UUID>> elements, Map<E, Set<UUID>> tombstones){
+    this.elements.putAll(elements);
+    this.tombstones.putAll(tombstones);
+    val = computeValue();
+  }
+  
+  @SafeVarargs
+  public OrSet(E ... elements){
+    for (E e: elements){
+      internalAdd(e);
+    }
+    val = computeValue();
+  }
+  
+  public OrSet(Builder<E>builder){
+    for (Builder<E>.OrSetElement<E> e: builder.elements){
+      if (e.operation == Operation.ADD){
+        internalAdd(e.element);
+      } else {
+        internalRemove(e.element);
+      }
+    }
+    val = computeValue();
+  }
+  
+  /**
+   * This constructor is the way to remove elements from an existing set
+   * @param set
+   * @param builder 
+   */
+  public OrSet(OrSet<E> set, Builder<E> builder){
+    elements.putAll(set.elements);
+    tombstones.putAll(set.tombstones);
+    for (Builder<E>.OrSetElement<E> e: builder.elements){
+      if (e.operation == Operation.ADD){
+        internalAdd(e.element);
+      } else {
+        internalRemove(e.element);
+      }
+    }
+    val = computeValue();
+  }
+
+  public OrSet(OrSet<E> left, OrSet<E> right){
+    elements.putAll(left.elements);
+    elements.putAll(right.elements);
+    tombstones.putAll(left.tombstones);
+    tombstones.putAll(right.tombstones);
+    val = computeValue();
+  }
+  
+  public OrSet.Builder<E> builder(){
+    return new OrSet.Builder<>();
+  }
+  
+  @Override
+  public OrSet<E> merge(OrSet<E> other) {
+    return new OrSet<E>(this, other);
+  }
+  
+  private void internalAdd(E element){
+    Set<UUID> l = elements.get(element);
+    if (l == null){
+      Set<UUID> d = new HashSet<UUID>();
+      d.add(UUID.randomUUID());
+      elements.put(element, d);
+    } else {
+      l.add(UUID.randomUUID());
+    }
+  }
+  
+  private void internalRemove(E element){
+    Set<UUID> elementIds = elements.get(element);
+    if (elementIds == null){
+      //deleting elements not in the list
+      return;
+    }
+    Set<UUID> current = tombstones.get(element);
+    if (current != null){
+      current.addAll(elementIds);
+    } else {
+      tombstones.put(element, elementIds);
+    }
+  }
+
+  /*
+   * Computes the live values by analyzing the elements and tombstones
+   */
+  private Set<E> computeValue(){
+    Set<E> values = new HashSet<>();
+    for (Entry<E, Set<UUID>> entry: elements.entrySet()){
+      if (entry.getValue() == null || entry.getValue().size() == 0){
+        continue;
+      }
+      Set<UUID> deleteIds = tombstones.get(entry.getKey());
+      if (deleteIds == null){
+        values.add(entry.getKey());
+      } else {
+        if (!deleteIds.containsAll(entry.getValue())){
+          values.add(entry.getKey());
+        } else {
+          //if all the entry uuid is deleted the entry is deleted
+        }
+      }
+    }
+    return values;
+  }
+  
+  @Override
+  public Set<E> value() {
+    return val;
+  }
+
+  @Override
+  public OrSet<E> optimize() {
+    return this;
+  }
+  
+  public static class Builder<E> {
+    public static enum Operation {
+      ADD, REMOVE
+    };
+
+    private class OrSetElement<EL> {
+      EL element;
+      Operation operation;
+
+      private OrSetElement(EL element, Operation operation) {
+        this.element = element;
+        this.operation = operation;
+      }
+    }
+
+    private List<OrSetElement<E>> elements = new ArrayList<>();
+
+    public Builder<E> add(E element) {
+      elements.add(new OrSetElement<E>(element, Operation.ADD));
+      return this;
+    }
+
+    public Builder<E> remove(E element) {
+      elements.add(new OrSetElement<E>(element, Operation.REMOVE));
+      return this;
+    }
+
+    public Builder<E> mutate(E element, Operation operation) {
+      elements.add(new OrSetElement<E>(element, operation));
+      return this;
+    }
+  }
+
+  
+  public int size() {
+    return value().size();
+  }
+
+  
+  public boolean isEmpty() {
+    return value().size() == 0;
+  }
+
+  
+  public boolean contains(Object o) {
+    return value().contains(o);
+  }
+
+  
+  public Iterator<E> iterator() {
+    Iterator<E> managed = value().iterator();
+    return new Iterator<E>() {
+
+      @Override
+      public void remove() {
+        throw new IllegalArgumentException();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return managed.hasNext();
+      }
+
+      @Override
+      public E next() {
+        return managed.next();
+      }
+      
+    };
+  }
+
+  public Object[] toArray() {
+    return value().toArray();
+  }
+
+  public <T> T[] toArray(T[] a) {
+    return value().toArray(a);
+  }
+
+  public boolean add(E e) {
+    throw new IllegalArgumentException("Can not add");
+  }
+
+
+  public boolean remove(Object o) {
+    throw new IllegalArgumentException();
+  }
+
+  public boolean containsAll(Collection<?> c) {
+    return this.value().containsAll(c);
+  }
+
+  public boolean addAll(Collection<? extends E> c) {
+    throw new IllegalArgumentException();
+  }
+
+  public boolean retainAll(Collection<?> c) {
+    throw new IllegalArgumentException();
+  }
+
+  public boolean removeAll(Collection<?> c) {
+    throw new IllegalArgumentException();
+  }
+
+  public void clear() {
+    throw new IllegalArgumentException();
+  }
+
+  @Override
+  public String toString() {
+    return "OrSet [elements=" + elements + ", tombstones=" + tombstones + "]" ;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((value() == null) ? 0 : value().hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    @SuppressWarnings("rawtypes")
+    OrSet other = (OrSet) obj;
+    if (elements == null) {
+      if (other.elements != null)
+        return false;
+    } else if (!value().equals(other.value()))
+      return false;
+    return true;
+  }
+
+  Map<E, Set<UUID>> getElements() {
+    return elements;
+  }
+
+  Map<E, Set<UUID>> getTombstones() {
+    return tombstones;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index de54597..dff6413 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -119,14 +119,15 @@ public class GossipCore implements GossipCoreConstants {
       sharedData.putIfAbsent(message.getKey(), message);
     } else {
       if (message.getPayload() instanceof Crdt){
-        SharedGossipDataMessage m = sharedData.get(message.getKey());
+        SharedGossipDataMessage curretnt = sharedData.get(message.getKey());
         SharedGossipDataMessage merged = new SharedGossipDataMessage();
         merged.setExpireAt(message.getExpireAt());
-        merged.setKey(m.getKey());
+        merged.setKey(curretnt.getKey());
         merged.setNodeId(message.getNodeId());
         merged.setTimestamp(message.getTimestamp());
-        merged.setPayload( ((Crdt) message.getPayload()).merge((Crdt)m.getPayload()));
-        sharedData.put(m.getKey(), merged);
+        Crdt mergedCrdt = ((Crdt) message.getPayload()).merge((Crdt)curretnt.getPayload());
+        merged.setPayload( mergedCrdt );
+        sharedData.put(curretnt.getKey(), merged);
       } else {
         if (previous.getTimestamp() < message.getTimestamp()) {
           sharedData.replace(message.getKey(), previous, message);
@@ -370,9 +371,10 @@ public class GossipCore implements GossipCoreConstants {
       copy.setExpireAt(message.getExpireAt());
       copy.setKey(message.getKey());
       copy.setNodeId(message.getNodeId());
+      copy.setTimestamp(message.getTimestamp());
       @SuppressWarnings("unchecked")
       Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload());
-      message.setPayload(merged);
+      copy.setPayload(merged);
       boolean replaced = sharedData.replace(message.getKey(), ret, copy);
       if (replaced){
         return merged;

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
index bf8a8c3..3ac237a 100644
--- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java
@@ -18,9 +18,11 @@
 package org.apache.gossip.manager.random;
 
 import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonGenerator.Feature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.gossip.GossipMember;
 import org.apache.gossip.GossipSettings;
+import org.apache.gossip.crdt.CrdtModule;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
@@ -126,6 +128,8 @@ public class RandomGossipManager extends GossipManager {
       if (objectMapper == null) {
         objectMapper = new ObjectMapper();
         objectMapper.enableDefaultTyping();
+        objectMapper.registerModule(new CrdtModule());
+        objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
       }
       if (messageInvoker == null) {
         messageInvoker = new DefaultMessageInvoker();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
index ba8e735..6020328 100644
--- a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
+++ b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
@@ -42,7 +42,9 @@ public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implemen
 
   @Override
   public String toString() {
-    return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+    return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()="
+            + getNodeId() + ", getKey()=" + getKey() + ", getPayload()=" + getPayload()
+            + ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt() + "]";
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index f05636b..3892e9b 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.gossip.crdt.GrowOnlySet;
+import org.apache.gossip.crdt.OrSet;
 import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.SharedGossipDataMessage;
 import org.junit.Test;
@@ -37,6 +38,8 @@ import io.teknek.tunit.TUnit;
 
 public class DataTest {
   
+  private String orSetKey = "cror";
+  
   @Test
   public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
     GossipSettings settings = new GossipSettings();
@@ -88,11 +91,62 @@ public class DataTest {
     givenDifferentDatumsInSet(clients);
     assertThatListIsMerged(clients);
     
+    givenOrs(clients);
+    assertThatOrSetIsMerged(clients);
+    dropIt(clients);
+    assertThatOrSetDelIsMerged(clients);
+    
     for (int i = 0; i < clusterMembers; ++i) {
       clients.get(i).shutdown();
     }
   }
   
+  private void givenOrs(List<GossipService> clients) {
+    {
+      SharedGossipDataMessage d = new SharedGossipDataMessage();
+      d.setKey(orSetKey);
+      d.setPayload(new OrSet<String>("1", "2"));
+      d.setExpireAt(Long.MAX_VALUE);
+      d.setTimestamp(System.currentTimeMillis());
+      clients.get(0).getGossipManager().merge(d);
+    }
+    {
+      SharedGossipDataMessage d = new SharedGossipDataMessage();
+      d.setKey(orSetKey);
+      d.setPayload(new OrSet<String>("3", "4"));
+      d.setExpireAt(Long.MAX_VALUE);
+      d.setTimestamp(System.currentTimeMillis());
+      clients.get(1).getGossipManager().merge(d);
+    }
+  }
+  
+  private void dropIt(List<GossipService> clients) {
+    @SuppressWarnings("unchecked")
+    OrSet<String> o = (OrSet<String>) clients.get(0).getGossipManager().findCrdt(orSetKey);
+    OrSet<String> o2 = new OrSet<String>(o, new OrSet.Builder<String>().remove("3"));
+    SharedGossipDataMessage d = new SharedGossipDataMessage();
+    d.setKey(orSetKey);
+    d.setPayload(o2);
+    d.setExpireAt(Long.MAX_VALUE);
+    d.setTimestamp(System.currentTimeMillis());
+    clients.get(0).getGossipManager().merge(d);
+  }
+  
+  private void assertThatOrSetIsMerged(final List<GossipService> clients){
+    TUnit.assertThat(() ->  {
+      return clients.get(0).getGossipManager().findCrdt(orSetKey).value();
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
+    TUnit.assertThat(() ->  {
+      return clients.get(1).getGossipManager().findCrdt(orSetKey).value();
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
+  }
+  
+  private void assertThatOrSetDelIsMerged(final List<GossipService> clients){
+    TUnit.assertThat(() ->  {
+      return clients.get(0).getGossipManager().findCrdt(orSetKey);
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet<String>("1", "2", "4"));
+  }
+
   private void givenDifferentDatumsInSet(final List<GossipService> clients){
     clients.get(0).getGossipManager().merge(CrdtMessage("1"));
     clients.get(1).getGossipManager().merge(CrdtMessage("2"));
@@ -101,7 +155,7 @@ public class DataTest {
   private void assertThatListIsMerged(final List<GossipService> clients){
     TUnit.assertThat(() ->  {
       return clients.get(0).getGossipManager().findCrdt("cr");
-    }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new GrowOnlySet<String>(Arrays.asList("1","2")));
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<String>(Arrays.asList("1","2")));
   }
   
   private SharedGossipDataMessage CrdtMessage(String item){

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/026b8bb4/src/test/java/org/apache/gossip/crdt/OrSetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
new file mode 100644
index 0000000..8b8766a
--- /dev/null
+++ b/src/test/java/org/apache/gossip/crdt/OrSetTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class OrSetTest {
+
+  @Test
+  public void atest() {
+    OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(4).add(5).add(6).remove(5));
+    Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray());
+  }
+  
+  @Test
+  public void mergeTest(){
+    OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(4).add(5).add(6).remove(5));
+    Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray());
+    OrSet<Integer> j = new OrSet<>(new OrSet.Builder<Integer>().add(9).add(4).add(5).remove(6));
+    OrSet<Integer> h = i.merge(j);
+    Assert.assertEquals(new OrSet<Integer>(4,6,9,5), h);
+  }
+  
+  @Test
+  public void mergeTest2(){
+    OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(5).add(4).remove(4).add(6));
+    Assert.assertEquals(new OrSet<Integer>(5,6), i);
+    SortedSet<Integer> tree = new TreeSet<>();
+    for (Integer in: i.value()){
+      tree.add(in);
+    }
+    TreeSet<Integer> compare = new TreeSet<>();
+    compare.add(5);
+    compare.add(6);
+    Assert.assertEquals(tree, compare);
+  }
+  
+  @Test
+  public void mergeTest4() {
+    Assert.assertArrayEquals(new Integer[] {},
+            new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1)).toArray());
+  }
+  
+  @Test
+  public void mergeTest3(){
+    OrSet<Integer> i = new OrSet<>(1);
+    OrSet<Integer> j = new OrSet<>(2);
+    OrSet<Integer> k = new OrSet<>(i.merge(j),  new OrSet.Builder<Integer>().remove(1));
+    Assert.assertArrayEquals(new Integer[] { 2 }, i.merge(j).merge(k).toArray());
+    Assert.assertArrayEquals(new Integer[] { 2 }, j.merge(i).merge(k).toArray());
+    Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(i).merge(j).toArray());
+    Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(j).merge(i).toArray());
+    Assert.assertEquals(j , i.merge(j.merge(k)));
+  }
+  
+  @Test
+  public void mergeTest9(){
+    OrSet<Integer> i = new OrSet<>(19);
+    OrSet<Integer> j = i.merge(i);
+    Assert.assertEquals(i.value(), j.value());
+  }
+  
+  @Test
+  public void serialTest() throws InterruptedException, URISyntaxException, IOException {
+    GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<>(),
+            Arrays.asList(new RemoteGossipMember("a",
+                    new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")),
+            new GossipSettings(), (a, b) -> { }, new MetricRegistry());
+    OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
+    String s = gossipService2.getGossipManager().getObjectMapper().writeValueAsString(i);
+    @SuppressWarnings("unchecked")
+    OrSet<Integer> back = gossipService2.getGossipManager().getObjectMapper().readValue(s, OrSet.class);
+    Assert.assertEquals(back, i);
+  }
+  
+}


Mime
View raw message