gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-80 Sundry cleanups * remove redundant parameter from method call. * remove uncessary threadpool. * Simplify `GossipCore.sendOneWay()` * Cleanup useage of `MessageInvoker` * `DefaultMessageInvoker` replaced by a facto
Date Tue, 18 Apr 2017 01:36:29 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master 298b1ae3a -> c544b8bf1


GOSSIP-80 Sundry cleanups
* remove redundant parameter from method call.
* remove uncessary threadpool.
* Simplify `GossipCore.sendOneWay()`
* Cleanup useage of `MessageInvoker`
  * `DefaultMessageInvoker` replaced by a factory
  * `MessageInvokerCombiner` replaced by same factory
  * Alter `MessageInvokerTest` to not rely on specific types
  * Remove type assertion from `GossipManagerBuilderTest`
* Merge `MessageInvoker` with `MessageHandler`
  * This required changing method signature return type from `void` to `boolean`.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/c544b8bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/c544b8bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/c544b8bf

Branch: refs/heads/master
Commit: c544b8bf167a099ec4cac6a94b059fa71ce8a8cc
Parents: 298b1ae
Author: Gary Dusbabek <gdusbabek@gmail.com>
Authored: Thu Apr 13 13:02:48 2017 -0500
Committer: Gary Dusbabek <gdusbabek@gmail.com>
Committed: Mon Apr 17 10:54:24 2017 -0500

----------------------------------------------------------------------
 .../org/apache/gossip/manager/GossipCore.java   |  42 +----
 .../apache/gossip/manager/GossipManager.java    |  12 +-
 .../gossip/manager/GossipManagerBuilder.java    |  16 +-
 .../handlers/ActiveGossipMessageHandler.java    |  12 +-
 .../manager/handlers/DefaultMessageInvoker.java |  40 ----
 .../gossip/manager/handlers/MessageHandler.java |   8 +-
 .../manager/handlers/MessageHandlerFactory.java |  58 ++++++
 .../gossip/manager/handlers/MessageInvoker.java |  33 ----
 .../handlers/MessageInvokerCombiner.java        |  48 -----
 .../handlers/PerNodeDataMessageHandler.java     |  10 +-
 .../manager/handlers/ResponseHandler.java       |  11 +-
 .../handlers/SharedDataMessageHandler.java      |  10 +-
 .../handlers/ShutdownMessageHandler.java        |  10 +-
 .../manager/handlers/SimpleMessageInvoker.java  |  45 -----
 .../manager/handlers/TypedMessageHandler.java   |  51 ++++++
 .../manager/GossipManagerBuilderTest.java       |  22 +--
 .../manager/handlers/MessageHandlerTest.java    | 182 +++++++++++++++++++
 .../manager/handlers/MessageInvokerTest.java    | 178 ------------------
 18 files changed, 374 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
index f53419d..d01a84c 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -57,7 +57,6 @@ public class GossipCore implements GossipCoreConstants {
   public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
   private final GossipManager gossipManager;
   private ConcurrentHashMap<String, LatchAndBase> requests;
-  private ThreadPoolExecutor service;
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
   private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
   private final BlockingQueue<Runnable> workQueue;
@@ -71,15 +70,12 @@ public class GossipCore implements GossipCoreConstants {
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
     workQueue = new ArrayBlockingQueue<>(1024);
-    service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
     perNodeData = new ConcurrentHashMap<>();
     sharedData = new ConcurrentHashMap<>();
     metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
     metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
     metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() ->  sharedData.size());
     metrics.register(REQUEST_SIZE, (Gauge<Integer>)() ->  requests.size());
-    metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() ->  service.getActiveCount());
-    metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() ->  service.getPoolSize());
     messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
     tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
     tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
@@ -178,17 +174,10 @@ public class GossipCore implements GossipCoreConstants {
   }
 
   public void shutdown(){
-    service.shutdown();
-    try {
-      service.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.warn(e);
-    }
-    service.shutdownNow();
   }
 
   public void receive(Base base) {
-    if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) {
+    if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
       LOGGER.warn("received message can not be handled");
     }
   }
