qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-46
Date Thu, 07 May 2015 23:19:33 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 8c9456bfd -> 5b3c02920


https://issues.apache.org/jira/browse/QPIDJMS-46

Improvements in the discovery module. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5b3c0292
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5b3c0292
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5b3c0292

Branch: refs/heads/master
Commit: 5b3c02920f336df0596ef3c7aa93283a4f369387
Parents: 8c9456b
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu May 7 19:19:17 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu May 7 19:19:17 2015 -0400

----------------------------------------------------------------------
 .../jms/provider/discovery/DiscoveryAgent.java  |  20 ++
 .../jms/provider/discovery/DiscoveryEvent.java  |  45 ----
 .../provider/discovery/DiscoveryListener.java   |  14 +-
 .../provider/discovery/DiscoveryProvider.java   |  99 ++++---
 .../discovery/DiscoveryProviderFactory.java     |  52 ++--
 .../file/FileWatcherDiscoveryAgent.java         | 260 +++++++++++++++++++
 .../file/FileWatcherDiscoveryAgentFactory.java  |  55 ++++
 .../discovery/multicast/DiscoveryEvent.java     |  47 ++++
 .../multicast/MulticastDiscoveryAgent.java      |  24 +-
 .../MulticastDiscoveryAgentFactory.java         |  11 +-
 .../discovery/multicast/PacketParser.java       |   2 -
 .../multicast/parsers/ActiveMQPacketParser.java |  27 +-
 .../org/apache/qpid/jms/provider/agents/file    |  17 ++
 .../jms/discovery/FileWatcherDiscoveryTest.java | 220 ++++++++++++++++
 .../src/test/resources/log4j.properties         |   2 +-
 15 files changed, 775 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java
