aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [5/9] aries-rsa git commit: [ARIES-1774] Switch to watching all endpoints in zookeeper to be able to pass tck
Date Thu, 08 Feb 2018 16:41:05 GMT
[ARIES-1774] Switch to watching all endpoints in zookeeper to be able to pass tck


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/ca922a42
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/ca922a42
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/ca922a42

Branch: refs/heads/master
Commit: ca922a42e0705150d39c6f6955d647f7b1c3ad39
Parents: f07ee8b
Author: Christian Schneider <cschneid@adobe.com>
Authored: Thu Feb 8 09:30:22 2018 +0100
Committer: Christian Schneider <cschneid@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100

----------------------------------------------------------------------
 .../discovery/zookeeper/ZooKeeperDiscovery.java |   3 +-
 .../repository/ZookeeperEndpointRepository.java | 162 ++++++-----
 .../subscribe/EndpointListenerTracker.java      |  16 +-
 .../zookeeper/subscribe/InterfaceMonitor.java   | 266 -------------------
 .../subscribe/InterfaceMonitorManager.java      | 213 ++++-----------
 .../ZookeeperEndpointRepositoryTest.java        |  25 +-
 .../subscribe/InterfaceMonitorManagerTest.java  |  71 ++---
 .../subscribe/InterfaceMonitorTest.java         |  71 -----
 8 files changed, 198 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index d265a22..50d9598 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -98,7 +98,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
         repository = new ZookeeperEndpointRepository(zkClient);
         endpointListener = new PublishingEndpointListener(repository);
         endpointListener.start(bctx);
-        imManager = new InterfaceMonitorManager(bctx, zkClient);
+        imManager = new InterfaceMonitorManager(repository);
+        repository.addListener(imManager);
         endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
         endpointListenerTracker.open();
         started = true;

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
index 2349c45..c5c03a4 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -15,6 +15,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -53,33 +54,11 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         } catch (Exception e) {
             throw new IllegalStateException("Unable to create base path");
         }
-        // Not yet needed
-        //this.registerWatcher();
+        this.registerWatcher();
     }
 