@@ -268,29 +257,10 @@ public class GossipCore implements GossipCoreConstants {
    * @param message the message to send
    * @param u the uri to send it to
    */
-  public void sendOneWay(Base message, URI u){
-    byte[] json_bytes;
+  public void sendOneWay(Base message, URI u) {
     try {
-      if (privKey == null){
-        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
-      } else {
-        SignedPayload p = new SignedPayload();
-        p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
-        p.setSignature(sign(p.getData()));
-        json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
-      }
-    } catch (IOException e) {
-      messageSerdeException.mark();
-      throw new RuntimeException(e);
-    }
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
-      InetAddress dest = InetAddress.getByName(u.getHost());
-      DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort());
-      socket.send(datagramPacket);
-      tranmissionSuccess.mark();
-    } catch (IOException ex) {
-      tranmissionException.mark();
+      sendInternal(message, u);
+    } catch (RuntimeException ex) {
       LOGGER.debug("Send one way failed", ex);
     }
   }
@@ -304,13 +274,11 @@ public class GossipCore implements GossipCoreConstants {
   /**
    * Merge lists from remote members and update heartbeats
    *
-   * @param gossipManager
    * @param senderMember
    * @param remoteList
    *
    */
-  public void mergeLists(GossipManager gossipManager, RemoteMember senderMember,
-          List<Member> remoteList) {
+  public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {
     if (LOGGER.isDebugEnabled()){
       debugState(senderMember, remoteList);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index c2b50ae..ff70ccc 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -25,7 +25,7 @@ import org.apache.gossip.Member;
 import org.apache.gossip.crdt.Crdt;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
-import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
@@ -64,14 +64,14 @@ public abstract class GossipManager {
   private final GossipMemberStateRefresher memberStateRefresher;
   private final ObjectMapper objectMapper;
 
-  private final MessageInvoker messageInvoker;
+  private final MessageHandler messageHandler;
 
   public GossipManager(String cluster,
                        URI uri, String id, Map<String, String> properties, GossipSettings settings,
                        List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
-                       ObjectMapper objectMapper, MessageInvoker messageInvoker) {
+                       ObjectMapper objectMapper, MessageHandler messageHandler) {
     this.settings = settings;
-    this.messageInvoker = messageInvoker;
+    this.messageHandler = messageHandler;
 
     clock = new SystemClock();
     me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
@@ -101,8 +101,8 @@ public abstract class GossipManager {
     readSavedDataState();
   }
 
-  public MessageInvoker getMessageInvoker() {
-    return messageInvoker;
+  public MessageHandler getMessageHandler() {
+    return messageHandler;
   }
 
   public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index b87045b..bb73177 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -25,8 +25,8 @@ import org.apache.gossip.GossipSettings;
 import org.apache.gossip.StartupSettings;
 import org.apache.gossip.crdt.CrdtModule;
 import org.apache.gossip.event.GossipListener;
-import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
-import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.handlers.MessageHandler;
+import org.apache.gossip.manager.handlers.MessageHandlerFactory;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -50,7 +50,7 @@ public class GossipManagerBuilder {
     private MetricRegistry registry;
     private Map<String,String> properties;
     private ObjectMapper objectMapper;
-    private MessageInvoker messageInvoker;
+    private MessageHandler messageHandler;
 
     private ManagerBuilder() {}
 
@@ -114,8 +114,8 @@ public class GossipManagerBuilder {
       return this;
     }
 
-    public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) {
-      this.messageInvoker = messageInvoker;
+    public ManagerBuilder messageHandler(MessageHandler messageHandler) {
+      this.messageHandler = messageHandler;
       return this;
     }
 
@@ -142,10 +142,10 @@ public class GossipManagerBuilder {
         objectMapper.registerModule(new CrdtModule());
         objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
       }
-      if (messageInvoker == null) {
-        messageInvoker = new DefaultMessageInvoker();
+      if (messageHandler == null) {
+        messageHandler = MessageHandlerFactory.defaultHandler();
       } 
-      return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ;
+      return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
index f5e568e..e89179b 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java
@@ -32,8 +32,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class ActiveGossipMessageHandler implements MessageHandler {
+  
+  /**
+   * @param gossipCore context.
+   * @param gossipManager context.
+   * @param base message reference.
+   * @return boolean indicating success.
+   */
   @Override
-  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
     List<Member> remoteGossipMembers = new ArrayList<>();
     RemoteMember senderMember = null;
     UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
@@ -69,6 +76,7 @@ public class ActiveGossipMessageHandler implements MessageHandler {
     o.setUriFrom(activeGossipMessage.getUriFrom());
     o.setUuid(activeGossipMessage.getUuid());
     gossipCore.sendOneWay(o, senderMember.getUri());
-    gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers);
+    gossipCore.mergeLists(senderMember, remoteGossipMembers);
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
deleted file mode 100644
index 5b78ce3..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.*;
-
-public class DefaultMessageInvoker implements MessageInvoker {
-  private final MessageInvokerCombiner mic;
-
-  public DefaultMessageInvoker() {
-    mic = new MessageInvokerCombiner();
-    mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
-    mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler()));
-    mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler()));
-    mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler()));
-    mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler()));
-  }
-
-  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
-    return mic.invoke(gossipCore, gossipManager, base);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
index 4b5d49d..5af9b14 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java
@@ -22,5 +22,11 @@ import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.model.Base;
 
 public interface MessageHandler {
-  void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
+  /**
+   * @param gossipCore context.
+   * @param gossipManager context.
+   * @param base message reference.
+   * @return boolean indicating success.
+   */
+  boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
new file mode 100644
index 0000000..fff9430
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+
+import java.util.Arrays;
+
+public class MessageHandlerFactory {
+  
+  public static MessageHandler defaultHandler() {
+    return concurrentHandler(
+        new TypedMessageHandler(Response.class, new ResponseHandler()),
+        new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()),
+        new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()),
+        new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()),
+        new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler())
+    );
+  }
+  
+  public static MessageHandler concurrentHandler(MessageHandler... handlers) {
+    if (handlers == null) throw new NullPointerException("handlers cannot be null");
+    if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) {
+      throw new NullPointerException("found at least one null handler");
+    }
+    return new MessageHandler() {
+      @Override
+      public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+        // return true if at least one of the component handlers return true.
+        return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
+      }
+    };
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java
deleted file mode 100644
index 70be408..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.Base;
-
-public interface MessageInvoker {
-  /**
-   * 
-   * @param gossipCore
-   * @param gossipManager
-   * @param base
-   * @return true if the invoker processed the message type
-   */
-  boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
deleted file mode 100644
index 5faf6a5..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.Base;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-public class MessageInvokerCombiner implements MessageInvoker {
-  private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();
-
-  public MessageInvokerCombiner() {
-  }
-
-  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
-    return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
-  }
-
-  public void clear() {
-    invokers.clear();
-  }
-
-  public void add(MessageInvoker mi) {
-    if (mi == null) {
-      throw new NullPointerException();
-    }
-    invokers.add(mi);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
index b3a785e..0ad0d91 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java
@@ -23,9 +23,17 @@ import org.apache.gossip.model.Base;
 import org.apache.gossip.udp.UdpPerNodeDataMessage;
 
 public class PerNodeDataMessageHandler implements MessageHandler {
+  
+  /**
+   * @param gossipCore context.
+   * @param gossipManager context.
+   * @param base message reference.
+   * @return boolean indicating success.
+   */
   @Override
-  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
     UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base;
     gossipCore.addPerNodeData(message);
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
index 2f33b01..1070ff7 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
@@ -23,11 +23,20 @@ import org.apache.gossip.model.Base;
 import org.apache.gossip.udp.Trackable;
 
 public class ResponseHandler implements MessageHandler {
+  
+  /**
+   * @param gossipCore context.
+   * @param gossipManager context.
+   * @param base message reference.
+   * @return boolean indicating success.
+   */
   @Override
-  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
     if (base instanceof Trackable) {
       Trackable t = (Trackable) base;
       gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
+      return true;
     }
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
index 89ca4a0..3fe3033 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java
@@ -23,9 +23,17 @@ import org.apache.gossip.model.Base;
 import org.apache.gossip.udp.UdpSharedDataMessage;
 
 public class SharedDataMessageHandler implements MessageHandler{
+  
+  /**
+   * @param gossipCore context.
+   * @param gossipManager context.
+   * @param base message reference.
+   * @return boolean indicating success.
+   */
   @Override
-  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
     UdpSharedDataMessage message = (UdpSharedDataMessage) base;
     gossipCore.addSharedData(message);
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
index a40c7a1..40e4c07 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java
@@ -24,8 +24,15 @@ import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.ShutdownMessage;
 
 public class ShutdownMessageHandler implements MessageHandler {
+  
+  /**
+   * @param gossipCore context.
+   * @param gossipManager context.
+   * @param base message reference.
+   * @return boolean indicating success.
+   */
   @Override
-  public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
     ShutdownMessage s = (ShutdownMessage) base;
     PerNodeDataMessage m = new PerNodeDataMessage();
     m.setKey(ShutdownMessage.PER_NODE_KEY);
@@ -34,5 +41,6 @@ public class ShutdownMessageHandler implements MessageHandler {
     m.setTimestamp(System.currentTimeMillis());
     m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
     gossipCore.addPerNodeData(m);
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java
deleted file mode 100644
index 0f410d2..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.Base;
-
-public class SimpleMessageInvoker implements MessageInvoker {
-  final private Class<?> messageClass;
-  final private MessageHandler messageHandler;
-
-  public SimpleMessageInvoker(Class<?> messageClass, MessageHandler messageHandler) {
-    if (messageClass == null || messageHandler == null) {
-      throw new NullPointerException();
-    }
-    this.messageClass = messageClass;
-    this.messageHandler = messageHandler;
-  }
-
-  @Override
-  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
-    if (messageClass.isAssignableFrom(base.getClass())) {
-      messageHandler.invoke(gossipCore, gossipManager, base);
-      return true;
-    } else {
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java
new file mode 100644
index 0000000..b40461d
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.Base;
+
+public class TypedMessageHandler implements MessageHandler {
+  final private Class<?> messageClass;
+  final private MessageHandler messageHandler;
+
+  public TypedMessageHandler(Class<?> messageClass, MessageHandler messageHandler) {
+    if (messageClass == null || messageHandler == null) {
+      throw new NullPointerException();
+    }
+    this.messageClass = messageClass;
+    this.messageHandler = messageHandler;
+  }
+
+  /**
+   * @param gossipCore context.
+   * @param gossipManager context.
+   * @param base message reference.
+   * @return true if types match, false otherwise.
+   */
+  @Override
+  public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+    if (messageClass.isAssignableFrom(base.getClass())) {
+      messageHandler.invoke(gossipCore, gossipManager, base);
+      return true;
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
index 8842643..959f818 100644
--- a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java
@@ -21,10 +21,9 @@ import com.codahale.metrics.MetricRegistry;
 import org.apache.gossip.Member;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.LocalMember;
-import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
-import org.apache.gossip.manager.handlers.MessageInvoker;
+import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.manager.handlers.ResponseHandler;
-import org.apache.gossip.manager.handlers.SimpleMessageInvoker;
+import org.apache.gossip.manager.handlers.TypedMessageHandler;
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 import org.junit.platform.runner.JUnitPlatform;
@@ -77,28 +76,27 @@ public class GossipManagerBuilderTest {
   }
 
   @Test
-  public void createDefaultMessageInvokerIfNull() throws URISyntaxException {
+  public void createDefaultMessageHandlerIfNull() throws URISyntaxException {
     GossipManager gossipManager = GossipManagerBuilder.newBuilder()
         .id("id")
         .cluster("aCluster")
         .uri(new URI("udp://localhost:2000"))
         .gossipSettings(new GossipSettings())
-        .messageInvoker(null).registry(new MetricRegistry()).build();
-    assertNotNull(gossipManager.getMessageInvoker());
-    Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass());
+        .messageHandler(null).registry(new MetricRegistry()).build();
+    assertNotNull(gossipManager.getMessageHandler());
   }
 
   @Test
-  public void testMessageInvokerKeeping() throws URISyntaxException {
-    MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler());
+  public void testMessageHandlerKeeping() throws URISyntaxException {
+    MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler());
     GossipManager gossipManager = GossipManagerBuilder.newBuilder()
         .id("id")
         .cluster("aCluster")
         .uri(new URI("udp://localhost:2000"))
         .gossipSettings(new GossipSettings())
-        .messageInvoker(mi).registry(new MetricRegistry()).build();
-    assertNotNull(gossipManager.getMessageInvoker());
-    Assert.assertEquals(gossipManager.getMessageInvoker(), mi);
+        .messageHandler(mi).registry(new MetricRegistry()).build();
+    assertNotNull(gossipManager.getMessageHandler());
+    Assert.assertEquals(gossipManager.getMessageHandler(), mi);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
new file mode 100644
index 0000000..c035d21
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager.handlers;
+
+import org.apache.gossip.manager.GossipCore;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.model.ActiveGossipMessage;
+import org.apache.gossip.model.Base;
+import org.apache.gossip.udp.UdpSharedDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageHandlerTest {
+  private class FakeMessage extends Base {
+    public FakeMessage() {
+    }
+  }
+
+  private class FakeMessageData extends Base {
+    public int data;
+
+    public FakeMessageData(int data) {
+      this.data = data;
+    }
+  }
+
+  private class FakeMessageDataHandler implements MessageHandler {
+    public int data;
+
+    public FakeMessageDataHandler() {
+      data = 0;
+    }
+
+    public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+      data = ((FakeMessageData) base).data;
+      return true;
+    }
+  }
+
+  private class FakeMessageHandler implements MessageHandler {
+    public int counter;
+
+    public FakeMessageHandler() {
+      counter = 0;
+    }
+
+    public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
+      counter++;
+      return true;
+    }
+  }
+
+  @Test
+  public void testSimpleHandler() {
+    MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler());
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testSimpleHandlerNullClassConstructor() {
+    new TypedMessageHandler(null, new FakeMessageHandler());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testSimpleHandlerNullHandlerConstructor() {
+    new TypedMessageHandler(FakeMessage.class, null);
+  }
+
+  @Test
+  public void testCallCountSimpleHandler() {
+    FakeMessageHandler h = new FakeMessageHandler();
+    MessageHandler mi = new TypedMessageHandler(FakeMessage.class, h);
+    mi.invoke(null, null, new FakeMessage());
+    Assert.assertEquals(1, h.counter);
+    mi.invoke(null, null, new ActiveGossipMessage());
+    Assert.assertEquals(1, h.counter);
+    mi.invoke(null, null, new FakeMessage());
+    Assert.assertEquals(2, h.counter);
+  }
+
+  @Test(expected = NullPointerException.class)
+  @SuppressWarnings("all")
+  public void cantAddNullHandler() {
+    MessageHandler handler = MessageHandlerFactory.concurrentHandler(null);
+  }
+  
+  @Test(expected = NullPointerException.class)
+  public void cantAddNullHandler2() {
+    MessageHandler handler = MessageHandlerFactory.concurrentHandler(
+        new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()),
+        null,
+        new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())
+    );
+  }
+
+  @Test
+  public void testMessageHandlerCombiner() {
+    //Empty combiner - false result
+    MessageHandler mi = MessageHandlerFactory.concurrentHandler();
+    Assert.assertFalse(mi.invoke(null, null, new Base()));
+
+    FakeMessageHandler h = new FakeMessageHandler();
+    mi = MessageHandlerFactory.concurrentHandler(
+      new TypedMessageHandler(FakeMessage.class, h),
+      new TypedMessageHandler(FakeMessage.class, h)
+    );
+
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
+    Assert.assertEquals(2, h.counter);
+    
+    //Increase size in runtime. Should be 3 calls: 2+3 = 5
+    mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessage.class, h));
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertEquals(5, h.counter);
+  }
+
+  @Test
+  public void testMessageHandlerCombiner2levels() {
+    FakeMessageHandler h = new FakeMessageHandler();
+
+    MessageHandler mi1 = MessageHandlerFactory.concurrentHandler(
+      new TypedMessageHandler(FakeMessage.class, h),
+      new TypedMessageHandler(FakeMessage.class, h)
+    );
+
+    MessageHandler mi2 = MessageHandlerFactory.concurrentHandler(
+      new TypedMessageHandler(FakeMessage.class, h),
+      new TypedMessageHandler(FakeMessage.class, h)
+    );
+
+    MessageHandler mi = MessageHandlerFactory.concurrentHandler(mi1, mi2);
+    
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+    Assert.assertEquals(4, h.counter);
+  }
+
+  @Test
+  public void testMessageHandlerCombinerDataShipping() {
+    MessageHandler mi = MessageHandlerFactory.concurrentHandler();
+    FakeMessageDataHandler h = new FakeMessageDataHandler();
+    mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessageData.class, h));
+
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101)));
+    Assert.assertEquals(101, h.data);
+  }
+
+  @Test
+  public void testCombiningDefaultHandler() {
+    MessageHandler mi = MessageHandlerFactory.concurrentHandler(
+      MessageHandlerFactory.defaultHandler(),
+      new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())
+    );
+    //UdpSharedGossipDataMessage with null gossipCore -> exception
+    boolean thrown = false;
+    try {
+      mi.invoke(null, null, new UdpSharedDataMessage());
+    } catch (NullPointerException e) {
+      thrown = true;
+    }
+    Assert.assertTrue(thrown);
+    //skips FakeMessage and FakeHandler works ok
+    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c544b8bf/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
deleted file mode 100644
index 571d7ba..0000000
--- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.handlers;
-
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipMessage;
-import org.apache.gossip.model.Base;
-import org.apache.gossip.udp.UdpSharedDataMessage;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MessageInvokerTest {
-  private class FakeMessage extends Base {
-    public FakeMessage() {
-    }
-  }
-
-  private class FakeMessageData extends Base {
-    public int data;
-
-    public FakeMessageData(int data) {
-      this.data = data;
-    }
-  }
-
-  private class FakeMessageDataHandler implements MessageHandler {
-    public int data;
-
-    public FakeMessageDataHandler() {
-      data = 0;
-    }
-
-    public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
-      data = ((FakeMessageData) base).data;
-    }
-  }
-
-  private class FakeMessageHandler implements MessageHandler {
-    public int counter;
-
-    public FakeMessageHandler() {
-      counter = 0;
-    }
-
-    public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
-      counter++;
-    }
-  }
-
-  @Test
-  public void testSimpleInvoker() {
-    MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler());
-    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
-    Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testSimpleInvokerNullClassConstructor() {
-    new SimpleMessageInvoker(null, new FakeMessageHandler());
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testSimpleInvokerNullHandlerConstructor() {
-    new SimpleMessageInvoker(FakeMessage.class, null);
-  }
-
-  @Test
-  public void testCallCountSimpleInvoker() {
-    FakeMessageHandler h = new FakeMessageHandler();
-    MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h);
-    mi.invoke(null, null, new FakeMessage());
-    Assert.assertEquals(1, h.counter);
-    mi.invoke(null, null, new ActiveGossipMessage());
-    Assert.assertEquals(1, h.counter);
-    mi.invoke(null, null, new FakeMessage());
-    Assert.assertEquals(2, h.counter);
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void cantAddNullInvoker() {
-    MessageInvokerCombiner mi = new MessageInvokerCombiner();
-    mi.add(null);
-  }
-
-  @Test
-  public void testCombinerClear() {
-    MessageInvokerCombiner mi = new MessageInvokerCombiner();
-    mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
-    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
-
-    mi.clear();
-    Assert.assertFalse(mi.invoke(null, null, new FakeMessage()));
-  }
-
-  @Test
-  public void testMessageInvokerCombiner() {
-    //Empty combiner - false result
-    MessageInvokerCombiner mi = new MessageInvokerCombiner();
-    Assert.assertFalse(mi.invoke(null, null, new Base()));
-
-    FakeMessageHandler h = new FakeMessageHandler();
-    mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
-    mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
-
-    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
-    Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
-    Assert.assertEquals(2, h.counter);
-
-    //Increase size in runtime. Should be 3 calls: 2+3 = 5
-    mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
-    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
-    Assert.assertEquals(5, h.counter);
-  }
-
-  @Test
-  public void testMessageInvokerCombiner2levels() {
-    MessageInvokerCombiner mi = new MessageInvokerCombiner();
-    FakeMessageHandler h = new FakeMessageHandler();
-
-    MessageInvokerCombiner mi1 = new MessageInvokerCombiner();
-    mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
-    mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
-
-    MessageInvokerCombiner mi2 = new MessageInvokerCombiner();
-    mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
-    mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
-
-    mi.add(mi1);
-    mi.add(mi2);
-
-    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
-    Assert.assertEquals(4, h.counter);
-  }
-
-  @Test
-  public void testMessageInvokerCombinerDataShipping() {
-    MessageInvokerCombiner mi = new MessageInvokerCombiner();
-    FakeMessageDataHandler h = new FakeMessageDataHandler();
-    mi.add(new SimpleMessageInvoker(FakeMessageData.class, h));
-
-    Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101)));
-    Assert.assertEquals(101, h.data);
-  }
-
-  @Test
-  public void testCombiningDefaultInvoker() {
-    MessageInvokerCombiner mi = new MessageInvokerCombiner();
-    mi.add(new DefaultMessageInvoker());
-    mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
-    //UdpSharedGossipDataMessage with null gossipCore -> exception
-    boolean thrown = false;
-    try {
-      mi.invoke(null, null, new UdpSharedDataMessage());
-    } catch (NullPointerException e) {
-      thrown = true;
-    }
-    Assert.assertTrue(thrown);
-    //DefaultInvoker skips FakeMessage and FakeHandler works ok
-    Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
-  }
-
-}


Mime
View raw message