[ARIES-1774] Centralize Zookeeper logic in ZookeeperEndpointRepository
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/f07ee8b5
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/f07ee8b5
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/f07ee8b5
Branch: refs/heads/master
Commit: f07ee8b59e226579a55c3463329abf912a2ae291
Parents: fa57fb2
Author: Christian Schneider <cschneid@adobe.com>
Authored: Wed Feb 7 16:52:07 2018 +0100
Committer: Christian Schneider <cschneid@adobe.com>
Committed: Thu Feb 8 17:40:34 2018 +0100
----------------------------------------------------------------------
.../discovery/zookeeper/ZooKeeperDiscovery.java | 7 +-
.../publish/PublishingEndpointListener.java | 178 ++------------
.../repository/ZookeeperEndpointRepository.java | 239 +++++++++++++++++++
.../zookeeper/subscribe/InterfaceMonitor.java | 4 +-
.../subscribe/InterfaceMonitorManager.java | 4 +-
.../rsa/discovery/zookeeper/util/Utils.java | 57 -----
.../publish/PublishingEndpointListenerTest.java | 23 +-
.../ZookeeperEndpointRepositoryTest.java | 116 +++++++++
.../subscribe/InterfaceMonitorTest.java | 4 +-
.../rsa/discovery/zookeeper/util/UtilsTest.java | 37 ---
10 files changed, 383 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
index c8b020d..d265a22 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager;
import org.apache.zookeeper.WatchedEvent;
@@ -55,6 +56,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
private Dictionary<String, ?> curConfiguration;
+ private ZookeeperEndpointRepository repository;
+
public ZooKeeperDiscovery(BundleContext bctx) {
this.bctx = bctx;
}
@@ -92,7 +95,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
return;
}
LOG.debug("starting ZookeeperDiscovery");
- endpointListener = new PublishingEndpointListener(zkClient, bctx);
+ repository = new ZookeeperEndpointRepository(zkClient);
+ endpointListener = new PublishingEndpointListener(repository);
endpointListener.start(bctx);
imManager = new InterfaceMonitorManager(bctx, zkClient);
endpointListenerTracker = new EndpointListenerTracker(bctx, imManager);
@@ -108,7 +112,6 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService {
closed |= close;
if (endpointListener != null) {
endpointListener.stop();
- endpointListener.close();
}
if (endpointListenerTracker != null) {
endpointListenerTracker.close();
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
index c3bf01f..ceef6b0 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
@@ -18,29 +18,11 @@
*/
package org.apache.aries.rsa.discovery.zookeeper.publish;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
@@ -49,28 +31,23 @@ import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Listens for local Endpoints and publishes them to ZooKeeper.
+ * Listens for local EndpointEvents using old and new style listeners and publishes changes to
+ * the ZookeeperEndpointRepository.
*/
@SuppressWarnings("deprecation")
public class PublishingEndpointListener implements EndpointEventListener, EndpointListener
{
private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
- private final ZooKeeper zk;
- private final List<EndpointDescription> endpoints = new ArrayList<EndpointDescription>();
- private boolean closed;
- private final EndpointDescriptionParser endpointDescriptionParser;
-
private ServiceRegistration<?> listenerReg;
+ private ZookeeperEndpointRepository repository;
- public PublishingEndpointListener(ZooKeeper zk, BundleContext bctx) {
- this.zk = zk;
- endpointDescriptionParser = new EndpointDescriptionParser();
+ public PublishingEndpointListener(ZookeeperEndpointRepository repository) {
+ this.repository = repository;
}
public void start(BundleContext bctx) {
@@ -109,155 +86,28 @@ public class PublishingEndpointListener implements EndpointEventListener, Endpoi
private void endpointModified(EndpointDescription endpoint, String filter) {
try {
- modifyEndpoint(endpoint);
+ repository.modify(endpoint);
} catch (Exception e) {
LOG.error("Error modifying endpoint data in zookeeper for endpoint {}", endpoint.getId(),
e);
}
}
- private void modifyEndpoint(EndpointDescription endpoint) throws URISyntaxException,
KeeperException, InterruptedException {
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
-
- LOG.info("Changing endpoint in zookeeper: {}", endpoint);
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
- createPath(path, zk);
- zk.setData(fullPath, getData(endpoint), -1);
- }
- }
-
@Override
public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
- synchronized (endpoints) {
- if (closed) {
- return;
- }
- if (endpoints.contains(endpoint)) {
- // TODO -> Should the published endpoint be updated here?
- return;
- }
-
- try {
- addEndpoint(endpoint);
- endpoints.add(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while processing the addition of an endpoint.", ex);
- }
- }
- }
-
- private byte[] getData(EndpointDescription epd) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- endpointDescriptionParser.writeEndpoint(epd, bos);
- return bos.toByteArray();
- }
-
- private void addEndpoint(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
- InterruptedException, IOException
{
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
- createPath(path, zk);
- createEphemeralNode(fullPath, getData(endpoint));
- }
- }
-
- private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException,
InterruptedException {
try {
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (NodeExistsException nee) {
- // this sometimes happens after a ZooKeeper node dies and the ephemeral node
- // that belonged to the old session was not yet deleted. We need to make our
- // session the owner of the node so it won't get deleted automatically -
- // we do this by deleting and recreating it ourselves.
- LOG.info("node for endpoint already exists, recreating: {}", fullPath);
- try {
- zk.delete(fullPath, -1);
- } catch (NoNodeException nne) {
- // it's a race condition, but as long as it got deleted - it's ok
- }
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ repository.add(endpoint);
+ } catch (Exception ex) {
+ LOG.error("Exception while processing the addition of an endpoint.", ex);
}
}
@Override
public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
- LOG.info("Local EndpointDescription removed: {}", endpoint);
-
- synchronized (endpoints) {
- if (closed) {
- return;
- }
- if (!endpoints.contains(endpoint)) {
- return;
- }
-
- try {
- removeEndpoint(endpoint);
- endpoints.remove(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while processing the removal of an endpoint", ex);
- }
- }
- }
-
- private void removeEndpoint(EndpointDescription endpoint) throws UnknownHostException,
URISyntaxException {
- Collection<String> interfaces = endpoint.getInterfaces();
- String endpointKey = getKey(endpoint);
- for (String name : interfaces) {
- String path = Utils.getZooKeeperPath(name);
- String fullPath = path + '/' + endpointKey;
- LOG.debug("Removing ZooKeeper node: {}", fullPath);
- try {
- zk.delete(fullPath, -1);
- } catch (Exception ex) {
- LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
- }
- }
- }
-
- private static void createPath(String path, ZooKeeper zk) throws KeeperException, InterruptedException
{
- StringBuilder current = new StringBuilder();
- List<String> parts = Utils.removeEmpty(Arrays.asList(path.split("/")));
- for (String part : parts) {
- current.append('/');
- current.append(part);
- try {
- if (zk.exists(current.toString(), false) == null) {
- zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (NodeExistsException nee) {
- // it's not the first node with this path to ever exist - that's normal
- }
+ try {
+ repository.remove(endpoint);
+ } catch (Exception ex) {
+ LOG.error("Exception while processing the removal of an endpoint", ex);
}
}
- private static String getKey(EndpointDescription endpoint) throws URISyntaxException
{
- URI uri = new URI(endpoint.getId());
- return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
- .append("#").append(uri.getPath().replace('/', '#')).toString();
- }
-
- public void close() {
- LOG.debug("closing - removing all endpoints");
- synchronized (endpoints) {
- closed = true;
- for (EndpointDescription endpoint : endpoints) {
- try {
- removeEndpoint(endpoint);
- } catch (Exception ex) {
- LOG.error("Exception while removing endpoint during close", ex);
- }
- }
- endpoints.clear();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
new file mode 100644
index 0000000..2349c45
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
@@ -0,0 +1,239 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperEndpointRepository implements Closeable, Watcher {
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointRepository.class);
+ private final ZooKeeper zk;
+ private final EndpointDescriptionParser parser;
+ private EndpointEventListener listener;
+ public static final String PATH_PREFIX = "/osgi/service_registry";
+
+ private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<String,
EndpointDescription>();
+
+ public ZookeeperEndpointRepository(ZooKeeper zk) {
+ this(zk, null);
+ }
+
+ public ZookeeperEndpointRepository(ZooKeeper zk, EndpointEventListener listener) {
+ this.zk = zk;
+ this.listener = listener;
+ this.parser = new EndpointDescriptionParser();
+ try {
+ createPath(PATH_PREFIX);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create base path");
+ }
+ // Not yet needed
+ //this.registerWatcher();
+ }
+
+ private void registerWatcher() {
+ try {
+ List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX,
this);
+ System.out.println(children);
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected void notifyListener(WatchedEvent wevent) {
+ EndpointDescription ep = read(wevent.getPath());
+ if (ep != null) {
+ int type = getEndpointEventType(wevent);
+ EndpointEvent event = new EndpointEvent(type, ep);
+ listener.endpointChanged(event, null);
+ }
+ }
+
+ private int getEndpointEventType(WatchedEvent wevent) {
+ EventType type = wevent.getType();
+ return EndpointEvent.ADDED;
+ }
+
+ /**
+ * Retrieves data from the given node and parses it into an EndpointDescription.
+ *
+ * @param path a node path
+ * @return endpoint found in the node or null if no endpoint was found
+ */
+ public EndpointDescription read(String path) {
+ try {
+ Stat stat = zk.exists(path, false);
+ if (stat == null || stat.getDataLength() <= 0) {
+ return null;
+ }
+ byte[] data = zk.getData(path, false, null);
+ LOG.debug("Got data for node: {}", path);
+
+ EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
+ if (endpoint != null) {
+ return endpoint;
+ }
+ LOG.warn("No Discovery information found for node: {}", path);
+ } catch (Exception e) {
+ LOG.error("Problem getting EndpointDescription from node " + path, e);
+ }
+ return null;
+ }
+
+ public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
+ InterruptedException, IOException {
+ Collection<String> interfaces = endpoint.getInterfaces();
+ String endpointKey = getKey(endpoint);
+
+ LOG.info("Exporting endpoint to zookeeper: {}", endpoint);
+ for (String name : interfaces) {
+ String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+ String fullPath = path + '/' + endpointKey;
+ LOG.info("Creating ZooKeeper node for service with path {}", fullPath);
+ createPath(path);
+ createEphemeralNode(fullPath, getData(endpoint));
+ }
+ }
+
+ public void modify(EndpointDescription endpoint) throws URISyntaxException, KeeperException,
InterruptedException {
+ Collection<String> interfaces = endpoint.getInterfaces();
+ String endpointKey = getKey(endpoint);
+
+ LOG.info("Changing endpoint in zookeeper: {}", endpoint);
+ for (String name : interfaces) {
+ String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+ String fullPath = path + '/' + endpointKey;
+ LOG.info("Changing ZooKeeper node for service with path {}", fullPath);
+ createPath(path);
+ zk.setData(fullPath, getData(endpoint), -1);
+ }
+ }
+
+ public void remove(EndpointDescription endpoint) throws UnknownHostException, URISyntaxException
{
+ Collection<String> interfaces = endpoint.getInterfaces();
+ String endpointKey = getKey(endpoint);
+ for (String name : interfaces) {
+ String path = ZookeeperEndpointRepository.getZooKeeperPath(name);
+ String fullPath = path + '/' + endpointKey;
+ LOG.debug("Removing ZooKeeper node: {}", fullPath);
+ try {
+ zk.delete(fullPath, -1);
+ } catch (Exception ex) {
+ LOG.debug("Error while removing endpoint: {}", ex); // e.g. session expired
+ }
+ }
+ }
+
+ public List<EndpointDescription> getAll() throws KeeperException, InterruptedException
{
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ private byte[] getData(EndpointDescription epd) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ parser.writeEndpoint(epd, bos);
+ return bos.toByteArray();
+ }
+
+ private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException,
InterruptedException {
+ try {
+ zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } catch (NodeExistsException nee) {
+ // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+ // that belonged to the old session was not yet deleted. We need to make our
+ // session the owner of the node so it won't get deleted automatically -
+ // we do this by deleting and recreating it ourselves.
+ LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+ try {
+ zk.delete(fullPath, -1);
+ } catch (NoNodeException nne) {
+ // it's a race condition, but as long as it got deleted - it's ok
+ }
+ zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+ }
+
+ private void createPath(String path) throws KeeperException, InterruptedException {
+ StringBuilder current = new StringBuilder();
+ List<String> parts = ZookeeperEndpointRepository.removeEmpty(Arrays.asList(path.split("/")));
+ for (String part : parts) {
+ current.append('/');
+ current.append(part);
+ try {
+ if (zk.exists(current.toString(), false) == null) {
+ zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (NodeExistsException nee) {
+ // it's not the first node with this path to ever exist - that's normal
+ }
+ }
+ }
+
+ /**
+ * Removes nulls and empty strings from the given list of strings.
+ *
+ * @param strings a list of strings, may be null
+ * @return a new list containing the non-null and non-empty
+ * elements of the original list in the same order (empty if the input is null)
+ */
+ public static List<String> removeEmpty(List<String> strings) {
+ List<String> result = new ArrayList<String>();
+ if (strings == null) {
+ return result;
+ }
+ for (String s : strings) {
+ if (s != null && !s.isEmpty()) {
+ result.add(s);
+ }
+ }
+ return result;
+ }
+
+ public static String getZooKeeperPath(String name) {
+ return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.',
'/');
+ }
+
+ private static String getKey(EndpointDescription endpoint) throws URISyntaxException
{
+ URI uri = new URI(endpoint.getId());
+ return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort())
+ .append("#").append(uri.getPath().replace('/', '#')).toString();
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
index 7078bb8..2c90b3c 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -67,7 +67,7 @@ public class InterfaceMonitor implements Watcher, StatCallback {
public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener,
String scope) {
this.zk = zk;
- this.znode = Utils.getZooKeeperPath(objClass);
+ this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass);
this.recursive = objClass == null || objClass.isEmpty();
this.endpointListener = endpointListener;
this.parser = new EndpointDescriptionParser();
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
index 7d6e4ae..0aa98b3 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java
@@ -30,7 +30,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.aries.rsa.util.StringPlus;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.BundleContext;
@@ -250,7 +250,7 @@ public class InterfaceMonitorManager {
}
protected List<String> getScopes(ServiceReference<?> sref) {
- return Utils.removeEmpty(StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)));
+ return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
}
public static String getObjectClass(String scope) {
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
deleted file mode 100644
index 289ae32..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/util/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public final class Utils {
-
- static final String PATH_PREFIX = "/osgi/service_registry";
-
- private Utils() {
- // never constructed
- }
-
- public static String getZooKeeperPath(String name) {
- return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.',
'/');
- }
-
- /**
- * Removes nulls and empty strings from the given string array.
- *
- * @param strings an array of strings
- * @return a new array containing the non-null and non-empty
- * elements of the original array in the same order
- */
- public static List<String> removeEmpty(List<String> strings) {
- List<String> result = new ArrayList<String>();
- if (strings == null) {
- return result;
- }
- for (String s : strings) {
- if (s != null && !s.isEmpty()) {
- result.add(s);
- }
- }
- return result;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
index b7debf6..a61cf76 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
@@ -23,13 +23,13 @@ import static org.easymock.EasyMock.expect;
import java.util.HashMap;
import java.util.Map;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
-import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
@@ -44,7 +44,6 @@ public class PublishingEndpointListenerTest extends TestCase {
public void testEndpointRemovalAdding() throws KeeperException, InterruptedException
{
IMocksControl c = EasyMock.createNiceControl();
- BundleContext ctx = c.createMock(BundleContext.class);
ZooKeeper zk = c.createMock(ZooKeeper.class);
String path = ENDPOINT_PATH;
@@ -53,7 +52,8 @@ public class PublishingEndpointListenerTest extends TestCase {
c.replay();
- PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
+ ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
+ PublishingEndpointListener eli = new PublishingEndpointListener(repository);
EndpointDescription endpoint = createEndpoint();
eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); // should
do nothing
@@ -63,23 +63,6 @@ public class PublishingEndpointListenerTest extends TestCase {
c.verify();
}
- public void testClose() throws KeeperException, InterruptedException {
- IMocksControl c = EasyMock.createNiceControl();
- BundleContext ctx = c.createMock(BundleContext.class);
- ZooKeeper zk = c.createMock(ZooKeeper.class);
- expectCreated(zk, ENDPOINT_PATH);
- expectDeleted(zk, ENDPOINT_PATH);
-
- c.replay();
-
- PublishingEndpointListener eli = new PublishingEndpointListener(zk, ctx);
- EndpointDescription endpoint = createEndpoint();
- eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
- eli.close(); // should result in zk.delete(...)
-
- c.verify();
- }
-
private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException
{
expect(zk.create(EasyMock.eq(path),
(byte[])EasyMock.anyObject(),
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
new file mode 100644
index 0000000..3a20f5a
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
@@ -0,0 +1,116 @@
+package org.apache.aries.rsa.discovery.zookeeper.repository;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.framework.Constants;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+public class ZookeeperEndpointRepositoryTest {
+
+ private ZooKeeperServer server;
+ private ZooKeeper zk;
+ private ServerCnxnFactory factory;
+
+ @Before
+ public void before() throws IOException, InterruptedException, KeeperException {
+ File target = new File("target");
+ File zookeeperDir = new File(target, "zookeeper");
+ server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
+ factory = new NIOServerCnxnFactory();
+ int clientPort = getClientPort();
+ factory.configure(new InetSocketAddress(clientPort), 10);
+ factory.startup(server);
+ Watcher watcher = new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ }
+
+ };
+ zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, watcher);
+ printNodes("/");
+ }
+
+ private int getClientPort() throws IOException {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ return serverSocket.getLocalPort();
+ }
+ }
+
+ @After
+ public void after() throws InterruptedException {
+ zk.close();
+ factory.shutdown();
+ }
+
+ @Test
+ public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
+ EndpointEventListener listener = new EndpointEventListener() {
+
+ @Override
+ public void endpointChanged(EndpointEvent event, String filter) {
+ System.out.println(event);
+ }
+ };
+ ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener);
+ EndpointDescription endpoint = createEndpoint();
+ repository.add(endpoint);
+
+ String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1";
+ EndpointDescription ep2 = repository.read(path);
+ repository.close();
+ }
+
+ @Test
+ public void testGetZooKeeperPath() {
+ assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "org/example/Test",
+ ZookeeperEndpointRepository.getZooKeeperPath("org.example.Test"));
+
+ // used for the recursive discovery
+ assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(null));
+ assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(""));
+ }
+
+ private EndpointDescription createEndpoint() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(Constants.OBJECTCLASS, new String[] {Runnable.class.getName()});
+ props.put(RemoteConstants.ENDPOINT_ID, "http://test.de/service1");
+ props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "my");
+
+ EndpointDescription endpoint = new EndpointDescription(props);
+ return endpoint;
+ }
+
+ public void printNodes(String path) throws KeeperException, InterruptedException {
+ List<String> children = zk.getChildren(path, false);
+ for (String child : children) {
+ String newPath = path.endsWith("/") ? path : path + "/";
+ String fullPath = newPath + child;
+ System.out.println(fullPath);
+ printNodes(fullPath);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
index 53ddbc4..e09cfbf 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java
@@ -24,7 +24,7 @@ import static org.easymock.EasyMock.expectLastCall;
import java.util.Collections;
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
+import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
@@ -48,7 +48,7 @@ public class InterfaceMonitorTest extends TestCase {
String scope = "(myProp=test)";
String interf = "es.schaaf.test";
- String node = Utils.getZooKeeperPath(interf);
+ String node = ZookeeperEndpointRepository.getZooKeeperPath(interf);
EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class);
InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope);
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/f07ee8b5/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
deleted file mode 100644
index 4d41fb0..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/util/UtilsTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.util;
-
-import org.apache.aries.rsa.discovery.zookeeper.util.Utils;
-
-import junit.framework.TestCase;
-
-public class UtilsTest extends TestCase {
-
- public void testGetZooKeeperPath() {
- assertEquals(Utils.PATH_PREFIX + '/' + "org/example/Test",
- Utils.getZooKeeperPath("org.example.Test"));
-
- // used for the recursive discovery
- assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(null));
- assertEquals(Utils.PATH_PREFIX, Utils.getZooKeeperPath(""));
- }
-
-
-}
|