-    private void registerWatcher() {
-        try {
-            List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this);
-            System.out.println(children);
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-    
-    protected void notifyListener(WatchedEvent wevent) {
-        EndpointDescription ep = read(wevent.getPath());
-        if (ep != null) {
-            int type = getEndpointEventType(wevent);
-            EndpointEvent event = new EndpointEvent(type, ep);
-            listener.endpointChanged(event, null);
-        }
-    }
-    
-    private int getEndpointEventType(WatchedEvent wevent) {
-        EventType type = wevent.getType();
-        return EndpointEvent.ADDED;
+    public void addListener(EndpointEventListener listener) {
+        this.listener = listener;
     }
 
     /**
@@ -89,23 +68,7 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
      * @return endpoint found in the node or null if no endpoint was found
      */
     public EndpointDescription read(String path) {
-        try {
-            Stat stat = zk.exists(path, false);
-            if (stat == null || stat.getDataLength() <= 0) {
-                return null;
-            }
-            byte[] data = zk.getData(path, false, null);
-            LOG.debug("Got data for node: {}", path);
-
-            EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
-            if (endpoint != null) {
-                return endpoint;
-            }
-            LOG.warn("No Discovery information found for node: {}", path);
-        } catch (Exception e) {
-            LOG.error("Problem getting EndpointDescription from node " + path, e);
-        }
-        return null;
+        return nodes.get(path);
     }
 
     public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
@@ -113,6 +76,8 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         Collection<String> interfaces = endpoint.getInterfaces();
         String endpointKey = getKey(endpoint);
     
+        createEphemeralNode(ZookeeperEndpointRepository.getZooKeeperPath("") + endpointKey, getData(endpoint));
+        
         LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
         for (String name : interfaces) {
             String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
@@ -152,8 +117,42 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         }
     }
     
-    public List<EndpointDescription> getAll() throws KeeperException, InterruptedException {
-        return null;
+    public Collection<EndpointDescription> getAll() {
+        return nodes.values();
+    }
+
+    /**
+     * Removes nulls and empty strings from the given string array.
+     *
+     * @param strings an array of strings
+     * @return a new array containing the non-null and non-empty
+     *         elements of the original array in the same order
+     */
+    public static List<String> removeEmpty(List<String> strings) {
+        List<String> result = new ArrayList<String>();
+        if (strings == null) {
+            return result;
+        }
+        for (String s : strings) {
+            if (s != null && !s.isEmpty()) {
+                result.add(s);
+            }
+        }
+        return result;
+    }
+
+    public static String getZooKeeperPath(String name) {
+        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        LOG.info("Received event {}", event);
+        if (event.getType() == EventType.NodeDeleted) {
+            handleRemoved(event.getPath());
+            return;
+        }
+        watchRecursive(event.getPath());
     }
 
     @Override
@@ -161,6 +160,28 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
 
     }
 
+    private void registerWatcher() {
+        try {
+            watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX);
+        } catch (Exception e) {
+            LOG.info(e.getMessage(), e);
+        }
+    }
+
+    private void watchRecursive(String path) {
+        LOG.info("Watching {}", path);
+        handleZNodeChanged(path);
+        try {
+            List<String> children = zk.getChildren(path, this);
+            for (String child : children) {
+                String childPath = (path.endsWith("/") ? path : path + "/") + child;
+                watchRecursive(childPath);
+            }
+        } catch (Exception e) {
+            LOG.info(e.getMessage(), e);
+        }
+    }
+
     private byte[] getData(EndpointDescription epd) {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         parser.writeEndpoint(epd, bos);
@@ -201,39 +222,38 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher {
         }
     }
 
-    /**
-     * Removes nulls and empty strings from the given string array.
-     *
-     * @param strings an array of strings
-     * @return a new array containing the non-null and non-empty
-     *         elements of the original array in the same order
-     */
-    public static List<String> removeEmpty(List<String> strings) {
-        List<String> result = new ArrayList<String>();
-        if (strings == null) {
-            return result;
-        }
-        for (String s : strings) {
-            if (s != null && !s.isEmpty()) {
-                result.add(s);
-            }
-        }
-        return result;
-    }
-
-    public static String getZooKeeperPath(String name) {
-        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/');
-    }
-
     private static String getKey(EndpointDescription endpoint) throws URISyntaxException {
         URI uri = new URI(endpoint.getId());
         return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
             .append("#").append(uri.getPath().replace('/', '#')).toString();
     }
 