index 051f567..faa393b 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryAgent.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.provider.discovery;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Interface for all agents used to detect instances of remote peers on the network.
@@ -24,6 +25,25 @@ import java.io.IOException;
 public interface DiscoveryAgent {
 
     /**
+     * Indicates if this DiscoveryAgent requires a ScheduledExecutorService in order
+     * to perform its discovery work.
+     *
+     * @returns true if the agent requires that its parent provide it with a scheduler.
+     */
+    boolean isSchedulerRequired();
+
+    /**
+     * Provider a ScheduledExecutorService to the DiscoveryAgent that requires a
+     * scheduler to perform its discovery work.  If the agent performs long polling
+     * style operations such as a socket read then it should not use the provided
+     * scheduler as that could block other agents from performing their own work.
+     *
+     * @param scheduler
+     *        An initialized Scheduler service that this agent can use for its work.
+     */
+    void setScheduler(ScheduledExecutorService scheduler);
+
+    /**
      * Sets the discovery listener
      *
      * @param listener

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java
deleted file mode 100644
index 0fc2f29..0000000
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.discovery;
-
-/**
- * Event class used to convey discovered remote peer information to the
- * DiscoveryProvider.
- */
-public class DiscoveryEvent {
-
-    public enum EventType {
-        ALIVE,
-        SHUTDOWN
-    };
-
-    private final String peerUri;
-    private final EventType type;
-
-    public DiscoveryEvent(String peerUri, EventType type) {
-        this.peerUri = peerUri;
-        this.type = type;
-    }
-
-    public String getPeerUri() {
-        return peerUri;
-    }
-
-    public EventType getType() {
-        return type;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java
index 07e9895..e92bbb2 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryListener.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms.provider.discovery;
 
+import java.net.URI;
+
 /**
  * A listener of services being added or removed from a network
  */
@@ -24,17 +26,17 @@ public interface DiscoveryListener {
     /**
      * Called when a DiscoveryAgent becomes aware of a new remote peer.
      *
-     * @param event
-     *        the event data which contains the peer address and optional name.
+     * @param remoteURI
+     *        the URI of the newly discovered peer.
      */
-    void onServiceAdd(DiscoveryEvent event);
+    void onServiceAdd(URI remoteURI);
 
     /**
      * Called when a DiscoveryAgent can no longer detect a previously known remote peer.
      *
-     * @param event
-     *        the event data which contains the peer address and optional name.
+     * @param remoteURI
+     *        the URI of the previously discovered peer that is no longer active.
      */
-    void onServiceRemove(DiscoveryEvent event);
+    void onServiceRemove(URI remoteURI);
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java
index 1d3a0f0..739a7a0 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProvider.java
@@ -18,26 +18,31 @@ package org.apache.qpid.jms.provider.discovery;
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.qpid.jms.provider.ProviderWrapper;
 import org.apache.qpid.jms.provider.failover.FailoverProvider;
+import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An AsyncProvider instance that wraps the FailoverProvider and listens for
- * events about discovered remote peers using a configured DiscoveryAgent
- * instance.
+ * An Provider instance that wraps the FailoverProvider and listens for events
+ * about discovered remote peers using a configured set of DiscoveryAgent instance.
  */
 public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> implements DiscoveryListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(DiscoveryProviderFactory.class);
 
     private final URI discoveryUri;
-    private DiscoveryAgent discoveryAgent;
-    private final ConcurrentHashMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
+    private final List<DiscoveryAgent> discoveryAgents = new ArrayList<DiscoveryAgent>();
+
+    private ScheduledExecutorService sharedScheduler;
 
     /**
      * Creates a new instance of the DiscoveryProvider.
@@ -56,19 +61,29 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme
 
     @Override
     public void start() throws IOException, IllegalStateException {
-        if (this.discoveryAgent == null) {
+        if (discoveryAgents.isEmpty()) {
             throw new IllegalStateException("No DiscoveryAgent configured.");
         }
 
-        discoveryAgent.setDiscoveryListener(this);
-        discoveryAgent.start();
+        for (DiscoveryAgent discoveryAgent : discoveryAgents) {
+            discoveryAgent.setDiscoveryListener(this);
+            if (discoveryAgent.isSchedulerRequired()) {
+                discoveryAgent.setScheduler(getSharedScheduler());
+            }
+            discoveryAgent.start();
+        }
 
         super.start();
     }
 
     @Override
     public void close() {
-        discoveryAgent.close();
+        ThreadPoolUtils.shutdownGraceful(sharedScheduler);
+
+        for (DiscoveryAgent discoveryAgent : discoveryAgents) {
+            discoveryAgent.close();
+        }
+
         super.close();
     }
 
@@ -82,10 +97,10 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme
     }
 
     /**
-     * @return the configured DiscoveryAgent instance used by this DiscoveryProvider.
+     * @return a list of configured DiscoveryAgent instances used by this DiscoveryProvider.
      */
-    public DiscoveryAgent getDiscoveryAgent() {
-        return this.discoveryAgent;
+    public List<DiscoveryAgent> getDiscoveryAgents() {
+        return Collections.unmodifiableList(discoveryAgents);
     }
 
     /**
@@ -94,32 +109,25 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme
      * @param agent
      *        the agent to use to discover remote peers
      */
-    public void setDiscoveryAgent(DiscoveryAgent agent) {
-        this.discoveryAgent = agent;
+    public void setDiscoveryAgents(List<DiscoveryAgent> agents) {
+        discoveryAgents.addAll(agents);
     }
 
     //------------------- Discovery Event Handlers ---------------------------//
 
     @Override
-    public void onServiceAdd(DiscoveryEvent event) {
-        String url = event.getPeerUri();
-        if (url != null) {
-            try {
-                URI uri = new URI(url);
-                LOG.info("Adding new peer connection URL: {}", uri);
-                serviceURIs.put(event.getPeerUri(), uri);
-                next.add(uri);
-            } catch (URISyntaxException e) {
-                LOG.warn("Could not add remote URI: {} due to bad URI syntax: {}", url, e.getMessage());
-            }
+    public void onServiceAdd(URI remoteURI) {
+        if (remoteURI != null) {
+            LOG.debug("Adding URI of remote peer: {}", remoteURI);
+            next.add(remoteURI);
         }
     }
 
     @Override
-    public void onServiceRemove(DiscoveryEvent event) {
-        URI uri = serviceURIs.get(event.getPeerUri());
-        if (uri != null) {
-            next.remove(uri);
+    public void onServiceRemove(URI remoteURI) {
+        if (remoteURI != null) {
+            LOG.debug("Removing URI of remote peer: {}", remoteURI);
+            next.remove(remoteURI);
         }
     }
 
@@ -127,13 +135,38 @@ public class DiscoveryProvider extends ProviderWrapper<FailoverProvider> impleme
 
     @Override
     public void onConnectionInterrupted(URI remoteURI) {
-        this.discoveryAgent.resume();
+        for (DiscoveryAgent discoveryAgent : discoveryAgents) {
+            discoveryAgent.resume();
+        }
+
         super.onConnectionInterrupted(remoteURI);
     }
 
     @Override
     public void onConnectionRestored(URI remoteURI) {
-        this.discoveryAgent.suspend();
+        for (DiscoveryAgent discoveryAgent : discoveryAgents) {
+            discoveryAgent.suspend();
+        }
+
         super.onConnectionRestored(remoteURI);
     }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private ScheduledExecutorService getSharedScheduler() {
+        if (sharedScheduler == null) {
+            sharedScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+                @Override
+                public Thread newThread(Runnable runner) {
+                    Thread serial = new Thread(runner);
+                    serial.setDaemon(true);
+                    serial.setName(DiscoveryProvider.this.getClass().getSimpleName() + ":[" + getDiscoveryURI() + "]");
+                    return serial;
+                }
+            });
+        }
+
+        return sharedScheduler;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
index 33d4990..a989f49 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
@@ -17,7 +17,8 @@
 package org.apache.qpid.jms.provider.discovery;
 
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.jms.provider.Provider;
@@ -32,7 +33,15 @@ import org.apache.qpid.jms.util.URISupport.CompositeData;
  */
 public class DiscoveryProviderFactory extends ProviderFactory {
 
-    private static final String DISCOVERED_OPTION_PREFIX = "discovered.";
+    /**
+     * Prefix used for all properties that apply specifically to the DiscoveryProvider
+     */
+    public static final String DISCOVERY_OPTION_PREFIX = "discovery.";
+
+    /**
+     * Prefix used for all properties that should be applied to any discovered remote URIs.
+     */
+    public static final String DISCOVERED_OPTION_PREFIX = "discovered.";
 
     @Override
     public Provider createProvider(URI remoteURI) throws Exception {
@@ -40,28 +49,33 @@ public class DiscoveryProviderFactory extends ProviderFactory {
         CompositeData composite = URISupport.parseComposite(remoteURI);
         Map<String, String> options = composite.getParameters();
 
-        // We currently only allow for one agent to feed URIs to the embedded FailoverProvider
-        // in the DiscoveryProvider.  We could allow more in the future if we found that to be
-        // a useful feature.
-        if (composite.getComponents().size() > 1) {
-            throw new URISyntaxException(remoteURI.toString(), "Only one discovery agent can be specified");
-        }
+        Map<String, String> discoveryOptions = PropertyUtil.filterProperties(options, DISCOVERY_OPTION_PREFIX);
+        Map<String, String> discoveredOptions = PropertyUtil.filterProperties(options, DISCOVERED_OPTION_PREFIX);
 
         // Failover will apply the nested options to each URI while attempting to connect.
-        Map<String, String> nested = PropertyUtil.filterProperties(options, DISCOVERED_OPTION_PREFIX);
-        FailoverProvider failover = new FailoverProvider(nested);
-        PropertyUtil.setProperties(failover, options);
-
-        // TODO - Revisit URI options setting and enhance the ProperyUtils to provide a
-        //        means of setting some properties on a object and obtaining the leftovers
-        //        so we can pass those along to the next until we consume them all or we
-        //        have leftovers which implies a bad URI.
+        FailoverProvider failover = new FailoverProvider(discoveredOptions);
+        discoveryOptions = PropertyUtil.setProperties(failover, discoveryOptions);
 
         DiscoveryProvider discovery = new DiscoveryProvider(remoteURI, failover);
-        PropertyUtil.setProperties(discovery, options);
+        discoveryOptions = PropertyUtil.setProperties(discovery, discoveryOptions);
+
+        if (!discoveryOptions.isEmpty()) {
+            String msg = ""
+                + " Not all options could be set on the Discovery provider."
+                + " Check the options are spelled correctly."
+                + " Unused parameters=[" + discoveryOptions + "]."
+                + " This Provider cannot be started.";
+            throw new IllegalArgumentException(msg);
+        }
+
+        List<URI> agentURIs = composite.getComponents();
+        List<DiscoveryAgent> discoveryAgents = new ArrayList<DiscoveryAgent>(agentURIs.size());
+
+        for (URI agentURI : agentURIs) {
+            discoveryAgents.add(DiscoveryAgentFactory.createAgent(agentURI));
+        }
 
-        DiscoveryAgent agent = DiscoveryAgentFactory.createAgent(composite.getComponents().get(0));
-        discovery.setDiscoveryAgent(agent);
+        discovery.setDiscoveryAgents(discoveryAgents);
 
         return discovery;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java
new file mode 100644
index 0000000..832d64c
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgent.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.file;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.provider.discovery.DiscoveryAgent;
+import org.apache.qpid.jms.provider.discovery.DiscoveryListener;
+import org.apache.qpid.jms.util.ThreadPoolUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Discovery agent that watches a file and periodically reads in remote URIs
+ * from that file.
+ */
+public class FileWatcherDiscoveryAgent implements DiscoveryAgent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileWatcherDiscoveryAgent.class);
+
+    private static final int DEFAULT_UPDATE_INTERVAL = 30000;
+
+    private ScheduledExecutorService scheduler;
+    private final Set<URI> discovered = new LinkedHashSet<URI>();
+
+    private final URI discoveryURI;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private DiscoveryListener listener;
+    private int updateInterval = DEFAULT_UPDATE_INTERVAL;
+    private boolean warnOnWatchedReadError;
+
+    public FileWatcherDiscoveryAgent(URI discoveryURI) {
+        this.discoveryURI = discoveryURI;
+    }
+
+    @Override
+    public void setDiscoveryListener(DiscoveryListener listener) {
+        this.listener = listener;
+    }
+
+    public DiscoveryListener getDiscoveryListener() {
+        return this.listener;
+    }
+
+    @Override
+    public boolean isSchedulerRequired() {
+        return true;
+    }
+
+    @Override
+    public void setScheduler(ScheduledExecutorService scheduler) {
+        this.scheduler = scheduler;
+    }
+
+    @Override
+    public void start() throws IOException, IllegalStateException {
+        if (listener == null) {
+            throw new IllegalStateException("No DiscoveryListener configured.");
+        }
+
+        if (scheduler == null) {
+            throw new IllegalStateException("No scheduler service has been provided.");
+        }
+
+        if (started.compareAndSet(false, true)) {
+            scheduler.scheduleAtFixedRate(new Runnable() {
+
+                @Override
+                public void run() {
+                    LOG.debug("Performing watched resources scheduled update: {}", getDiscvoeryURI());
+                    updateWatchedResources();
+                }
+            }, 0, getUpdateInterval(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (started.compareAndSet(true, false)) {
+            ThreadPoolUtils.shutdownGraceful(scheduler);
+        }
+    }
+
+    @Override
+    public void suspend() {
+        // We don't suspend watching the file updates are passive.
+    }
+
+    @Override
+    public void resume() {
+        // We don't suspend watching the file updates are passive.
+    }
+
+    @Override
+    public String toString() {
+        return "FileWatcherDiscoveryAgent: listener:" + getDiscvoeryURI();
+    }
+
+    // ---------- Property Accessors ------------------------------------------//
+
+    /**
+     * @return the original URI used to create the Discovery Agent.
+     */
+    public URI getDiscvoeryURI() {
+        return this.discoveryURI;
+    }
+
+    /**
+     * @return the configured resource update interval.
+     */
+    public int getUpdateInterval() {
+        return updateInterval;
+    }
+
+    /**
+     * @param updateInterval
+     *        the update interval to use for watching resources for changes.
+     */
+    public void setUpdateInterval(int updateInterval) {
+        this.updateInterval = updateInterval;
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private void updateWatchedResources() {
+        String fileURL = getDiscvoeryURI().toString();
+        if (fileURL != null) {
+            BufferedReader in = null;
+            String newUris = null;
+            StringBuffer buffer = new StringBuffer();
+
+            try {
+                in = new BufferedReader(getURLStream(fileURL));
+                while (true) {
+                    String line = in.readLine();
+                    if (line == null) {
+                        break;
+                    }
+                    buffer.append(line);
+                }
+                newUris = buffer.toString();
+            } catch (IOException ioe) {
+                if (!warnOnWatchedReadError) {
+                    LOG.warn("Failed to read watched resource: " + fileURL);
+                    LOG.trace("Resource read error:", ioe);
+                } else {
+                    LOG.debug("Failed to read watched resource: " + fileURL);
+                    LOG.trace("Resource read error:", ioe);
+                }
+            } finally {
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (IOException ioe) {
+                        // ignore
+                    }
+                }
+            }
+
+            processURIs(newUris);
+        }
+    }
+
+    private InputStreamReader getURLStream(String path) throws IOException {
+        InputStreamReader result = null;
+        URL url = null;
+
+        try {
+            url = new URL(path);
+            result = new InputStreamReader(url.openStream());
+        } catch (MalformedURLException e) {
+            // ignore - it could be a path to a a local file
+        }
+
+        if (result == null) {
+            result = new FileReader(path);
+        }
+
+        return result;
+    }
+
+    private final void processURIs(String updatedURIs) {
+        if (updatedURIs != null) {
+            updatedURIs = updatedURIs.trim();
+            if (!updatedURIs.isEmpty()) {
+                List<URI> list = new ArrayList<URI>();
+                StringTokenizer tokenizer = new StringTokenizer(updatedURIs, ",");
+                while (tokenizer.hasMoreTokens()) {
+                    String str = tokenizer.nextToken();
+                    try {
+                        URI uri = new URI(str);
+                        list.add(uri);
+                    } catch (Exception e) {
+                        LOG.error("Failed to parse broker address: " + str, e);
+                    }
+                }
+                if (list.isEmpty() == false) {
+                    try {
+                        updateURIs(list);
+                    } catch (IOException e) {
+                        LOG.error("Failed to update transport URI's from: " + updatedURIs, e);
+                    }
+                }
+            }
+        }
+    }
+
+    private void updateURIs(List<URI> updates) throws IOException {
+
+        // Remove any previously discovered URIs that are no longer in the watched resource
+        HashSet<URI> removedPeers = new HashSet<URI>(discovered);
+        removedPeers.removeAll(updates);
+
+        for (URI removed : removedPeers) {
+            listener.onServiceRemove(removed);
+        }
+
+        // Only add the newly discovered remote peers
+        HashSet<URI> addedPeers = new HashSet<URI>(updates);
+        addedPeers.removeAll(discovered);
+
+        for (URI addition : addedPeers) {
+            listener.onServiceAdd(addition);
+        }
+
+        // Now just store the new set of URIs and advertise them.
+        discovered.clear();
+        discovered.addAll(updates);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java
new file mode 100644
index 0000000..e1108b4
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/file/FileWatcherDiscoveryAgentFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.file;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.qpid.jms.provider.discovery.DiscoveryAgent;
+import org.apache.qpid.jms.provider.discovery.DiscoveryAgentFactory;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+
+/**
+ * Creates and configures a new instance of the file watcher based agent.
+ */
+public class FileWatcherDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+    @Override
+    public DiscoveryAgent createDiscoveryAgent(URI discoveryURI) throws Exception {
+        FileWatcherDiscoveryAgent agent = new FileWatcherDiscoveryAgent(discoveryURI);
+
+        Map<String, String> options = URISupport.parseParameters(discoveryURI);
+        options = PropertyUtil.setProperties(agent, options);
+
+        if (!options.isEmpty()) {
+            String msg = ""
+                + " Not all options could be set on the File Watcher discovery."
+                + " agent.  Check the options are spelled correctly."
+                + " Unused parameters=[" + options + "]."
+                + " This agent cannot be started.";
+            throw new IllegalArgumentException(msg);
+        }
+
+        return agent;
+    }
+
+    @Override
+    public String getName() {
+        return "File-Watcher";
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java
new file mode 100644
index 0000000..99b40fb
--- /dev/null
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/DiscoveryEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.discovery.multicast;
+
+import java.net.URI;
+
+/**
+ * Event class used to convey discovered remote peer information to the
+ * DiscoveryProvider.
+ */
+public class DiscoveryEvent {
+
+    public enum EventType {
+        ALIVE,
+        SHUTDOWN
+    };
+
+    private final URI peerUri;
+    private final EventType type;
+
+    public DiscoveryEvent(URI peerUri, EventType type) {
+        this.peerUri = peerUri;
+        this.type = type;
+    }
+
+    public URI getPeerUri() {
+        return peerUri;
+    }
+
+    public EventType getType() {
+        return type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java
index 885f70c..9a67458 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgent.java
@@ -35,12 +35,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.jms.provider.discovery.DiscoveryAgent;
-import org.apache.qpid.jms.provider.discovery.DiscoveryEvent;
-import org.apache.qpid.jms.provider.discovery.DiscoveryEvent.EventType;
 import org.apache.qpid.jms.provider.discovery.DiscoveryListener;
+import org.apache.qpid.jms.provider.discovery.multicast.DiscoveryEvent.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +70,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
     private URI discoveryURI;
     private int timeToLive = 1;
     private boolean loopBackMode;
-    private final Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
+    private final Map<URI, RemoteBrokerData> brokersByService = new ConcurrentHashMap<URI, RemoteBrokerData>();
     private String group = "default";
     private InetAddress inetAddress;
     private SocketAddress sockAddress;
@@ -98,6 +98,16 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
     }
 
     @Override
+    public void setScheduler(ScheduledExecutorService scheduler) {
+        // Not needed for this agent
+    }
+
+    @Override
+    public boolean isSchedulerRequired() {
+        return false;
+    }
+
+    @Override
     public void start() throws IOException, IllegalStateException {
         if (listener == null) {
             throw new IllegalStateException("No DiscoveryListener configured.");
@@ -233,7 +243,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
     private void processAlive(DiscoveryEvent event) {
         RemoteBrokerData data = brokersByService.get(event.getPeerUri());
         if (data == null) {
-            String peerUri = event.getPeerUri();
+            URI peerUri = event.getPeerUri();
             data = new RemoteBrokerData(event.getPeerUri());
             brokersByService.put(peerUri, data);
             fireServiceAddEvent(data);
@@ -261,13 +271,13 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
 
     private void fireServiceRemovedEvent(final RemoteBrokerData data) {
         if (listener != null && started.get()) {
-            listener.onServiceRemove(data);
+            listener.onServiceRemove(data.getPeerUri());
         }
     }
 
     private void fireServiceAddEvent(final RemoteBrokerData data) {
         if (listener != null && started.get()) {
-            listener.onServiceAdd(data);
+            listener.onServiceAdd(data.getPeerUri());
         }
     }
 
@@ -436,7 +446,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
 
         long lastHeartBeat;
 
-        public RemoteBrokerData(String peerUri) {
+        public RemoteBrokerData(URI peerUri) {
             super(peerUri, EventType.ALIVE);
             this.lastHeartBeat = System.currentTimeMillis();
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java
index 6730f33..a20a4b2 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/MulticastDiscoveryAgentFactory.java
@@ -35,7 +35,16 @@ public class MulticastDiscoveryAgentFactory extends DiscoveryAgentFactory {
     public DiscoveryAgent createDiscoveryAgent(URI discoveryURI) throws Exception {
         MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent(discoveryURI);
         Map<String, String> options = URISupport.parseParameters(discoveryURI);
-        PropertyUtil.setProperties(agent, options);
+
+        options = PropertyUtil.setProperties(agent, options);
+        if (!options.isEmpty()) {
+            String msg = ""
+                + " Not all options could be set on the Multicast discovery."
+                + " agent.  Check the options are spelled correctly."
+                + " Unused parameters=[" + options + "]."
+                + " This agent cannot be started.";
+            throw new IllegalArgumentException(msg);
+        }
 
         String service = agent.getService();
         if (service == null || service.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java
index 294234e..3c45bd6 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/PacketParser.java
@@ -16,8 +16,6 @@
  */
 package org.apache.qpid.jms.provider.discovery.multicast;
 
-import org.apache.qpid.jms.provider.discovery.DiscoveryEvent;
-
 /**
  * Interface for a DatagramPacket parser object which is used by the
  * MulticastDiscoveryAget to parse incoming packets to determine the

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java
index 1cb2935..e2fa1d1 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/multicast/parsers/ActiveMQPacketParser.java
@@ -16,15 +16,22 @@
  */
 package org.apache.qpid.jms.provider.discovery.multicast.parsers;
 
-import org.apache.qpid.jms.provider.discovery.DiscoveryEvent;
-import org.apache.qpid.jms.provider.discovery.DiscoveryEvent.EventType;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.qpid.jms.provider.discovery.multicast.DiscoveryEvent;
+import org.apache.qpid.jms.provider.discovery.multicast.DiscoveryEvent.EventType;
 import org.apache.qpid.jms.provider.discovery.multicast.PacketParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Parser instance for ActiveMQ multicast discovery processing.
  */
 public class ActiveMQPacketParser implements PacketParser {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQPacketParser.class);
+
     private static final String TYPE_SUFFIX = "ActiveMQ-4.";
     private static final String ALIVE = "alive.";
     private static final String DEAD = "dead.";
@@ -50,12 +57,20 @@ public class ActiveMQPacketParser implements PacketParser {
             String payload = str.substring(getType().length());
             if (payload.startsWith(ALIVE)) {
                 String brokerName = getBrokerName(payload.substring(ALIVE.length()));
-                String brokerUri = payload.substring(ALIVE.length() + brokerName.length() + 2);
-                event = new DiscoveryEvent(brokerUri, EventType.ALIVE);
+                try {
+                    String brokerUri = payload.substring(ALIVE.length() + brokerName.length() + 2);
+                    event = new DiscoveryEvent(new URI(brokerUri), EventType.ALIVE);
+                } catch (URISyntaxException ex) {
+                    LOG.warn("Published URI has invalid URI syntax, ignoring: {}", payload);
+                }
             } else {
                 String brokerName = getBrokerName(payload.substring(DEAD.length()));
-                String brokerUri = payload.substring(DEAD.length() + brokerName.length() + 2);
-                event = new DiscoveryEvent(brokerUri, EventType.SHUTDOWN);
+                try {
+                    String brokerUri = payload.substring(DEAD.length() + brokerName.length() + 2);
+                    event = new DiscoveryEvent(new URI(brokerUri), EventType.SHUTDOWN);
+                } catch (URISyntaxException ex) {
+                    LOG.warn("Published URI has invalid URI syntax, ignoring: {}", payload);
+                }
             }
         }
         return event;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file
new file mode 100644
index 0000000..0bf8b3a
--- /dev/null
+++ b/qpid-jms-discovery/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/agents/file
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.provider.discovery.file.FileWatcherDiscoveryAgentFactory

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
new file mode 100644
index 0000000..1c22e25
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/FileWatcherDiscoveryTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.discovery;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionListener;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that the file watcher Discovery Provider finds a broker URI in
+ * the file it is directed to watch.
+ */
+public class FileWatcherDiscoveryTest extends AmqpTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileWatcherDiscoveryTest.class);
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("./target"));
+
+    private CountDownLatch connected;
+    private CountDownLatch interrupted;
+    private CountDownLatch restored;
+    private JmsConnection jmsConnection;
+
+    private File primaryBrokerList;
+    private File secondaryBrokerList;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        connected = new CountDownLatch(1);
+        interrupted = new CountDownLatch(1);
+        restored = new CountDownLatch(1);
+
+        primaryBrokerList = folder.newFile("primaryBrokerURIsFile.txt");
+        secondaryBrokerList = folder.newFile("secondaryBrokerURIsFile.txt");
+
+        LOG.info("Broker URIs going to file: {}", primaryBrokerList);
+
+        writeOutBrokerURIsToFile(primaryBrokerList);
+    }
+
+    @Test(timeout = 60000)
+    public void testConnectedToStoredBrokerURI() throws Exception {
+        assertTrue(primaryBrokerList.exists());
+
+        connection = createConnection();
+        connection.start();
+
+        assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS));
+    }
+
+    @Test(timeout = 60000)
+    public void testReconnectWhenURIUpdates() throws Exception {
+        assertTrue(primaryBrokerList.exists());
+
+        connection = createConnection();
+        connection.start();
+
+        assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS));
+
+        stopPrimaryBroker();
+
+        assertTrue("connection should be interrupted.", interrupted.await(30, TimeUnit.SECONDS));
+
+        startPrimaryBroker();
+
+        writeOutBrokerURIsToFile(primaryBrokerList);
+
+        assertTrue("connection should have been reestablished.", restored.await(30, TimeUnit.SECONDS));
+    }
+
+    @Test(timeout = 60000)
+    public void testReconnectUsingTwoFiles() throws Exception {
+        assertTrue(primaryBrokerList.exists());
+        assertTrue(secondaryBrokerList.exists());
+
+        connection = createConnection(new File[]{ primaryBrokerList, secondaryBrokerList });
+        connection.start();
+
+        assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS));
+
+        stopPrimaryBroker();
+
+        assertTrue("connection should be interrupted.", interrupted.await(30, TimeUnit.SECONDS));
+
+        startPrimaryBroker();
+
+        writeOutBrokerURIsToFile(secondaryBrokerList);
+
+        assertTrue("connection should have been reestablished.", restored.await(30, TimeUnit.SECONDS));
+    }
+
+    @Test(timeout = 60000)
+    public void testWithInitiallyNonExistingFile() throws Exception {
+        assertTrue(primaryBrokerList.exists());
+
+        final String FILENAME = "nonExistentFile.txt";
+
+        File nonExistentFile = new File(folder.getRoot(), FILENAME);
+        assertFalse(nonExistentFile.exists());
+
+        connection = createConnection(new File[]{ primaryBrokerList, nonExistentFile });
+        connection.start();
+
+        assertTrue("connection never connected.", connected.await(30, TimeUnit.SECONDS));
+
+        stopPrimaryBroker();
+
+        assertTrue("connection should be interrupted.", interrupted.await(30, TimeUnit.SECONDS));
+
+        startPrimaryBroker();
+
+        folder.newFile(FILENAME);
+        assertTrue(nonExistentFile.exists());
+
+        writeOutBrokerURIsToFile(nonExistentFile);
+
+        assertTrue("connection should have been reestablished.", restored.await(30, TimeUnit.SECONDS));
+    }
+
+    protected Connection createConnection() throws Exception {
+        return createConnection(new File[]{ primaryBrokerList });
+    }
+
+    protected Connection createConnection(File[] filesToWatch) throws Exception {
+
+        StringBuilder fileURIs = new StringBuilder();
+        for (File file : filesToWatch) {
+            if (fileURIs.length() == 0) {
+                fileURIs.append(file.toURI());
+                fileURIs.append("?updateInterval=1000");
+            } else {
+                fileURIs.append(",");
+                fileURIs.append(file.toURI());
+                fileURIs.append("?updateInterval=1000");
+            }
+        }
+
+        JmsConnectionFactory factory = new JmsConnectionFactory(
+            "discovery:(" + fileURIs.toString() + ")?discovery.maxReconnectDelay=500");
+        connection = factory.createConnection();
+
+        jmsConnection = (JmsConnection) connection;
+        jmsConnection.addConnectionListener(new JmsConnectionListener() {
+
+            @Override
+            public void onConnectionEstablished(URI remoteURI) {
+                LOG.info("Connection reports established.  Connected to -> {}", remoteURI);
+                connected.countDown();
+            }
+
+            @Override
+            public void onConnectionInterrupted(URI remoteURI) {
+                LOG.info("Connection reports interrupted. Lost connection to -> {}", remoteURI);
+                interrupted.countDown();
+            }
+
+            @Override
+            public void onConnectionRestored(URI remoteURI) {
+                LOG.info("Connection reports restored.  Connected to -> {}", remoteURI);
+                restored.countDown();
+            }
+
+            @Override
+            public void onConnectionFailure(Throwable error) {
+            }
+
+            @Override
+            public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+            }
+        });
+
+        return connection;
+    }
+
+    protected void writeOutBrokerURIsToFile(File targetFile) throws Exception {
+        try (FileOutputStream out = new FileOutputStream(targetFile);) {
+            for (URI brokerURI : getBrokerURIs()) {
+                LOG.info("Broker URI being written: {}", brokerURI);
+                out.write(brokerURI.toString().getBytes("UTF-8"));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5b3c0292/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
index f69efa0..2b107ef 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
 log4j.rootLogger=INFO, out, stdout
 
 log4j.logger.org.apache.qpid.jms=INFO
-log4j.logger.org.apache.qpid.jms.provider.failover=TRACE
+log4j.logger.org.apache.qpid.jms.provider=DEBUG
 
 # Tune the ActiveMQ and it's AMQP transport as needed for debugging.
 log4j.logger.org.apache.activemq=INFO


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message