Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B5526200CC5 for ; Tue, 27 Jun 2017 03:24:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B3E8C160BDE; Tue, 27 Jun 2017 01:24:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 88566160BDA for ; Tue, 27 Jun 2017 03:24:57 +0200 (CEST) Received: (qmail 56422 invoked by uid 500); 27 Jun 2017 01:24:56 -0000 Mailing-List: contact commits-help@gossip.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gossip.incubator.apache.org Delivered-To: mailing list commits@gossip.incubator.apache.org Received: (qmail 56413 invoked by uid 99); 27 Jun 2017 01:24:56 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jun 2017 01:24:56 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 73377C23C4 for ; Tue, 27 Jun 2017 01:24:55 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.23 X-Spam-Level: X-Spam-Status: No, score=-4.23 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id CGeo8qQgDn3f for ; Tue, 27 Jun 2017 01:24:50 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 764ED5F6CB for ; Tue, 27 Jun 2017 01:24:48 +0000 (UTC) Received: (qmail 56402 invoked by uid 99); 27 Jun 2017 01:24:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jun 2017 01:24:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE352DFF18; Tue, 27 Jun 2017 01:24:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecapriolo@apache.org To: commits@gossip.incubator.apache.org Message-Id: <8d614c8bd0d14e67834618e9a8fa15e7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-gossip git commit: GOSSIP-64 Implement Max-Change-Sets Date: Tue, 27 Jun 2017 01:24:46 +0000 (UTC) archived-at: Tue, 27 Jun 2017 01:24:59 -0000 Repository: incubator-gossip Updated Branches: refs/heads/master 49cdac62a -> 176944951 GOSSIP-64 Implement Max-Change-Sets Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/17694495 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/17694495 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/17694495 Branch: refs/heads/master Commit: 1769449515114e8b81039b30d5d3cc95c3d80a7b Parents: 49cdac6 Author: Maxim Rusak Authored: Mon Jun 26 03:34:23 2017 +0300 Committer: edward Committed: Mon Jun 26 21:24:07 2017 -0400 ---------------------------------------------------------------------- .../apache/gossip/crdt/CrdtAddRemoveSet.java | 12 + .../java/org/apache/gossip/crdt/CrdtModule.java | 23 +- .../java/org/apache/gossip/crdt/LWWSet.java | 152 ---------- .../java/org/apache/gossip/crdt/LwwSet.java | 171 +++++++++++ .../org/apache/gossip/crdt/MaxChangeSet.java | 117 ++++++++ .../main/java/org/apache/gossip/crdt/OrSet.java | 25 +- .../gossip/crdt/AbstractCRDTStringSetTest.java | 133 +++++++++ .../java/org/apache/gossip/crdt/LWWSetTest.java | 155 ---------- .../java/org/apache/gossip/crdt/LwwSetTest.java | 71 +++++ .../apache/gossip/crdt/MaxChangeSetTest.java | 67 +++++ .../java/org/apache/gossip/crdt/OrSetTest.java | 54 ++-- .../test/java/org/apache/gossip/DataTest.java | 299 ++++++++----------- .../gossip/protocol/json/JacksonTest.java | 49 +-- 13 files changed, 789 insertions(+), 539 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtAddRemoveSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtAddRemoveSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtAddRemoveSet.java new file mode 100644 index 0000000..55ba019 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtAddRemoveSet.java @@ -0,0 +1,12 @@ +package org.apache.gossip.crdt; + +import java.util.Set; + +// Interface extends CrdtSet interface with add and remove operation that are guaranteed to be immutable. +// If your implementation provide immutable add/remove operations you can extend AbstractCRDTStringSetTest to check it in the most ways. + +public interface CrdtAddRemoveSet, R extends CrdtAddRemoveSet> extends CrdtSet { + R add(T element); + + R remove(T element); +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java index 1c95b28..7ec96e7 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -17,16 +17,16 @@ */ package org.apache.gossip.crdt; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.module.SimpleModule; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + abstract class OrSetMixin { @JsonCreator OrSetMixin(@JsonProperty("elements") Map> w, @JsonProperty("tombstones") Map> h) { } @@ -37,8 +37,8 @@ abstract class OrSetMixin { abstract class LWWSetMixin { @JsonCreator - LWWSetMixin(@JsonProperty("data") Map struct) { } - @JsonProperty("data") abstract Map getStruct(); + LWWSetMixin(@JsonProperty("data") Map struct) { } + @JsonProperty("data") abstract Map getStruct(); } abstract class LWWSetTimestampsMixin { @@ -48,6 +48,12 @@ abstract class LWWSetTimestampsMixin { @JsonProperty("remove") abstract long getLatestRemove(); } +abstract class MaxChangeSetMixin { + @JsonCreator + MaxChangeSetMixin(@JsonProperty("data") Map struct) { } + @JsonProperty("data") abstract Map getStruct(); +} + abstract class GrowOnlySetMixin{ @JsonCreator GrowOnlySetMixin(@JsonProperty("elements") Set elements){ } @@ -84,8 +90,9 @@ public class CrdtModule extends SimpleModule { context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class); context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class); context.setMixInAnnotations(PNCounter.class, PNCounterMixin.class); - context.setMixInAnnotations(LWWSet.class, LWWSetMixin.class); - context.setMixInAnnotations(LWWSet.Timestamps.class, LWWSetTimestampsMixin.class); + context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class); + context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class); + context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java deleted file mode 100644 index b51ce7a..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.crdt; - -import org.apache.gossip.manager.Clock; -import org.apache.gossip.manager.SystemClock; - -import java.util.*; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -public class LWWSet implements CrdtSet, LWWSet> { - static private Clock clock = new SystemClock(); - - private final Map struct; - - static class Timestamps { - private final long latestAdd; - private final long latestRemove; - - Timestamps(){ - latestAdd = 0; - latestRemove = 0; - } - - Timestamps(long add, long remove){ - latestAdd = add; - latestRemove = remove; - } - - long getLatestAdd() { - return latestAdd; - } - - long getLatestRemove() { - return latestRemove; - } - - // consider element present when addTime >= removeTime, so we prefer add to remove - boolean isPresent(){ - return latestAdd >= latestRemove; - } - - Timestamps updateAdd(){ - return new Timestamps(clock.nanoTime(), latestRemove); - } - - Timestamps updateRemove(){ - return new Timestamps(latestAdd, clock.nanoTime()); - } - - Timestamps merge(Timestamps other){ - if (other == null){ - return this; - } - return new Timestamps(Math.max(latestAdd, other.latestAdd), Math.max(latestRemove, other.latestRemove)); - } - } - - - public LWWSet(){ - struct = new HashMap<>(); - } - - @SafeVarargs - public LWWSet(ElementType... elements){ - this(new HashSet<>(Arrays.asList(elements))); - } - - public LWWSet(Set set){ - struct = new HashMap<>(); - for (ElementType e : set){ - struct.put(e, new Timestamps().updateAdd()); - } - } - - public LWWSet(LWWSet first, LWWSet second){ - Function timestampsFor = p -> { - Timestamps firstTs = first.struct.get(p); - Timestamps secondTs = second.struct.get(p); - if (firstTs == null){ - return secondTs; - } - return firstTs.merge(secondTs); - }; - struct = Stream.concat(first.struct.keySet().stream(), second.struct.keySet().stream()) - .distinct().collect(Collectors.toMap(p -> p, timestampsFor)); - } - - public LWWSet add(ElementType e){ - return this.merge(new LWWSet<>(e)); - } - - // for serialization - LWWSet(Map struct){ - this.struct = struct; - } - - Map getStruct() { - return struct; - } - - - public LWWSet remove(ElementType e){ - Timestamps eTimestamps = struct.get(e); - if (eTimestamps == null || !eTimestamps.isPresent()){ - return this; - } - Map changeMap = new HashMap<>(); - changeMap.put(e, eTimestamps.updateRemove()); - return this.merge(new LWWSet<>(changeMap)); - } - - @Override - public LWWSet merge(LWWSet other){ - return new LWWSet<>(this, other); - } - - @Override - public Set value(){ - return struct.entrySet().stream() - .filter(entry -> entry.getValue().isPresent()) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - } - - @Override - public LWWSet optimize(){ - return this; - } - - @Override - public boolean equals(Object obj){ - return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((LWWSet) obj).value())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/main/java/org/apache/gossip/crdt/LwwSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/LwwSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/LwwSet.java new file mode 100644 index 0000000..391cb09 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/LwwSet.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.Clock; +import org.apache.gossip.manager.SystemClock; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/* + Last write wins CrdtSet + Each operation has timestamp: when you add or remove SystemClock is used to get current time in nanoseconds. + When all add/remove operations are within the only node LWWSet is guaranteed to work like a Set. + If you have multiple nodes with ideally synchronized clocks: + You will observe operations on all machines later than on the initiator, but the last operations on cluster will win. + If you have some significant clock drift you will suffer from data loss. + + Read more: https://github.com/aphyr/meangirls#lww-element-set + + You can view examples of usage in tests: + LwwSetTest - unit tests + DataTest - integration test with 2 nodes, LWWSet was serialized/deserialized, sent between nodes, merged +*/ + +public class LwwSet implements CrdtAddRemoveSet, LwwSet> { + static private Clock clock = new SystemClock(); + + private final Map struct; + + static class Timestamps { + private final long latestAdd; + private final long latestRemove; + + Timestamps(){ + latestAdd = 0; + latestRemove = 0; + } + + Timestamps(long add, long remove){ + latestAdd = add; + latestRemove = remove; + } + + long getLatestAdd(){ + return latestAdd; + } + + long getLatestRemove(){ + return latestRemove; + } + + // consider element present when addTime >= removeTime, so we prefer add to remove + boolean isPresent(){ + return latestAdd >= latestRemove; + } + + Timestamps updateAdd(){ + return new Timestamps(clock.nanoTime(), latestRemove); + } + + Timestamps updateRemove(){ + return new Timestamps(latestAdd, clock.nanoTime()); + } + + Timestamps merge(Timestamps other){ + if (other == null){ + return this; + } + return new Timestamps(Math.max(latestAdd, other.latestAdd), Math.max(latestRemove, other.latestRemove)); + } + } + + + public LwwSet(){ + struct = new HashMap<>(); + } + + @SafeVarargs + public LwwSet(ElementType... elements){ + this(new HashSet<>(Arrays.asList(elements))); + } + + public LwwSet(Set set){ + struct = new HashMap<>(); + for (ElementType e : set){ + struct.put(e, new Timestamps().updateAdd()); + } + } + + public LwwSet(LwwSet first, LwwSet second){ + Function timestampsFor = p -> { + Timestamps firstTs = first.struct.get(p); + Timestamps secondTs = second.struct.get(p); + if (firstTs == null){ + return secondTs; + } + return firstTs.merge(secondTs); + }; + struct = Stream.concat(first.struct.keySet().stream(), second.struct.keySet().stream()) + .distinct().collect(Collectors.toMap(p -> p, timestampsFor)); + } + + public LwwSet add(ElementType e){ + return this.merge(new LwwSet<>(e)); + } + + // for serialization + LwwSet(Map struct){ + this.struct = struct; + } + + Map getStruct(){ + return struct; + } + + + public LwwSet remove(ElementType e){ + Timestamps eTimestamps = struct.get(e); + if (eTimestamps == null || !eTimestamps.isPresent()){ + return this; + } + Map changeMap = new HashMap<>(); + changeMap.put(e, eTimestamps.updateRemove()); + return this.merge(new LwwSet<>(changeMap)); + } + + @Override + public LwwSet merge(LwwSet other){ + return new LwwSet<>(this, other); + } + + @Override + public Set value(){ + return struct.entrySet().stream() + .filter(entry -> entry.getValue().isPresent()) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + @Override + public LwwSet optimize(){ + return this; + } + + @Override + public boolean equals(Object obj){ + return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((LwwSet) obj).value())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/main/java/org/apache/gossip/crdt/MaxChangeSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/MaxChangeSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/MaxChangeSet.java new file mode 100644 index 0000000..0b72b80 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/MaxChangeSet.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/* + Max Change Set CrdtSet. Value which has changed the most wins. + You cannot delete an element which is not present, and cannot add an element which is already present. + MC-sets are compact and do the right thing when changes to elements are infrequent compared to the gossiping period. + + Read more: https://github.com/aphyr/meangirls#max-change-sets + You can view examples of usage in tests: + MaxChangeSetTest - unit tests + DataTest - integration test with 2 nodes, MaxChangeSet was serialized/deserialized, sent between nodes, merged +*/ + +public class MaxChangeSet implements CrdtAddRemoveSet, MaxChangeSet> { + private final Map struct; + + public MaxChangeSet(){ + struct = new HashMap<>(); + } + + @SafeVarargs + public MaxChangeSet(ElementType... elements){ + this(new HashSet<>(Arrays.asList(elements))); + } + + public MaxChangeSet(Set set){ + struct = new HashMap<>(); + for (ElementType e : set){ + struct.put(e, 1); + } + } + + public MaxChangeSet(MaxChangeSet first, MaxChangeSet second){ + Function valueFor = element -> + Math.max(first.struct.getOrDefault(element, 0), second.struct.getOrDefault(element, 0)); + struct = Stream.concat(first.struct.keySet().stream(), second.struct.keySet().stream()) + .distinct().collect(Collectors.toMap(p -> p, valueFor)); + } + + // for serialization + MaxChangeSet(Map struct){ + this.struct = struct; + } + + Map getStruct(){ + return struct; + } + + private MaxChangeSet increment(ElementType e){ + Map changeMap = new HashMap<>(); + changeMap.put(e, struct.getOrDefault(e, 0) + 1); + return this.merge(new MaxChangeSet<>(changeMap)); + } + + public MaxChangeSet add(ElementType e){ + if (struct.getOrDefault(e, 0) % 2 == 1){ + return this; + } + return increment(e); + } + + public MaxChangeSet remove(ElementType e){ + if (struct.getOrDefault(e, 0) % 2 == 0){ + return this; + } + return increment(e); + } + + @Override + public MaxChangeSet merge(MaxChangeSet other){ + return new MaxChangeSet<>(this, other); + } + + @Override + public Set value(){ + return struct.entrySet().stream() + .filter(entry -> (entry.getValue() % 2 == 1)) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + @Override + public MaxChangeSet optimize(){ + return this; + } + + @Override + public boolean equals(Object obj){ + return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((MaxChangeSet) obj).value())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java index f84dbc7..68b089a 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/OrSet.java @@ -26,7 +26,7 @@ import org.apache.gossip.crdt.OrSet.Builder.Operation; /* * A immutable set */ -public class OrSet implements Crdt, OrSet> { +public class OrSet implements CrdtAddRemoveSet, OrSet> { private final Map> elements = new HashMap<>(); private final Map> tombstones = new HashMap<>(); @@ -44,6 +44,10 @@ public class OrSet implements Crdt, OrSet> { @SafeVarargs public OrSet(E ... elements){ + this(new HashSet<>(Arrays.asList(elements))); + } + + public OrSet(Set elements) { for (E e: elements){ internalAdd(e); } @@ -109,7 +113,15 @@ public class OrSet implements Crdt, OrSet> { val = computeValue(); } - + + public OrSet add(E e) { + return this.merge(new OrSet<>(e)); + } + + public OrSet remove(E e) { + return new OrSet<>(this, new Builder().remove(e)); + } + public OrSet.Builder builder(){ return new OrSet.Builder<>(); } @@ -233,15 +245,6 @@ public class OrSet implements Crdt, OrSet> { return value().toArray(a); } - public boolean add(E e) { - throw new IllegalArgumentException("Can not add"); - } - - - public boolean remove(Object o) { - throw new IllegalArgumentException(); - } - public boolean containsAll(Collection c) { return this.value().containsAll(c); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java new file mode 100644 index 0000000..d4db4ce --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.Ignore; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/* + Abstract test suit to test CrdtSets with Add and Remove operations. + It compares them with simple sets, validates add, remove, equals, value, etc. operations + To use it you should: + 1. subclass this and implement constructors + 2. implement CrdtAddRemoveSet in your CrdtSet + 3. make your CrdtSet immutable +*/ + +@Ignore +public abstract class AbstractCRDTStringSetTest, SetType>> { + abstract SetType construct(Set set); + + abstract SetType construct(); + + private Set sampleSet; + + @Before + public void setup(){ + sampleSet = new HashSet<>(); + sampleSet.add("4"); + sampleSet.add("5"); + sampleSet.add("12"); + } + + @Test + public void abstractSetConstructorTest(){ + Assert.assertEquals(construct(sampleSet).value(), sampleSet); + } + + @Test + public void abstractStressWithSetTest(){ + Set hashSet = new HashSet<>(); + SetType set = construct(); + for (int it = 0; it < 40; it++){ + SetType newSet; + if (it % 5 == 1){ + //deleting existing + String forDelete = hashSet.stream().skip((long) (hashSet.size() * Math.random())).findFirst().get(); + newSet = set.remove(forDelete); + Assert.assertEquals(set.value(), hashSet); // check old version is immutable + hashSet.remove(forDelete); + } else { + //adding + String forAdd = String.valueOf((int) (10000 * Math.random())); + newSet = set.add(forAdd); + Assert.assertEquals(set.value(), hashSet); // check old version is immutable + hashSet.add(forAdd); + } + set = newSet; + Assert.assertEquals(set.value(), hashSet); + } + } + + @Test + public void abstractEqualsTest(){ + SetType set = construct(sampleSet); + Assert.assertFalse(set.equals(sampleSet)); + SetType newSet = set.add("25"); + sampleSet.add("25"); + Assert.assertFalse(newSet.equals(set)); + Assert.assertEquals(construct(sampleSet), newSet); + } + + @Test + public void abstractRemoveMissingTest(){ + SetType set = construct(sampleSet); + set = set.add("25"); + set = set.remove("25"); + Assert.assertEquals(set.value(), sampleSet); + set = set.remove("25"); + set = set.add("25"); + sampleSet.add("25"); + Assert.assertEquals(set.value(), sampleSet); + } + + @Test + public void abstractStressMergeTest(){ + // in one-process context, add, remove and merge operations of lww are equal to operations of Set + // we've already checked it. Now just check merge + Set hashSet1 = new HashSet<>(), hashSet2 = new HashSet<>(); + SetType set1 = construct(), set2 = construct(); + + for (int it = 0; it < 100; it++){ + String forAdd = String.valueOf((int) (10000 * Math.random())); + if (it % 2 == 0){ + hashSet1.add(forAdd); + set1 = set1.add(forAdd); + } else { + hashSet2.add(forAdd); + set2 = set2.add(forAdd); + } + } + Assert.assertEquals(set1.value(), hashSet1); + Assert.assertEquals(set2.value(), hashSet2); + Set mergedSet = Stream.concat(hashSet1.stream(), hashSet2.stream()).collect(Collectors.toSet()); + Assert.assertEquals(set1.merge(set2).value(), mergedSet); + } + + @Test + public void abstractOptimizeTest(){ + Assert.assertEquals(construct(sampleSet).value(), sampleSet); + Assert.assertEquals(construct(sampleSet).optimize().value(), sampleSet); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java deleted file mode 100644 index bdd3258..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.crdt; - -import org.apache.gossip.manager.Clock; -import org.apache.gossip.manager.SystemClock; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -public class LWWSetTest { - static private Clock clock = new SystemClock(); - private Set sampleSet; - - @Before - public void setup(){ - sampleSet = new HashSet<>(); - sampleSet.add(4); - sampleSet.add(5); - sampleSet.add(12); - } - - @Test - public void setConstructorTest(){ - Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet); - } - - @Test - public void stressWithSetTest(){ - Set set = new HashSet<>(); - LWWSet lww = new LWWSet<>(); - for (int it = 0; it < 100; it++){ - LWWSet newLww; - if (it % 5 == 1){ - //deleting existing - Integer forDelete = set.stream().skip((long) (set.size() * Math.random())).findFirst().get(); - newLww = lww.remove(forDelete); - Assert.assertEquals(lww.value(), set); // check old version is immutable - set.remove(forDelete); - } else { - //adding - Integer forAdd = (int) (10000 * Math.random()); - newLww = lww.add(forAdd); - Assert.assertEquals(lww.value(), set); // check old version is immutable - set.add(forAdd); - } - lww = newLww; - Assert.assertEquals(lww.value(), set); - } - } - - @Test - public void equalsTest(){ - LWWSet lww = new LWWSet<>(sampleSet); - Assert.assertFalse(lww.equals(sampleSet)); - LWWSet newLww = lww.add(25); - sampleSet.add(25); - Assert.assertFalse(newLww.equals(lww)); - Assert.assertEquals(new LWWSet<>(sampleSet), newLww); - } - - @Test - public void valueTest() { - Map map = new HashMap<>(); - map.put('a', new LWWSet.Timestamps(1, 0)); - map.put('b', new LWWSet.Timestamps(1, 2)); - map.put('c', new LWWSet.Timestamps(3, 3)); - Set toTest = new HashSet<>(); - toTest.add('a'); // for 'a' addTime > removeTime - toTest.add('c'); // for 'c' times are equal, we prefer add to remove - Assert.assertEquals(new LWWSet<>(map).value(), toTest); - Assert.assertEquals(new LWWSet<>(map), new LWWSet<>('a', 'c')); - } - - @Test - public void removeMissingTest(){ - LWWSet lww = new LWWSet<>(sampleSet); - lww = lww.add(25); - lww = lww.remove(25); - Assert.assertEquals(lww.value(), sampleSet); - lww = lww.remove(25); - lww = lww.add(25); - sampleSet.add(25); - Assert.assertEquals(lww.value(), sampleSet); - } - - @Test - public void stressMergeTest(){ - // in one-process context, add, remove and merge operations of lww are equal to operations of Set - // we've already checked it. Now just check merge - Set set1 = new HashSet<>(), set2 = new HashSet<>(); - LWWSet lww1 = new LWWSet<>(), lww2 = new LWWSet<>(); - - for (int it = 0; it < 100; it++){ - Integer forAdd = (int) (10000 * Math.random()); - if (it % 2 == 0){ - set1.add(forAdd); - lww1 = lww1.add(forAdd); - } else { - set2.add(forAdd); - lww2 = lww2.add(forAdd); - } - } - Assert.assertEquals(lww1.value(), set1); - Assert.assertEquals(lww2.value(), set2); - Set mergedSet = Stream.concat(set1.stream(), set2.stream()).collect(Collectors.toSet()); - Assert.assertEquals(lww1.merge(lww2).value(), mergedSet); - } - - @Test - public void fakeTimeMergeTest(){ - // try to create LWWSet with time from future (simulate other process with its own clock) and validate result - // check remove from the future - Map map = new HashMap<>(); - map.put(25, new LWWSet.Timestamps(clock.nanoTime(), clock.nanoTime() + 100000)); - LWWSet lww = new LWWSet<>(map); - Assert.assertEquals(lww, new LWWSet()); - //create new LWWSet with element 25, and merge with other LWW which has remove in future - Assert.assertEquals(new LWWSet<>(25).merge(lww), new LWWSet()); - - // add in future - map.put(25, new LWWSet.Timestamps(clock.nanoTime() + 100000, 0)); - lww = new LWWSet<>(map); - lww = lww.remove(25); - Assert.assertEquals(lww, new LWWSet<>(25)); // 25 is still here - } - - @Test - public void optimizeTest(){ - Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet); - Assert.assertEquals(new LWWSet<>(sampleSet).optimize().value(), sampleSet); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java new file mode 100644 index 0000000..8200b15 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.Clock; +import org.apache.gossip.manager.SystemClock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class LwwSetTest extends AbstractCRDTStringSetTest> { + static private Clock clock = new SystemClock(); + + LwwSet construct(Set set){ + return new LwwSet<>(set); + } + + LwwSet construct(){ + return new LwwSet<>(); + } + + @Test + public void valueTest(){ + Map map = new HashMap<>(); + map.put('a', new LwwSet.Timestamps(1, 0)); + map.put('b', new LwwSet.Timestamps(1, 2)); + map.put('c', new LwwSet.Timestamps(3, 3)); + Set toTest = new HashSet<>(); + toTest.add('a'); // for 'a' addTime > removeTime + toTest.add('c'); // for 'c' times are equal, we prefer add to remove + Assert.assertEquals(new LwwSet<>(map).value(), toTest); + Assert.assertEquals(new LwwSet<>(map), new LwwSet<>('a', 'c')); + } + + @Test + public void fakeTimeMergeTest(){ + // try to create LWWSet with time from future (simulate other process with its own clock) and validate result + // check remove from the future + Map map = new HashMap<>(); + map.put(25, new LwwSet.Timestamps(clock.nanoTime(), Long.MAX_VALUE)); + LwwSet lww = new LwwSet<>(map); + Assert.assertEquals(lww, new LwwSet()); + //create new LWWSet with element 25, and merge with other LWW which has remove in future + Assert.assertEquals(new LwwSet<>(25).merge(lww), new LwwSet()); + + // add in future + map.put(25, new LwwSet.Timestamps(Long.MAX_VALUE, 0)); + lww = new LwwSet<>(map); + lww = lww.remove(25); + Assert.assertEquals(lww, new LwwSet<>(25)); // 25 is still here + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java new file mode 100644 index 0000000..2ba3f09 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class MaxChangeSetTest extends AbstractCRDTStringSetTest> { + MaxChangeSet construct(Set set){ + return new MaxChangeSet<>(set); + } + + MaxChangeSet construct(){ + return new MaxChangeSet<>(); + } + + @Test + public void valueTest(){ + Map struct = new HashMap<>(); + struct.put('a', 0); + struct.put('b', 1); + struct.put('c', 2); + struct.put('d', 3); + Set result = new HashSet<>(); + result.add('b'); + result.add('d'); + Assert.assertEquals(new MaxChangeSet<>(struct).value(), result); + } + + @Test + public void mergeTest(){ + MaxChangeSet set1 = new MaxChangeSet().add(1); // Set with one operation on 1 + MaxChangeSet set2 = new MaxChangeSet().add(1).remove(1); // two operations + Assert.assertEquals(set1.merge(set2), new MaxChangeSet()); // empty set wins + + set1 = set1.add(1).add(1).add(1); + // empty set still wins, repetitive operations do nothing, don't increase number of operations + Assert.assertEquals(set1.merge(set2), new MaxChangeSet()); + + set1 = set1.remove(1).add(1); // 3 operations + Assert.assertEquals(set1.merge(set2), new MaxChangeSet<>(1)); // full set wins now + + set2 = set2.remove(1).remove(1).remove(1); + // full set still wins, repetitive removes don't increase number of operations too + Assert.assertEquals(set1.merge(set2), new MaxChangeSet<>(1)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java index 70c0d51..bdaada9 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java @@ -17,36 +17,44 @@ */ package org.apache.gossip.crdt; +import org.junit.Assert; +import org.junit.Test; + import java.util.Arrays; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import org.junit.Assert; -import org.junit.Test; +public class OrSetTest extends AbstractCRDTStringSetTest> { + OrSet construct(){ + return new OrSet<>(); + } -public class OrSetTest { + OrSet construct(Set set){ + return new OrSet<>(set); + } @Test - public void atest() { + public void atest(){ OrSet i = new OrSet<>(new OrSet.Builder().add(4).add(5).add(6).remove(5)); Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray()); } - + @Test public void mergeTest(){ OrSet i = new OrSet<>(new OrSet.Builder().add(4).add(5).add(6).remove(5)); Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray()); OrSet j = new OrSet<>(new OrSet.Builder().add(9).add(4).add(5).remove(6)); OrSet h = i.merge(j); - Assert.assertEquals(new OrSet(4,6,9,5), h); + Assert.assertEquals(new OrSet<>(4, 6, 9, 5), h); } - + @Test public void mergeTest2(){ OrSet i = new OrSet<>(new OrSet.Builder().add(5).add(4).remove(4).add(6)); - Assert.assertEquals(new OrSet(5,6), i); + Assert.assertEquals(new OrSet<>(5, 6), i); SortedSet tree = new TreeSet<>(); - for (Integer in: i.value()){ + for (Integer in : i.value()){ tree.add(in); } TreeSet compare = new TreeSet<>(); @@ -54,34 +62,34 @@ public class OrSetTest { compare.add(6); Assert.assertEquals(tree, compare); } - + @Test - public void mergeTest4() { - Assert.assertArrayEquals(new Integer[] {}, - new OrSet(new OrSet.Builder().add(1).remove(1)).toArray()); + public void mergeTest4(){ + Assert.assertArrayEquals(new Integer[]{}, + new OrSet<>(new OrSet.Builder().add(1).remove(1)).toArray()); } - + @Test public void mergeTest3(){ OrSet i = new OrSet<>(1); OrSet j = new OrSet<>(2); - OrSet k = new OrSet<>(i.merge(j), new OrSet.Builder().remove(1)); - Assert.assertArrayEquals(new Integer[] { 2 }, i.merge(j).merge(k).toArray()); - Assert.assertArrayEquals(new Integer[] { 2 }, j.merge(i).merge(k).toArray()); - Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(i).merge(j).toArray()); - Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(j).merge(i).toArray()); - Assert.assertEquals(j , i.merge(j.merge(k))); + OrSet k = new OrSet<>(i.merge(j), new OrSet.Builder().remove(1)); + Assert.assertArrayEquals(new Integer[]{2}, i.merge(j).merge(k).toArray()); + Assert.assertArrayEquals(new Integer[]{2}, j.merge(i).merge(k).toArray()); + Assert.assertArrayEquals(new Integer[]{2}, k.merge(i).merge(j).toArray()); + Assert.assertArrayEquals(new Integer[]{2}, k.merge(j).merge(i).toArray()); + Assert.assertEquals(j, i.merge(j.merge(k))); } - + @Test public void mergeTest9(){ OrSet i = new OrSet<>(19); OrSet j = i.merge(i); Assert.assertEquals(i.value(), j.value()); } - + @Test - public void mergeTestSame() { + public void mergeTestSame(){ OrSet i = new OrSet<>(19); OrSet j = new OrSet<>(19); OrSet k = i.merge(j); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-itest/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java index e91426c..df078aa 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java @@ -17,69 +17,81 @@ */ package org.apache.gossip; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - +import io.teknek.tunit.TUnit; +import org.apache.gossip.crdt.CrdtAddRemoveSet; import org.apache.gossip.crdt.GrowOnlyCounter; import org.apache.gossip.crdt.GrowOnlySet; -import org.apache.gossip.crdt.LWWSet; +import org.apache.gossip.crdt.LwwSet; +import org.apache.gossip.crdt.MaxChangeSet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.crdt.PNCounter; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; -import io.teknek.tunit.TUnit; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; -public class DataTest extends AbstractIntegrationBase { +public class DataTest { + private final String gCounterKey = "crdtgc"; + private final String pnCounterKey = "crdtpn"; - private String orSetKey = "cror"; - private String lwwSetKey = "crlww"; - private String gCounterKey = "crdtgc"; - private String pnCounterKey = "crdtpn"; + private static final List clients = new ArrayList<>(); + + @BeforeClass + public static void initializeMembers() throws InterruptedException, UnknownHostException, URISyntaxException{ + final int clusterMembers = 2; - @Test - public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); - int seedNodes = 1; List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - startupMembers.add(new RemoteMember(cluster, uri, i + "")); + for (int i = 0; i < clusterMembers; ++i){ + int id = i + 1; + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + id)); + startupMembers.add(new RemoteMember(cluster, uri, id + "")); } - final List clients = new ArrayList<>(); - final int clusterMembers = 2; - for (int i = 1; i < clusterMembers + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - GossipManager gossipService = GossipManagerBuilder - .newBuilder() - .cluster(cluster).uri(uri) - .id(i + "") - .gossipMembers(startupMembers) - .gossipSettings(settings).build(); + + for (Member member : startupMembers){ + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(member.getUri()) + .id(member.getId()).gossipMembers(startupMembers).gossipSettings(settings).build(); clients.add(gossipService); gossipService.init(); - register(gossipService); } + } + + @AfterClass + public static void shutdownMembers(){ + for (final GossipManager client : clients){ + client.shutdown(); + } + } + + @Test + public void simpleDataTest(){ TUnit.assertThat(() -> { int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getLiveMembers().size(); + for (GossipManager client : clients){ + total += client.getLiveMembers().size(); } return total; - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(2); + clients.get(0).gossipPerNodeData(generatePerNodeMsg("a", "b")); clients.get(0).gossipSharedData(generateSharedMsg("a", "c")); @@ -89,7 +101,7 @@ public class DataTest extends AbstractIntegrationBase { return ""; else return x.getPayload(); - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); TUnit.assertThat(() -> { SharedDataMessage x = clients.get(1).findSharedGossipData("a"); @@ -97,175 +109,118 @@ public class DataTest extends AbstractIntegrationBase { return ""; else return x.getPayload(); - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); - - givenDifferentDatumsInSet(clients); - assertThatListIsMerged(clients); - - testOrSet(clients); - testLWWSet(clients); - - testGrowOnlyCounter(clients); - testPNCounter(clients); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c"); + } - for (int i = 0; i < clusterMembers; ++i) { - clients.get(i).shutdown(); - } + Set setFromList(String... elements){ + return new HashSet<>(Arrays.asList(elements)); } - private void testOrSet(final List clients) { - // populate - clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2"))); - clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4"))); + void crdtSetTest(String key, Function, CrdtAddRemoveSet, ?>> construct){ + //populate + clients.get(0).merge(generateSharedMsg(key, construct.apply(setFromList("1", "2")))); + clients.get(1).merge(generateSharedMsg(key, construct.apply(setFromList("3", "4")))); - // assert merge - assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value()); - assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value()); + assertMergedCrdt(key, construct.apply(setFromList("1", "2", "3", "4")).value()); - // drop element + //drop element @SuppressWarnings("unchecked") - OrSet o = (OrSet) clients.get(0).findCrdt(orSetKey); - OrSet o2 = new OrSet<>(o, new OrSet.Builder().remove("3")); - clients.get(0).merge(generateSharedMsg(orSetKey, o2)); + CrdtAddRemoveSet set = (CrdtAddRemoveSet) clients.get(0).findCrdt(key); + clients.get(0).merge(generateSharedMsg(key, set.remove("3"))); - // assert deletion - assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value()); - assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value()); + //assert deletion + assertMergedCrdt(key, construct.apply(setFromList("1", "2", "4")).value()); } - private void testLWWSet(final List clients) { - // populate - clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2"))); - clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4"))); - - // assert merge - assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); - assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); - - // drop element - @SuppressWarnings("unchecked") - LWWSet lww = (LWWSet) clients.get(0).findCrdt(lwwSetKey); - clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3"))); - - // assert deletion - assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value()); - assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value()); + @Test + public void OrSetTest(){ + crdtSetTest("cror", OrSet::new); } - private void testGrowOnlyCounter(List clients) { - givenDifferentIncrement(clients); - assertThatCountIsUpdated(clients, 3); - givenIncreaseOther(clients); - assertThatCountIsUpdated(clients, 7); + @Test + public void LWWSetTest(){ + crdtSetTest("crlww", LwwSet::new); } - private void testPNCounter(List clients) { - givenPNCounter(clients); - assertThatPNCounterSettlesAt(clients, 0); - int[] delta1 = { 2, 3 }; - givenPNCounterUpdate(clients, delta1); - assertThatPNCounterSettlesAt(clients, 5); - int[] delta2 = { -3, 5 }; - givenPNCounterUpdate(clients, delta2); - assertThatPNCounterSettlesAt(clients, 7); - int[] delta3 = { 1, 1 }; - givenPNCounterUpdate(clients, delta3); - assertThatPNCounterSettlesAt(clients, 9); - int[] delta4 = { 1, -7 }; - givenPNCounterUpdate(clients, delta4); - assertThatPNCounterSettlesAt(clients, 3); + @Test + public void MaxChangeSetTest(){ + crdtSetTest("crmcs", MaxChangeSet::new); } - private void givenDifferentIncrement(final List clients) { + @Test + public void GrowOnlyCounterTest(){ + Consumer assertCountUpdated = count -> { + for (GossipManager client : clients){ + TUnit.assertThat(() -> client.findCrdt(gCounterKey)) + .afterWaitingAtMost(10, TimeUnit.SECONDS) + .isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(client).increment(count))); + } + }; + //generate different increment Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)); clients.get(0).merge(generateSharedMsg(gCounterKey, payload)); payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L)); clients.get(1).merge(generateSharedMsg(gCounterKey, payload)); - } - private void givenIncreaseOther(final List clients) { + assertCountUpdated.accept((long) 3); + + //update one GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, - new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); - + new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); clients.get(1).merge(generateSharedMsg(gCounterKey, gc2)); - } - private void assertMerged(final GossipManager client, String key, final Set expected) { - TUnit.assertThat(() -> client.findCrdt(key).value()).afterWaitingAtMost(10, TimeUnit.SECONDS) - .isEqualTo(expected); + assertCountUpdated.accept((long) 7); } - private void givenDifferentDatumsInSet(final List clients) { - clients.get(0).merge(CrdtMessage("1")); - clients.get(1).merge(CrdtMessage("2")); - } + @Test + public void PNCounterTest(){ + Consumer> counterUpdate = list -> { + int clientIndex = 0; + for (int delta : list){ + PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey); + c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long) delta))); + clients.get(clientIndex).merge(generateSharedMsg(pnCounterKey, c)); + clientIndex = (clientIndex + 1) % clients.size(); + } + }; - private void assertThatCountIsUpdated(final List clients, long finalCount) { - TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey)) - .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( - new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); - } + // given PNCounter + clients.get(0).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(0))))); + clients.get(1).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(1))))); - private void assertThatListIsMerged(final List clients) { - TUnit.assertThat(() -> clients.get(0).findCrdt("cr")).afterWaitingAtMost(10, TimeUnit.SECONDS) - .isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2"))); - } - - private void givenPNCounter(List clients) { - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(pnCounterKey); - d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(0)))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(pnCounterKey); - d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(1)))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - } + assertMergedCrdt(pnCounterKey, (long) 0); + + List> updateLists = new ArrayList<>(); + updateLists.add(Arrays.asList(2, 3)); + updateLists.add(Arrays.asList(-3, 5)); + updateLists.add(Arrays.asList(1, 1)); + updateLists.add(Arrays.asList(1, -7)); + + Long[] expectedResults = {5L, 7L, 9L, 3L}; - private void givenPNCounterUpdate(List clients, int[] deltaArray) { - int clientIndex = 0; - for (int delta: deltaArray) { - PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey); - c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long)delta))); - SharedDataMessage d = new SharedDataMessage(); - d.setKey(pnCounterKey); - d.setPayload(c); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(clientIndex).merge(d); - clientIndex = (clientIndex + 1) % clients.size(); + for (int i = 0; i < updateLists.size(); i++){ + counterUpdate.accept(updateLists.get(i)); + assertMergedCrdt(pnCounterKey, expectedResults[i]); } } - private void assertThatPNCounterSettlesAt(List clients, long expectedValue) { - for (GossipManager client: clients) { - TUnit.assertThat(() -> { - long value = 0; - Object o = client.findCrdt(pnCounterKey); - if (o != null) { - PNCounter c = (PNCounter)o; - value = c.value(); - } - return value; - }).afterWaitingAtMost(10, TimeUnit.SECONDS) - .isEqualTo(expectedValue); - } + @Test + public void GrowOnlySetTest(){ + clients.get(0).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("1")))); + clients.get(1).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("2")))); + + assertMergedCrdt("cr", new GrowOnlySet<>(Arrays.asList("1", "2")).value()); } - private SharedDataMessage CrdtMessage(String item) { - return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item))); + private void assertMergedCrdt(String key, Object expected){ + for (GossipManager client : clients){ + TUnit.assertThat(() -> client.findCrdt(key).value()) + .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected); + } } - private PerNodeDataMessage generatePerNodeMsg(String key, Object payload) { + private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){ PerNodeDataMessage g = new PerNodeDataMessage(); g.setExpireAt(Long.MAX_VALUE); g.setKey(key); @@ -274,7 +229,7 @@ public class DataTest extends AbstractIntegrationBase { return g; } - private SharedDataMessage generateSharedMsg(String key, Object payload) { + private SharedDataMessage generateSharedMsg(String key, Object payload){ SharedDataMessage d = new SharedDataMessage(); d.setKey(key); d.setPayload(payload); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/17694495/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java index 3c90ea1..d391fa1 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -22,7 +22,8 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.GossipSettings; import org.apache.gossip.Member; -import org.apache.gossip.crdt.LWWSet; +import org.apache.gossip.crdt.LwwSet; +import org.apache.gossip.crdt.MaxChangeSet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; @@ -39,7 +40,7 @@ import java.util.List; import java.util.UUID; public class JacksonTest { - + private static GossipSettings simpleSettings(GossipSettings settings) { settings.setPersistRingState(false); settings.setPersistDataState(false); @@ -47,12 +48,12 @@ public class JacksonTest { settings.setProtocolManagerClass("org.apache.gossip.protocol.json.JacksonProtocolManager"); return settings; } - + private static GossipSettings withSigning(GossipSettings settings) { settings.setSignMessages(true); return settings; } - + // formerly of SignedMessageTest. @Test(expected = IllegalArgumentException.class) public void ifSignMustHaveKeys() @@ -70,11 +71,11 @@ public class JacksonTest { .build(); gossipService.init(); } - + @Test public void jacksonSerialTest() throws InterruptedException, URISyntaxException, IOException { ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings())); - + OrSet i = new OrSet(new OrSet.Builder().add(1).remove(1)); String s = objectMapper.writeValueAsString(i); @SuppressWarnings("unchecked") @@ -82,38 +83,50 @@ public class JacksonTest { Assert.assertEquals(back, i); } - @Test - public void jacksonCrdtLWWSetTest() { + void jacksonCrdtSeDeTest(Object value, Class cl){ ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings())); - LWWSet lww = new LWWSet<>("a", "b", "c"); - try { - String lwwS = objectMapper.writeValueAsString(lww); + String valueS = objectMapper.writeValueAsString(value); @SuppressWarnings("unchecked") - LWWSet parsedLww = objectMapper.readValue(lwwS, LWWSet.class); - Assert.assertEquals(lww, parsedLww); + Object parsedValue = objectMapper.readValue(valueS, cl); + Assert.assertEquals(value, parsedValue); } catch (Exception e) { - Assert.fail("LWWSet se/de error"); + Assert.fail("Jackson se/de error"); } } - + + @Test + public void jacksonOrSetTest(){ + jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3"), OrSet.class); + } + + @Test + public void jacksonLWWSetTest(){ + jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3"), LwwSet.class); + } + + @Test + public void jacksonMaxChangeSetTest(){ + jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3"), MaxChangeSet.class); + } + @Test public void testMessageEqualityAssumptions() { long timeA = System.nanoTime(); long timeB = System.nanoTime(); Assert.assertNotEquals(timeA, timeB); - + TestMessage messageA0 = new TestMessage(Long.toHexString(timeA)); TestMessage messageA1 = new TestMessage(Long.toHexString(timeA)); TestMessage messageB = new TestMessage(Long.toHexString(timeB)); - + Assert.assertEquals(messageA0, messageA1); Assert.assertFalse(messageA0 == messageA1); Assert.assertNotEquals(messageA0, messageB); Assert.assertNotEquals(messageA1, messageB); } - + // ideally, we would test the serializability of every message type, but we just want to make sure this works in // basic cases. @Test