-    @Override
-    public void process(WatchedEvent event) {
-        
+    private void handleZNodeChanged(String path) {
+        try {
+            Stat stat = new Stat();
+            byte[] data = zk.getData(path, false, stat);
+            if (data == null || data.length == 0) {
+                return;
+            }
+            EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
+            if (endpoint != null) {
+               handleChanged(path, endpoint);
+            }
+        } catch (Exception e) {
+            LOG.info(e.getMessage(), e);
+        }
+    }
+
+    private void handleRemoved(String path) {
+        EndpointDescription endpoint = nodes.remove(path);
+        EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
+        listener.endpointChanged(event, null);
+    }
+
+    private void handleChanged(String path, EndpointDescription endpoint) {
+        EndpointDescription old = nodes.put(path, endpoint);
+        EndpointEvent event = new EndpointEvent(old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED, endpoint);
+        listener.endpointChanged(event, null);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
index 0ed1097..d4805d0 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
@@ -51,21 +51,23 @@ public class EndpointListenerTracker extends ServiceTracker {
     }
 
     @Override
-    public Object addingService(ServiceReference endpointListener) {
-        imManager.addInterest(endpointListener);
-        return null;
+    public Object addingService(ServiceReference sref) {
+        Object epListener = super.addingService(sref);
+        imManager.addInterest(sref, epListener);
+        return epListener;
     }
 
     @Override
-    public void modifiedService(ServiceReference endpointListener, Object service) {
+    public void modifiedService(ServiceReference sref, Object epListener) {
         // called when an EndpointListener updates its service properties,
         // e.g. when its interest scope is expanded/reduced
-        imManager.addInterest(endpointListener);
+        imManager.addInterest(sref, epListener);
     }
 
     @Override
-    public void removedService(ServiceReference endpointListener, Object service) {
-        imManager.removeInterest(endpointListener);
+    public void removedService(ServiceReference sref, Object epListener) {
+        imManager.removeInterest(sref);
+        super.removedService(sref, epListener);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
deleted file mode 100644
index 2c90b3c..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors ZooKeeper for changes in published endpoints.
- * <p>
- * Specifically, it monitors the node path associated with a given interface class,
- * whose data is a serialized version of an EndpointDescription, and notifies an
- * EndpointListener when changes are detected (which can then propagate the
- * notification to other EndpointListeners with a matching scope).
- * <p>
- * Note that the EndpointListener is used here as a decoupling interface for
- * convenience, and is not necessarily used according to its documented contract.
- */
-public class InterfaceMonitor implements Watcher, StatCallback {
-
-    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class);
-
-    private final String znode;
-    private final ZooKeeper zk;
-    private final EndpointEventListener endpointListener;
-    private final boolean recursive;
-    private volatile boolean closed;
-
-    // This map reference changes, so don't synchronize on it
-    private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>();
-
-    private EndpointDescriptionParser parser;
-
-    public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener, String scope) {
-        this.zk = zk;
-        this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass);
-        this.recursive = objClass == null || objClass.isEmpty();
-        this.endpointListener = endpointListener;
-        this.parser = new EndpointDescriptionParser();
-        LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]",
-                new Object[] {recursive ? "(recursive)" : "", scope, objClass});
-    }
-
-    /**
-     * Returns all endpoints that are currently known to this monitor.
-     *
-     * @return all endpoints that are currently known to this monitor
-     */
-    public synchronized List<EndpointDescription> getEndpoints() {
-        return new ArrayList<EndpointDescription>(nodes.values());
-    }
-
-    public void start() {
-        watch();
-    }
-
-    private void watch() {
-        LOG.debug("registering a ZooKeeper.exists({}) callback", znode);
-        zk.exists(znode, this, this, null);
-        zk.getData(znode, this, new DataCallback() {
-            
-            @Override
-            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                processDelta();
-            }
-        }, null);
-    }
-
-    /**
-     * Zookeeper Watcher interface callback.
-     */
-    public void process(WatchedEvent event) {
-        LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event);
-        processDelta();
-    }
-
-    /**
-     * Zookeeper StatCallback interface callback.
-     */
-    @SuppressWarnings("deprecation")
-    public void processResult(int rc, String path, Object ctx, Stat stat) {
-        LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc);
-
-        switch (rc) {
-        case Code.Ok:
-        case Code.NoNode:
-            processDelta();
-            return;
-
-        case Code.SessionExpired:
-        case Code.NoAuth:
-        case Code.ConnectionLoss:
-            return;
-
-        default:
-            watch();
-        }
-    }
-
-    private void processDelta() {
-        if (closed) {
-            return;
-        }
-
-        if (zk.getState() != ZooKeeper.States.CONNECTED) {
-            LOG.debug("ZooKeeper connection was already closed! Not processing changed event.");
-            return;
-        }
-
-        try {
-            if (zk.exists(znode, false) != null) {
-                zk.getChildren(znode, this);
-                refreshNodes();
-            } else {
-                LOG.debug("znode {} doesn't exist -> not processing any changes", znode);
-            }
-        } catch (Exception e) {
-            if (zk.getState() != ZooKeeper.States.CONNECTED) {
-                LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery
-            } else {
-                LOG.error("Error getting ZooKeeper data.", e);
-            }
-        }
-    }
-
-    public synchronized void close() {
-        closed = true;
-        for (EndpointDescription endpoint : nodes.values()) {
-            EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
-            endpointListener.endpointChanged(event, null);
-        }
-        nodes.clear();
-    }
-
-    private synchronized void refreshNodes() {
-        if (closed) {
-            return;
-        }
-        LOG.info("Processing change on node: {}", znode);
-
-        Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>();
-        Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes);
-        processChildren(znode, newNodes, prevNodes);
-
-        // whatever is left in prevNodes now has been removed from Discovery
-        LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values());
-        for (EndpointDescription endpoint : prevNodes.values()) {
-            EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
-            endpointListener.endpointChanged(event, null);
-        }
-        nodes = newNodes;
-    }
-
-    /**
-     * Iterates through all child nodes of the given node and tries to find
-     * endpoints. If the recursive flag is set it also traverses into the child
-     * nodes.
-     *
-     * @return true if an endpoint was found and if the node therefore needs to
-     *         be monitored for changes
-     */
-    private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes,
-            Map<String, EndpointDescription> prevNodes) {
-        List<String> children;
-        try {
-            LOG.debug("Processing the children of {}", zn);
-            children = zk.getChildren(zn, false);
-
-            boolean foundANode = false;
-            for (String child : children) {
-                String childZNode = zn + '/' + child;
-                EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode);
-                if (endpoint != null) {
-                    EndpointDescription prevEndpoint = prevNodes.get(child);
-                    
-                    newNodes.put(child, endpoint);
-                    prevNodes.remove(child);
-                    foundANode = true;
-                    LOG.debug("Properties: {}", endpoint.getProperties());
-                    if (prevEndpoint == null) {
-                        // This guy is new
-                        LOG.info("found new node " + zn + "/[" + child + "]   ( []->child )  props: "
-                                + endpoint.getProperties().values());
-                        EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
-                        endpointListener.endpointChanged(event, null);
-                    } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) {
-                        LOG.info("Found changed node " + zn + "/[" + child + "]   ( []->child )  props: "
-                                + endpoint.getProperties().values());
-                        EndpointEvent event = new EndpointEvent(EndpointEvent.MODIFIED, endpoint);
-                        endpointListener.endpointChanged(event, null);
-                    }
-                }
-                if (recursive && processChildren(childZNode, newNodes, prevNodes)) {
-                    zk.getChildren(childZNode, this);
-                }
-            }
-
-            return foundANode;
-        } catch (KeeperException e) {
-            LOG.error("Problem processing ZooKeeper node", e);
-        } catch (InterruptedException e) {
-            LOG.error("Problem processing ZooKeeper node", e);
-        }
-        return false;
-    }
-
-    /**
-     * Retrieves data from the given node and parses it into an EndpointDescription.
-     *
-     * @param node a node path
-     * @return endpoint found in the node or null if no endpoint was found
-     */
-    private EndpointDescription getEndpointDescriptionFromNode(String node) {
-        try {
-            Stat stat = zk.exists(node, false);
-            if (stat == null || stat.getDataLength() <= 0) {
-                return null;
-            }
-            byte[] data = zk.getData(node, false, null);
-            LOG.debug("Got data for node: {}", node);
-
-            EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
-            if (endpoint != null) {
-                return endpoint;
-            }
-            LOG.warn("No Discovery information found for node: {}", node);
-        } catch (Exception e) {
-            LOG.error("Problem getting EndpointDescription from node " + node, e);
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 0aa98b3..7b5c7d2 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -18,23 +18,13 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper.subscribe;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
 import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.util.StringPlus;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
 import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -50,165 +40,95 @@ import org.slf4j.LoggerFactory;
  * These events are then forwarded to all interested EndpointEventListeners.
  */
 @SuppressWarnings({"deprecation", "rawtypes"})
-public class InterfaceMonitorManager {
+public class InterfaceMonitorManager implements EndpointEventListener {
     private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);
-    private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*");
 
-    private final BundleContext bctx;
-    private final ZooKeeper zk;
-    // map of EndpointEventListeners and the scopes they are interested in
-    private final Map<ServiceReference, List<String>> epListenerScopes =
-            new HashMap<ServiceReference, List<String>>();
-    // map of scopes and their interest data
-    private final Map<String, Interest> interests = new HashMap<String, Interest>();
+    private final ZookeeperEndpointRepository repository;
+    private final Map<ServiceReference, Interest> interests = new HashMap<ServiceReference, Interest>();
 
     protected static class Interest {
-        List<ServiceReference> epListeners = 
-            new CopyOnWriteArrayList<ServiceReference>();
-        InterfaceMonitor monitor;
+        List<String> scopes;
+        Object epListener;
     }
 
-    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
-        this.bctx = bctx;
-        this.zk = zk;
+    public InterfaceMonitorManager(ZookeeperEndpointRepository repository) {
+        this.repository = repository;
     }
 
-    public void addInterest(ServiceReference<?> eplistener) {
-        if (isOurOwnEndpointEventListener(eplistener)) {
+    public void addInterest(ServiceReference<?> sref, Object epListener) {
+        if (isOurOwnEndpointEventListener(sref)) {
             LOG.debug("Skipping our own EndpointEventListener");
             return;
         }
-        List<String> scopes = getScopes(eplistener);
+        List<String> scopes = getScopes(sref);
         LOG.debug("adding Interests: {}", scopes);
         
-        for (String scope : scopes) {
-            String objClass = getObjectClass(scope);
-            addInterest(eplistener, scope, objClass);
-        }
-    }
-
-    private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
-        return Boolean.parseBoolean(String.valueOf(
-                EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
-    }
-
-    public synchronized void addInterest(ServiceReference epListener, 
-                                         String scope, String objClass) {
         // get or create interest for given scope and add listener to it
-        Interest interest = interests.get(scope);
+        Interest interest = interests.get(epListener);
         if (interest == null) {
             // create interest, add listener and start monitor
             interest = new Interest();
-            interests.put(scope, interest);
-            interest.epListeners.add(epListener); // add it before monitor starts so we don't miss events
-            interest.monitor = createInterfaceMonitor(scope, objClass, interest);
-            interest.monitor.start();
-        } else {
-            // interest already exists, so just add listener to it
-            if (!interest.epListeners.contains(epListener)) {
-                interest.epListeners.add(epListener);
-            }
-            // notify listener of all known endpoints for given scope
-            // (as EndpointEventListener contract requires of all added/modified listeners)
-            for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
-                EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
-                notifyListeners(event, scope, Arrays.asList(epListener));
-            }
-        }
-
-        // add scope to listener's scopes list
-        List<String> scopes = epListenerScopes.get(epListener);
-        if (scopes == null) {
-            scopes = new ArrayList<String>(1);
-            epListenerScopes.put(epListener, scopes);
-        }
-        if (!scopes.contains(scope)) {
-            scopes.add(scope);
+            interest.epListener = epListener;
+            interest.scopes = scopes;
+            interests.put(sref, interest);
+            sendExistingEndpoints(scopes, epListener);
         }
     }
 
-    public synchronized void removeInterest(ServiceReference<EndpointEventListener> EndpointEventListener) {
-        LOG.info("removing EndpointEventListener interests: {}", EndpointEventListener);
-        List<String> scopes = epListenerScopes.get(EndpointEventListener);
-        if (scopes == null) {
-            return;
+    private void sendExistingEndpoints(List<String> scopes, Object epListener) {
+        for (EndpointDescription endpoint : repository.getAll()) {
+            EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
+            notifyListener(event, scopes, epListener);
         }
+    }
 
-        for (String scope : scopes) {
-            Interest interest = interests.get(scope);
-            if (interest != null) {
-                interest.epListeners.remove(EndpointEventListener);
-                if (interest.epListeners.isEmpty()) {
-                    interest.monitor.close();
-                    interests.remove(scope);
-                }
-            }
-        }
-        epListenerScopes.remove(EndpointEventListener);
+    private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
+        return Boolean.parseBoolean(String.valueOf(
+                EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
     }
 
-    protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
-        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
-        EndpointEventListener listener = new EndpointEventListener() {
+    public synchronized void removeInterest(ServiceReference<EndpointEventListener> epListenerRef) {
+        LOG.info("removing EndpointEventListener interests: {}", epListenerRef);
+        interests.remove(epListenerRef);
+    }
 
-            @Override
-            public void endpointChanged(EndpointEvent event, String filter) {
-                notifyListeners(event, scope, interest.epListeners);
-            }
-        };
-        return new InterfaceMonitor(zk, objClass, listener, scope);
+    @Override
+    public void endpointChanged(EndpointEvent event, String filter) {
+        for (Interest interest : interests.values()) {
+            notifyListener(event, interest.scopes, interest.epListener);
+        }
     }
 
-    private void notifyListeners(EndpointEvent event, String currentScope,
-            List<ServiceReference> epListeners) {
+    private void notifyListener(EndpointEvent event, List<String> scopes, Object service) {
         EndpointDescription endpoint = event.getEndpoint();
-        for (ServiceReference<?> epListenerRef : epListeners) {
-            if (epListenerRef.getBundle() == null) {
-                LOG.info("listening service was unregistered, ignoring");
-            }
-            Object service = bctx.getService(epListenerRef);
-            LOG.trace("matching {} against {}", endpoint, currentScope);
-            if (matchFilter(bctx, currentScope, endpoint)) {
-                LOG.debug("Matched {} against {}", endpoint, currentScope);
-            try {
-                if (service instanceof EndpointEventListener) {
-                    EndpointEventListener epeListener = (EndpointEventListener)service;
-                    notifyListener(event, currentScope, epeListener);
-                } else if (service instanceof EndpointListener) {
-                    EndpointListener epListener = (EndpointListener)service;
-                    notifyListener(event, currentScope, epListener);
-                }
-            } finally {
-                if (service != null) {
-                    bctx.ungetService(epListenerRef);
-                }
-            }
-            }
+        String currentScope = getFirstMatch(scopes, endpoint);
+        if (currentScope == null) {
+            return;
         }
-    }
-    
-    private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) {
-        if (filter == null) {
-            return false;
+        LOG.debug("Matched {} against {}", endpoint, currentScope);
+        if (service instanceof EndpointEventListener) {
+            notifyEEListener(event, currentScope, (EndpointEventListener)service);
+        } else if (service instanceof EndpointListener) {
+            notifyEListener(event, currentScope, (EndpointListener)service);
         }
+    }
     
-        try {
-            Filter f = bctx.createFilter(filter);
-            Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties());
-            return f.match(dict);
-        } catch (Exception e) {
-            return false;
+    private String getFirstMatch(List<String> scopes, EndpointDescription endpoint) {
+        for (String scope : scopes) {
+            if (endpoint.matches(scope)) {
+                return scope;
+            }
         }
+        return null;
     }
 
-
-    private void notifyListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
+    private void notifyEEListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
         EndpointDescription endpoint = event.getEndpoint();
         LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
         listener.endpointChanged(event, currentScope);
     }
     
-    private void notifyListener(EndpointEvent event, String currentScope, EndpointListener listener) {
+    private void notifyEListener(EndpointEvent event, String currentScope, EndpointListener listener) {
         EndpointDescription endpoint = event.getEndpoint();
         LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint);
         switch (event.getType()) {
@@ -228,49 +148,18 @@ public class InterfaceMonitorManager {
     }
 
     public synchronized void close() {
-        for (Interest interest : interests.values()) {
-            interest.monitor.close();
-        }
         interests.clear();
-        epListenerScopes.clear();
     }
 
     /**
      * Only for test case!
      */
-    protected synchronized Map<String, Interest> getInterests() {
+    protected synchronized Map<ServiceReference, Interest> getInterests() {
         return interests;
     }
 
-    /**
-     * Only for test case!
-     */
-    protected synchronized Map<ServiceReference, List<String>> getEndpointListenerScopes() {
-        return epListenerScopes;
-    }
-
     protected List<String> getScopes(ServiceReference<?> sref) {
         return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
     }
-    
-    public static String getObjectClass(String scope) {
-        Matcher m = OBJECTCLASS_PATTERN.matcher(scope);
-        return m.matches() ? m.group(1) : null;
-    }
 
-    /**
-     * Returns a service's properties as a map.
-     *
-     * @param serviceReference a service reference
-     * @return the service's properties as a map
-     */
-    public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) {
-        String[] keys = serviceReference.getPropertyKeys();
-        Map<String, Object> props = new HashMap<String, Object>(keys.length);
-        for (String key : keys) {
-            Object val = serviceReference.getProperty(key);
-            props.put(key, val);
-        }
-        return props;
-    }
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
index 3a20f5a..d9f23e6 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -1,15 +1,22 @@
 package org.apache.aries.rsa.discovery.zookeeper.repository;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.samePropertyValuesAs;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -32,6 +39,7 @@ public class ZookeeperEndpointRepositoryTest {
     private ZooKeeperServer server;
     private ZooKeeper zk;
     private ServerCnxnFactory factory;
+    private List<EndpointEvent> events = new ArrayList<>();
 
     @Before
     public void before() throws IOException, InterruptedException, KeeperException {
@@ -62,25 +70,38 @@ public class ZookeeperEndpointRepositoryTest {
 
     @After
     public void after() throws InterruptedException {
-        zk.close();
+        //zk.close(); // Seems to cause SessionTimeout error 
         factory.shutdown();
     }
 
     @Test
     public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
+        final Semaphore sem = new Semaphore(0);
         EndpointEventListener listener = new EndpointEventListener() {
             
             @Override
             public void endpointChanged(EndpointEvent event, String filter) {
-                System.out.println(event);
+                events.add(event);
+                sem.release();
             }
         };
         ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+        
         EndpointDescription endpoint = createEndpoint();
         repository.add(endpoint);
         
+        assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
+
         String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1";
         EndpointDescription ep2 = repository.read(path);
+        assertNotNull(ep2);
+
+        repository.remove(endpoint);
+
+        assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
+        assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint)));
+        assertThat(events.get(1), samePropertyValuesAs(new EndpointEvent(EndpointEvent.REMOVED, endpoint)));
+        
         repository.close();
     }
     

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
index 84eca09..41b0795 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
@@ -18,95 +18,68 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper.subscribe;
 
-import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
+import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.easymock.EasyMock;
-import org.easymock.IAnswer;
 import org.easymock.IMocksControl;
 import org.junit.Test;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 
 public class InterfaceMonitorManagerTest {
 
     @Test
     public void testEndpointListenerTrackerCustomizer() {
-        IMocksControl c = EasyMock.createNiceControl();
-        BundleContext ctx = c.createMock(BundleContext.class);
-        ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)", "mine");
-        ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)", "mine");
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        InterfaceMonitorManager eltc = new InterfaceMonitorManager(ctx, zk);
+        IMocksControl c = EasyMock.createControl();
+        ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)");
+        ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)");
+        ZookeeperEndpointRepository repository = c.createMock(ZookeeperEndpointRepository.class);
+        List<EndpointDescription> endpoints = new ArrayList<>();
+        expect(repository.getAll()).andReturn(endpoints).atLeastOnce();
+        EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class); 
+        EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class); 
 
         c.replay();
 
+        InterfaceMonitorManager eltc = new InterfaceMonitorManager(repository);
         // sref has no scope -> nothing should happen
-        assertEquals(0, eltc.getEndpointListenerScopes().size());
         assertEquals(0, eltc.getInterests().size());
 
-        eltc.addInterest(sref);
-        assertScopeIncludes(sref, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
-        assertEquals(1, eltc.getInterests().size());
 
-        eltc.addInterest(sref);
-        assertScopeIncludes(sref, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
+        eltc.addInterest(sref, epListener1);
         assertEquals(1, eltc.getInterests().size());
 
-        eltc.addInterest(sref2);
-        assertScopeIncludes(sref, eltc);
-        assertScopeIncludes(sref2, eltc);
-        assertEquals(2, eltc.getEndpointListenerScopes().size());
+        eltc.addInterest(sref, epListener1);
         assertEquals(1, eltc.getInterests().size());
 
+        eltc.addInterest(sref2, epListener2);
+        assertEquals(2, eltc.getInterests().size());
+
         eltc.removeInterest(sref);
-        assertScopeIncludes(sref2, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
         assertEquals(1, eltc.getInterests().size());
 
         eltc.removeInterest(sref);
-        assertScopeIncludes(sref2, eltc);
-        assertEquals(1, eltc.getEndpointListenerScopes().size());
         assertEquals(1, eltc.getInterests().size());
 
         eltc.removeInterest(sref2);
-        assertEquals(0, eltc.getEndpointListenerScopes().size());
         assertEquals(0, eltc.getInterests().size());
 
         c.verify();
     }
 
     @SuppressWarnings("unchecked")
-    private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope, String objectClass) {
+    private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) {
         ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class);
-        final Dictionary<String, String> props = new Hashtable<>();
-        props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scope);
-        props.put(Constants.OBJECTCLASS, objectClass);
-        String[] keys = Collections.list(props.keys()).toArray(new String[]{});
-        EasyMock.expect(sref.getPropertyKeys()).andReturn(keys).anyTimes();
-        EasyMock.expect(sref.getProperty((String)EasyMock.anyObject())).andAnswer(new IAnswer<Object>() {
-            public Object answer() throws Throwable {
-                return props.get(getCurrentArguments()[0]);
-            }
-        }).anyTimes();
+        expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
+        expect(sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
         return sref;
     }
 
-    private void assertScopeIncludes(ServiceReference<EndpointEventListener> sref, InterfaceMonitorManager imm) {
-        List<String> srefScope = imm.getEndpointListenerScopes().get(sref);
-        assertEquals(1, srefScope.size());
-        assertEquals("(objectClass=mine)", srefScope.get(0));
-        
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
deleted file mode 100644
index e09cfbf..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-
-import java.util.Collections;
-
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.data.Stat;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-
-import junit.framework.TestCase;
-
-public class InterfaceMonitorTest extends TestCase {
-
-    public void testInterfaceMonitor() throws KeeperException, InterruptedException {
-        IMocksControl c = EasyMock.createControl();
-
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-        expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes();
-
-        String scope = "(myProp=test)";
-        String interf = "es.schaaf.test";
-        String node = ZookeeperEndpointRepository.getZooKeeperPath(interf);
-
-        EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class);
-        InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);
-        zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject());
-        EasyMock.expectLastCall().once();
-        zk.getData(eq(node), eq(im), EasyMock.anyObject(DataCallback.class), EasyMock.anyObject());
-        expectLastCall();
-
-        expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes();
-        expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once();
-        expect(zk.getChildren(eq(node), eq(im))).andReturn(Collections.<String> emptyList()).once();
-
-        c.replay();
-        im.start();
-        // simulate a zk callback
-        WatchedEvent we = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, node);
-        im.process(we);
-        c.verify();
-    }
-}


Mime
View raw message