ace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r1521918 - in /ace/trunk: org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/ org.apache.ace.agent/src/org/apache/ace/agent/ org.apache.ace.agent/src/org/apache/ace/agent/impl/ org.apache.ace.agent/test/org/apache/ace/agent/impl/
Date Wed, 11 Sep 2013 16:04:55 GMT
Author: jawi
Date: Wed Sep 11 16:04:54 2013
New Revision: 1521918

URL: http://svn.apache.org/r1521918
Log:
ACE-323 - initial set of improvements for the MA update logic:

- keep track whether the last attempt to install a version
  failed or not, and if so, do not try to install it again,
  unless a newer version is available;
- added specific events when trying to install & complete 
  the installation of a deployment package. This way, we can
  better keep track when a DP is installed and whether or not
  it is completed successfully. The events of the DA are not
  sufficient for this, as the DA does not always sent events
  (particulary in case of failures);
- refactored the itests of the new MA to make them more 
  specific/granular and fixed several issues to make the tests
  more robust and less prone to previous failures (in case a
  test might fail).


Modified:
    ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
    ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/BaseAgentTest.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/InternalConstants.java
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/CustomControllerTest.java
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java

Modified: ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java (original)
+++ ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java Wed Sep 11 16:04:54 2013
@@ -20,15 +20,15 @@ package org.apache.ace.agent.itest;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -38,18 +38,16 @@ import javax.servlet.http.HttpServletRes
 import org.apache.ace.agent.AgentConstants;
 import org.apache.ace.agent.AgentControl;
 import org.apache.ace.agent.ConfigurationHandler;
+import org.apache.ace.agent.DeploymentHandler;
 import org.apache.ace.agent.EventListener;
-import org.apache.ace.agent.LoggingHandler;
-import org.apache.ace.builder.DeploymentPackageBuilder;
+import org.apache.ace.agent.LoggingHandler.Levels;
 import org.apache.felix.dm.Component;
+import org.osgi.framework.Bundle;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.Version;
 import org.osgi.service.http.HttpService;
 
-import aQute.lib.osgi.Builder;
-import aQute.lib.osgi.Jar;
-
 /**
  * Tests updating the management agent. In fact it tests different failure paths first, and finally gets to update the
  * agent. The tests it does are:
@@ -66,185 +64,29 @@ public class AgentDeploymentTest extends
         EMPTY_STREAM, CORRUPT_STREAM, ABORT_STREAM, VERSIONS_RETRY_AFTER, DEPLOYMENT_RETRY_AFTER
     }
 
-    private volatile TestDeploymentServlet m_servlet;
-    private volatile HttpService m_http;
-    private volatile TestEventListener m_listener;
-
-    private final Version version1 = Version.parseVersion("1.0.0");
-    private final Version version2 = Version.parseVersion("2.0.0");
-    private final Version version3 = Version.parseVersion("3.0.0");
-    private final Version version4 = Version.parseVersion("4.0.0");
-    private final Version version5 = Version.parseVersion("5.0.0");
-    private final Version version6 = Version.parseVersion("6.0.0");
-
-    private TestPackage m_package1;
-    private TestPackage m_package2;
-    private TestPackage m_package3;
-    private TestPackage m_package4;
-    private TestPackage m_package5;
-    private TestPackage m_package6;
-
-    @Override
-    protected Component[] getDependencies() {
-        m_listener = new TestEventListener();
-        return new Component[] {
-            createComponent()
-                .setImplementation(this)
-                .add(createServiceDependency().setService(HttpService.class).setRequired(true)),
-            createComponent()
-                .setInterface(EventListener.class.getName(), null)
-                .setImplementation(m_listener)
-        };
-    }
-
-    @Override
-    public void configureAdditionalServices() throws Exception {
-
-        TestBundle bundle1v1 = new TestBundle("bundle1", version1);
-        TestBundle bundle1v2 = new TestBundle("bundle1", version2);
-        TestBundle bundle2v1 = new TestBundle("bundle2", version1);
-        TestBundle bundle2v2 = new TestBundle("bundle2", version2);
-
-        TestBundle bundle3v1 = new TestBundle("bundle3", version1, Constants.BUNDLE_ACTIVATOR, "no.Such.Class");
-        TestBundle bundle3v2 = new TestBundle("bundle3", version2);
-
-        m_package1 = new TestPackage("007", version1, bundle1v1);
-        m_package2 = new TestPackage("007", version2, bundle1v2);
-        m_package3 = new TestPackage("007", version3, bundle1v2, bundle2v1);
-        m_package4 = new TestPackage("007", version4, bundle1v2, bundle2v2);
-        m_package5 = new TestPackage("007", version5, bundle1v2, bundle2v2, bundle3v1);
-        m_package6 = new TestPackage("007", version6, bundle1v2, bundle2v2, bundle3v2);
-
-        m_servlet = new TestDeploymentServlet("007");
-        m_http.registerServlet("/deployment", m_servlet, null, null);
-        m_http.registerServlet("/agent", new TestUpdateServlet(), null, null);
-        m_http.registerServlet("/auditlog", new TestAuditlogServlet(), null, null);
-
-    }
-
-    public void tearDown() throws Exception {
-        m_http.unregister("/deployment");
-        m_http.unregister("/agent");
-        m_http.unregister("/auditlog");
-
-        resetAgentBundleState();
-    }
-
-    public void testStreamingDeployment() throws Exception {
-        AgentControl control = getService(AgentControl.class);
-
-        Map<String, String> props = new HashMap<String, String>();
-        props.put(AgentConstants.CONFIG_LOGGING_LEVEL, LoggingHandler.Levels.DEBUG.name());
-        props.put(AgentConstants.CONFIG_IDENTIFICATION_AGENTID, "007");
-        props.put(AgentConstants.CONFIG_CONTROLLER_STREAMING, "true");
-        props.put(AgentConstants.CONFIG_CONTROLLER_SYNCDELAY, "1");
-        props.put(AgentConstants.CONFIG_CONTROLLER_SYNCINTERVAL, "1");
-        props.put(AgentConstants.CONFIG_CONTROLLER_RETRIES, "2");
-
-        ConfigurationHandler configurationHandler = control.getConfigurationHandler();
-        configurationHandler.putAll(props);
-
-        waitForInstalledVersion(Version.emptyVersion);
-
-        expectSuccessfulDeployment(m_package1, Failure.VERSIONS_RETRY_AFTER);
-        expectSuccessfulDeployment(m_package2, Failure.DEPLOYMENT_RETRY_AFTER);
-        expectSuccessfulDeployment(m_package3, Failure.EMPTY_STREAM);
-        expectSuccessfulDeployment(m_package4, Failure.CORRUPT_STREAM);
-        expectSuccessfulDeployment(m_package5, Failure.ABORT_STREAM);
-        expectSuccessfulDeployment(m_package6, null);
-    }
-
-    public void testNonStreamingDeployment() throws Exception {
-        AgentControl control = getService(AgentControl.class);
-
-        Map<String, String> props = new HashMap<String, String>();
-        props.put(AgentConstants.CONFIG_LOGGING_LEVEL, LoggingHandler.Levels.DEBUG.name());
-        props.put(AgentConstants.CONFIG_IDENTIFICATION_AGENTID, "007");
-        props.put(AgentConstants.CONFIG_CONTROLLER_STREAMING, "false");
-        props.put(AgentConstants.CONFIG_CONTROLLER_SYNCDELAY, "1");
-        props.put(AgentConstants.CONFIG_CONTROLLER_SYNCINTERVAL, "1");
-        props.put(AgentConstants.CONFIG_CONTROLLER_RETRIES, "2");
-
-        ConfigurationHandler configurationHandler = control.getConfigurationHandler();
-        configurationHandler.putAll(props);
-
-        waitForInstalledVersion(Version.emptyVersion);
-
-        expectSuccessfulDeployment(m_package1, Failure.VERSIONS_RETRY_AFTER);
-        expectSuccessfulDeployment(m_package2, Failure.DEPLOYMENT_RETRY_AFTER);
-        expectSuccessfulDeployment(m_package3, Failure.EMPTY_STREAM);
-        expectSuccessfulDeployment(m_package4, Failure.CORRUPT_STREAM);
-        expectSuccessfulDeployment(m_package5, Failure.ABORT_STREAM);
-        expectSuccessfulDeployment(m_package6, null);
-    }
-
-    private void expectSuccessfulDeployment(TestPackage dpackage, Failure failure) throws Exception {
-        synchronized (m_servlet) {
-            if (failure != null) {
-                m_servlet.setFailure(Failure.VERSIONS_RETRY_AFTER);
-            }
-            m_servlet.addPackage(dpackage);
-            m_listener.getTopics().clear();
-        }
-        waitForEventReceived("org/osgi/service/deployment/INSTALL");
-        waitForEventReceived("org/osgi/service/deployment/COMPLETE");
-        waitForInstalledVersion(dpackage.getVersion());
+    private static class TestAuditlogServlet extends HttpServlet {
+        private static final long serialVersionUID = 1L;
 
-        System.out.println("---");
-    }
+        // FIXME Ignoring auditlog.. but why do we get and empty send if we set range to high?
 
-    private void waitForInstalledVersion(Version version) throws Exception {
-        ServiceReference reference = m_bundleContext.getServiceReference(AgentControl.class.getName());
-        AgentControl control = (AgentControl) m_bundleContext.getService(reference);
-        int timeout = 100;
-        while (!control.getDeploymentHandler().getInstalledVersion().equals(version)) {
-            Thread.sleep(100);
-            if (timeout-- <= 0) {
-                m_bundleContext.ungetService(reference);
-                fail("Timed out while waiting for deployment " + version);
-            }
+        @Override
+        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+            response.setContentType("text/plain");
+            PrintWriter writer = response.getWriter();
+            writer.println(request.getParameter("tid") + "," + request.getParameter("logid") + ",0-10");
+            writer.close();
         }
-        m_bundleContext.ungetService(reference);
-    }
 
-    private void waitForEventReceived(String topic) throws Exception {
-        int timeout = 100;
-        while (!m_listener.getTopics().contains(topic)) {
-            Thread.sleep(100);
-            if (timeout-- <= 0) {
-                fail("Timed out while waiting for event " + topic);
+        @Override
+        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
+            InputStream is = request.getInputStream();
+            while (is.read() != -1) {
             }
+            is.close();
+            response.setContentType("text/plain");
         }
     }
 
-    private static File createPackage(String name, Version version, File... bundles) throws Exception {
-        DeploymentPackageBuilder builder = DeploymentPackageBuilder.createDeploymentPackage(name, version.toString());
-        for (File bundle : bundles) {
-            builder.addBundle(bundle.toURI().toURL());
-        }
-        File file = File.createTempFile("testpackage", ".jar");
-        OutputStream fos = new FileOutputStream(file);
-        builder.generate(fos);
-        fos.close();
-        return file;
-    }
-
-    private static File createBundle(String bsn, Version version, String... headers) throws Exception {
-        Builder b = new Builder();
-        b.setProperty("Bundle-SymbolicName", bsn);
-        b.setProperty("Bundle-Version", version.toString());
-        for (int i = 0; i < headers.length; i += 2) {
-            b.setProperty(headers[i], headers[i + 1]);
-        }
-        b.setProperty("Include-Resource", "bnd.bnd"); // prevent empty jar bug
-        Jar jar = b.build();
-        jar.getManifest(); // Not sure whether this is needed...
-        File file = File.createTempFile("testbundle", ".jar");
-        jar.write(file);
-        b.close();
-        return file;
-    }
-
     private static class TestBundle {
         private final File m_file;
 
@@ -257,43 +99,6 @@ public class AgentDeploymentTest extends
         }
     }
 
-    private static class TestPackage {
-        private final Version m_version;
-        private final File m_file;
-
-        public TestPackage(String name, Version version, TestBundle... bundles) throws Exception {
-            m_version = version;
-
-            File[] files = new File[bundles.length];
-            for (int i = 0; i < bundles.length; i++) {
-                files[i] = bundles[i].getFile();
-            }
-            m_file = createPackage(name, version, files);
-        }
-
-        public Version getVersion() {
-            return m_version;
-        }
-
-        public File getFile() {
-            return m_file;
-        }
-    }
-
-    private static class TestEventListener implements EventListener {
-        private final CopyOnWriteArrayList<String> m_topics = new CopyOnWriteArrayList<String>();
-
-        @Override
-        public void handle(String topic, Map<String, String> payload) {
-            System.out.println("Event: " + topic + " => " + payload);
-            m_topics.add(topic);
-        }
-
-        public List<String> getTopics() {
-            return m_topics;
-        }
-    }
-
     private static class TestDeploymentServlet extends HttpServlet {
         private static final long serialVersionUID = 1L;
 
@@ -307,6 +112,19 @@ public class AgentDeploymentTest extends
             m_agentId = agentId;
         }
 
+        public synchronized void addPackage(TestPackage testPackage) {
+            m_packages.put(testPackage.getVersion().toString(), testPackage);
+        }
+
+        public synchronized void setFailure(Failure failure) {
+            m_failure = failure;
+        }
+
+        public synchronized void reset() {
+            m_failure = null;
+            m_packages.clear();
+        }
+
         @Override
         protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
             String pathinfoTail = req.getPathInfo().replaceFirst("/" + m_agentId + "/versions/?", "");
@@ -322,14 +140,6 @@ public class AgentDeploymentTest extends
             }
         }
 
-        public synchronized void addPackage(TestPackage testPackage) {
-            m_packages.put(testPackage.getVersion().toString(), testPackage);
-        }
-
-        public synchronized void setFailure(Failure failure) {
-            m_failure = failure;
-        }
-
         private void sendPackage(TestPackage dpackage, HttpServletResponse resp) throws IOException {
             if (m_failure == Failure.DEPLOYMENT_RETRY_AFTER) {
                 resp.addHeader("Retry-After", BACKOFF_TIME);
@@ -346,13 +156,11 @@ public class AgentDeploymentTest extends
                 os = resp.getOutputStream();
 
                 if (m_failure == Failure.EMPTY_STREAM) {
-                    m_failure = null;
                     return;
                 }
 
                 if (m_failure == Failure.CORRUPT_STREAM) {
                     os.write("garbage".getBytes());
-                    m_failure = null;
                 }
 
                 int b;
@@ -360,7 +168,6 @@ public class AgentDeploymentTest extends
                 while ((b = fis.read()) != -1) {
                     os.write(b);
                     if (count++ == middle && m_failure == Failure.ABORT_STREAM) {
-                        m_failure = null;
                         break;
                     }
                 }
@@ -391,6 +198,97 @@ public class AgentDeploymentTest extends
         }
     }
 
+    private static class TestEventListener implements EventListener {
+        private final Map<String, List<Map<String, String>>> m_topics = new HashMap<String, List<Map<String, String>>>();
+
+        public Map<String, List<Map<String, String>>> getTopics() {
+            Map<String, List<Map<String, String>>> result;
+            synchronized (m_topics) {
+                result = new HashMap<String, List<Map<String, String>>>(m_topics);
+            }
+            return result;
+        }
+
+        public boolean containsTopic(String topic) {
+            synchronized (m_topics) {
+                return m_topics.containsKey(topic);
+            }
+        }
+
+        public boolean containsTopic(String topic, Map<String, String> expectedProperties) {
+            synchronized (m_topics) {
+                List<Map<String, String>> payloads = m_topics.get(topic);
+                if (payloads == null || payloads.isEmpty()) {
+                    return expectedProperties.isEmpty();
+                }
+                for (Map<String, String> payload : payloads) {
+                    if (matches(expectedProperties, payload)) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+        }
+
+        @Override
+        public void handle(String topic, Map<String, String> payload) {
+            if (LOGLEVEL == Levels.DEBUG) {
+                System.out.printf("Handling event: %s => %s.%n", topic, payload);
+            }
+
+            synchronized (m_topics) {
+                List<Map<String, String>> payloads = m_topics.get(topic);
+                if (payloads == null) {
+                    payloads = new ArrayList<Map<String, String>>();
+                    m_topics.put(topic, payloads);
+                }
+                payloads.add(payload);
+            }
+        }
+
+        private static boolean matches(Map<String, String> source, Map<String, String> target) {
+            for (Map.Entry<String, String> sourceEntry : source.entrySet()) {
+                String sourceKey = sourceEntry.getKey();
+                String sourceValue = sourceEntry.getValue();
+
+                if (!target.containsKey(sourceKey)) {
+                    return false;
+                }
+                String targetValue = target.get(sourceKey);
+                if (!sourceValue.equals(targetValue)) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+    }
+
+    private static class TestPackage {
+        private final String m_name;
+        private final Version m_version;
+        private final File m_file;
+
+        public TestPackage(String name, Version version, TestBundle... bundles) throws Exception {
+            m_name = name;
+            m_version = version;
+
+            File[] files = new File[bundles.length];
+            for (int i = 0; i < bundles.length; i++) {
+                files[i] = bundles[i].getFile();
+            }
+            m_file = createPackage(m_name, m_version, files);
+        }
+
+        public File getFile() {
+            return m_file;
+        }
+
+        public Version getVersion() {
+            return m_version;
+        }
+    }
+
     private static class TestUpdateServlet extends HttpServlet {
         private static final long serialVersionUID = 1L;
 
@@ -400,26 +298,341 @@ public class AgentDeploymentTest extends
         }
     }
 
-    private static class TestAuditlogServlet extends HttpServlet {
-        private static final long serialVersionUID = 1L;
+    private static final String AGENT_DEPLOYMENT_COMPLETE = "agent/deployment/COMPLETE";
+    private static final String AGENT_DEPLOYMENT_INSTALL = "agent/deployment/INSTALL";
 
-        // FIXME Ignoring auditlog.. but why do we get and empty send if we set range to high?
+    private static final String AGENT_ID = "007";
+    private static final String TEST_BUNDLE_NAME_PREFIX = "test.bundle";
+    private static final Levels LOGLEVEL = Levels.INFO;
+
+    private static final Version V1_0_0 = Version.parseVersion("1.0.0");
+    private static final Version V2_0_0 = Version.parseVersion("2.0.0");
+    private static final Version V3_0_0 = Version.parseVersion("3.0.0");
+    private static final Version V4_0_0 = Version.parseVersion("4.0.0");
+    private static final Version V5_0_0 = Version.parseVersion("5.0.0");
+    private static final Version V6_0_0 = Version.parseVersion("6.0.0");
 
-        @Override
-        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
-            response.setContentType("text/plain");
-            PrintWriter writer = response.getWriter();
-            writer.println(request.getParameter("tid") + "," + request.getParameter("logid") + ",0-10");
-            writer.close();
+    private volatile TestDeploymentServlet m_servlet;
+    private volatile HttpService m_http;
+    private volatile TestEventListener m_listener;
+
+    private TestPackage m_package1;
+    private TestPackage m_package2;
+    private TestPackage m_package3;
+    private TestPackage m_package4;
+    private TestPackage m_package5;
+    private TestPackage m_package6;
+
+    /**
+     * Test case for ACE-323: when a version of a DP was downloaded correctly, but did not install correctly, we should
+     * not keep trying, unless a newer version of that DP is available.
+     */
+    public void testFailedDeploymentWithoutRetrying() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package1, null);
+
+        // Try to install a DP that fails due to an aborted stream...
+        expectFailedDeployment(m_package2, Failure.ABORT_STREAM);
+        waitForInstalledVersion(V1_0_0);
+
+        // The failed DP should not be installed again...
+        TimeUnit.SECONDS.sleep(2); // sleep a little while to show the retry in the log...
+
+        // If we install a newer version, it should succeed...
+        expectSuccessfulDeployment(m_package6, null);
+
+        // Check our event log, should contain all handled events...
+        Map<String, List<Map<String, String>>> topics = m_listener.getTopics();
+
+        List<Map<String, String>> events = topics.get(AGENT_DEPLOYMENT_INSTALL);
+        // should contain exactly three different elements...
+        assertEquals(events.toString(), 3, events.size());
+
+        events = topics.get(AGENT_DEPLOYMENT_COMPLETE);
+        // should contain exactly three different elements...
+        assertEquals(events.toString(), 3, events.size());
+    }
+
+    /**
+     * Tests that we can install upgrades for an earlier installed DP.
+     */
+    public void testInstallUpgradeDeploymentPackage() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        // Try to install a DP that fails at bundle-starting due to a non-existing class, but this does not revert the
+        // installation of the DP itself...
+        expectSuccessfulDeployment(m_package5, null);
+
+        // If we install a newer version, it should succeed...
+        expectSuccessfulDeployment(m_package6, null);
+    }
+
+    /**
+     * Tests the deployment of "non-streamed" deployment packages in various situations.
+     */
+    public void testNonStreamingDeployment() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package6, null);
+    }
+
+    /**
+     * Tests the deployment of "non-streamed" deployment packages in various situations.
+     */
+    public void testNonStreamingDeployment_AbortedStream() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectFailedDeployment(m_package5, Failure.ABORT_STREAM);
+    }
+
+    /**
+     * Tests the deployment of "non-streamed" deployment packages in various situations.
+     */
+    public void testNonStreamingDeployment_CorruptStream() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectFailedDeployment(m_package4, Failure.CORRUPT_STREAM);
+    }
+
+    /**
+     * Tests the deployment of "non-streamed" deployment packages in various situations.
+     */
+    public void testNonStreamingDeployment_DeploymentRetryAfter() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package2, Failure.DEPLOYMENT_RETRY_AFTER);
+    }
+
+    /**
+     * Tests the deployment of "non-streamed" deployment packages in various situations.
+     */
+    public void testNonStreamingDeployment_EmptyStream() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectFailedDeployment(m_package3, Failure.EMPTY_STREAM);
+    }
+
+    /**
+     * Tests the deployment of "non-streamed" deployment packages in various situations.
+     */
+    public void testNonStreamingDeployment_VersionsRetryAfter() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package1, Failure.VERSIONS_RETRY_AFTER);
+    }
+
+    /**
+     * Tests the deployment of "streamed" deployment packages in various situations.
+     */
+    public void testStreamingDeployment() throws Exception {
+        setupAgentForStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package6, null);
+    }
+
+    /**
+     * Tests the deployment of "streamed" deployment packages in various situations.
+     */
+    public void testStreamingDeployment_AbortStream() throws Exception {
+        setupAgentForStreamingDeployment();
+
+        expectFailedDeployment(m_package5, Failure.ABORT_STREAM);
+    }
+
+    /**
+     * Tests the deployment of "streamed" deployment packages in various situations.
+     */
+    public void testStreamingDeployment_CorruptStream() throws Exception {
+        setupAgentForStreamingDeployment();
+
+        expectFailedDeployment(m_package4, Failure.CORRUPT_STREAM);
+    }
+
+    /**
+     * Tests the deployment of "streamed" deployment packages in various situations.
+     */
+    public void testStreamingDeployment_DeploymentRetryAfter() throws Exception {
+        setupAgentForStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package2, Failure.DEPLOYMENT_RETRY_AFTER);
+    }
+
+    /**
+     * Tests the deployment of "streamed" deployment packages in various situations.
+     */
+    public void testStreamingDeployment_EmptyStream() throws Exception {
+        setupAgentForStreamingDeployment();
+
+        expectFailedDeployment(m_package3, Failure.EMPTY_STREAM);
+    }
+
+    /**
+     * Tests the deployment of "streamed" deployment packages in various situations.
+     */
+    public void testStreamingDeployment_VersionsRetryAfter() throws Exception {
+        setupAgentForStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package1, Failure.VERSIONS_RETRY_AFTER);
+    }
+
+    @Override
+    protected void configureAdditionalServices() throws Exception {
+        TestBundle bundle1v1 = new TestBundle(TEST_BUNDLE_NAME_PREFIX.concat("1"), V1_0_0);
+        TestBundle bundle1v2 = new TestBundle(TEST_BUNDLE_NAME_PREFIX.concat("1"), V2_0_0);
+        TestBundle bundle2v1 = new TestBundle(TEST_BUNDLE_NAME_PREFIX.concat("2"), V1_0_0);
+        TestBundle bundle2v2 = new TestBundle(TEST_BUNDLE_NAME_PREFIX.concat("2"), V2_0_0);
+        TestBundle bundle3v1 = new TestBundle(TEST_BUNDLE_NAME_PREFIX.concat("3"), V1_0_0, Constants.BUNDLE_ACTIVATOR, "no.such.Class");
+        TestBundle bundle3v2 = new TestBundle(TEST_BUNDLE_NAME_PREFIX.concat("3"), V2_0_0);
+
+        m_package1 = new TestPackage(AGENT_ID, V1_0_0, bundle1v1);
+        m_package2 = new TestPackage(AGENT_ID, V2_0_0, bundle1v2);
+        m_package3 = new TestPackage(AGENT_ID, V3_0_0, bundle1v2, bundle2v1);
+        m_package4 = new TestPackage(AGENT_ID, V4_0_0, bundle1v2, bundle2v2);
+        m_package5 = new TestPackage(AGENT_ID, V5_0_0, bundle1v2, bundle2v2, bundle3v1);
+        m_package6 = new TestPackage(AGENT_ID, V6_0_0, bundle1v2, bundle2v2, bundle3v2);
+
+        m_servlet = new TestDeploymentServlet(AGENT_ID);
+
+        m_http.registerServlet("/deployment", m_servlet, null, null);
+        m_http.registerServlet("/agent", new TestUpdateServlet(), null, null);
+        m_http.registerServlet("/auditlog", new TestAuditlogServlet(), null, null);
+    }
+
+    @Override
+    protected Component[] getDependencies() {
+        m_listener = new TestEventListener();
+        return new Component[] {
+            createComponent()
+                .setImplementation(this)
+                .add(createServiceDependency().setService(HttpService.class).setRequired(true)),
+            createComponent()
+                .setInterface(EventListener.class.getName(), null)
+                .setImplementation(m_listener)
+        };
+    }
+
+    protected void tearDown() throws Exception {
+        // Remove all provisioned components...
+        m_dependencyManager.clear();
+
+        m_http.unregister("/deployment");
+        m_http.unregister("/agent");
+        m_http.unregister("/auditlog");
+
+        // Force an uninstall of all remaining test bundles...
+        for (Bundle bundle : m_bundleContext.getBundles()) {
+            String bsn = bundle.getSymbolicName();
+            if (bsn.startsWith(TEST_BUNDLE_NAME_PREFIX)) {
+                bundle.uninstall();
+            }
         }
 
-        @Override
-        protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
-            InputStream is = request.getInputStream();
-            while (is.read() != -1) {
+        resetAgentBundleState();
+    }
+
+    private Map<String, String> createAgentConfiguration(boolean useStreaming) {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(AgentConstants.CONFIG_IDENTIFICATION_AGENTID, AGENT_ID);
+        props.put(AgentConstants.CONFIG_LOGGING_LEVEL, LOGLEVEL.name());
+        props.put(AgentConstants.CONFIG_CONTROLLER_STREAMING, Boolean.toString(useStreaming));
+        props.put(AgentConstants.CONFIG_CONTROLLER_SYNCDELAY, "1");
+        props.put(AgentConstants.CONFIG_CONTROLLER_SYNCINTERVAL, "1");
+        props.put(AgentConstants.CONFIG_CONTROLLER_RETRIES, "2");
+        return props;
+    }
+
+    private void expectSuccessfulDeployment(TestPackage dpackage, Failure failure) throws Exception {
+        deployPackage(dpackage, failure);
+
+        waitForEventReceived(AGENT_DEPLOYMENT_INSTALL);
+        waitForEventReceived(AGENT_DEPLOYMENT_COMPLETE, "successful", "true");
+
+        waitForInstalledVersion(dpackage.getVersion());
+    }
+
+    private void expectFailedDeployment(TestPackage dpackage, Failure failure) throws Exception {
+        deployPackage(dpackage, failure);
+
+        waitForEventReceived(AGENT_DEPLOYMENT_INSTALL);
+        waitForEventReceived(AGENT_DEPLOYMENT_COMPLETE, "successful", "false");
+    }
+
+    private void deployPackage(TestPackage dpackage, Failure failure) {
+        synchronized (m_servlet) {
+            m_servlet.setFailure(failure);
+            m_servlet.addPackage(dpackage);
+        }
+    }
+
+    private void setupAgentForNonStreamingDeployment() throws Exception {
+        AgentControl control = getService(AgentControl.class);
+
+        Map<String, String> props = createAgentConfiguration(false /* useStreaming */);
+
+        ConfigurationHandler configurationHandler = control.getConfigurationHandler();
+        configurationHandler.putAll(props);
+
+        synchronized (m_servlet) {
+            m_servlet.reset();
+        }
+
+        waitForInstalledVersion(Version.emptyVersion);
+    }
+
+    private void setupAgentForStreamingDeployment() throws Exception {
+        AgentControl control = getService(AgentControl.class);
+
+        Map<String, String> props = createAgentConfiguration(true /* useStreaming */);
+
+        ConfigurationHandler configurationHandler = control.getConfigurationHandler();
+        configurationHandler.putAll(props);
+
+        waitForInstalledVersion(Version.emptyVersion);
+    }
+
+    private void waitForEventReceived(String topic) throws Exception {
+        int timeout = 100;
+        while (!m_listener.containsTopic(topic)) {
+            Thread.sleep(100);
+            if (timeout-- <= 0) {
+                fail("Timed out while waiting for event " + topic);
             }
-            is.close();
-            response.setContentType("text/plain");
+        }
+    }
+
+    private void waitForEventReceived(String topic, String... properties) throws Exception {
+        Map<String, String> props = new HashMap<String, String>();
+        for (int i = 0; i < properties.length; i += 2) {
+            props.put(properties[i], properties[i + 1]);
+        }
+
+        int timeout = 100;
+        while (!m_listener.containsTopic(topic, props)) {
+            Thread.sleep(100);
+            if (timeout-- <= 0) {
+                fail("Timed out while waiting for event " + topic);
+            }
+        }
+    }
+
+    private void waitForInstalledVersion(Version version) throws Exception {
+        ServiceReference reference = m_bundleContext.getServiceReference(AgentControl.class.getName());
+
+        try {
+            AgentControl control = (AgentControl) m_bundleContext.getService(reference);
+            DeploymentHandler deploymentHandler = control.getDeploymentHandler();
+
+            int timeout = 100;
+            while (!deploymentHandler.getInstalledVersion().equals(version)) {
+                Thread.sleep(100);
+                if (timeout-- <= 0) {
+                    fail("Timed out while waiting for deployment " + version);
+                }
+            }
+        }
+        finally {
+            m_bundleContext.ungetService(reference);
         }
     }
 }

Modified: ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/BaseAgentTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/BaseAgentTest.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/BaseAgentTest.java (original)
+++ ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/BaseAgentTest.java Wed Sep 11 16:04:54 2013
@@ -1,30 +1,95 @@
 package org.apache.ace.agent.itest;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.Stack;
 
 import org.apache.ace.agent.AgentConstants;
 import org.apache.ace.agent.AgentControl;
+import org.apache.ace.builder.DeploymentPackageBuilder;
 import org.apache.ace.it.IntegrationTestBase;
 import org.osgi.framework.Bundle;
+import org.osgi.framework.Version;
+
+import aQute.lib.osgi.Builder;
+import aQute.lib.osgi.Jar;
 
 public abstract class BaseAgentTest extends IntegrationTestBase {
 
+    protected static File createBundle(String bsn, Version version, String... headers) throws Exception {
+        Builder b = new Builder();
+
+        try {
+            b.setProperty("Bundle-SymbolicName", bsn);
+            b.setProperty("Bundle-Version", version.toString());
+            for (int i = 0; i < headers.length; i += 2) {
+                b.setProperty(headers[i], headers[i + 1]);
+            }
+            b.setProperty("Include-Resource", "bnd.bnd"); // prevent empty jar bug
+
+            Jar jar = b.build();
+            jar.getManifest(); // Not sure whether this is needed...
+
+            File file = File.createTempFile("testbundle", ".jar");
+            file.deleteOnExit();
+
+            jar.write(file);
+            return file;
+        }
+        finally {
+            b.close();
+        }
+    }
+
+    protected static File createPackage(String name, Version version, File... bundles) throws Exception {
+        DeploymentPackageBuilder builder = DeploymentPackageBuilder.createDeploymentPackage(name, version.toString());
+
+        OutputStream fos = null;
+        try {
+            for (File bundle : bundles) {
+                builder.addBundle(bundle.toURI().toURL());
+            }
+
+            File file = File.createTempFile("testpackage", ".jar");
+            file.deleteOnExit();
+
+            fos = new FileOutputStream(file);
+            builder.generate(fos);
+
+            return file;
+        }
+        finally {
+            if (fos != null) {
+                fos.close();
+            }
+        }
+    }
+
     @Override
-    public void configureProvisionedServices() throws Exception {
+    protected void configureProvisionedServices() throws Exception {
         resetAgentBundleState();
     }
 
+    protected Bundle getAgentBundle() {
+        for (Bundle bundle : m_bundleContext.getBundles()) {
+            if (bundle.getSymbolicName().equals(AgentControl.class.getPackage().getName())) {
+                return bundle;
+            }
+        }
+        throw new IllegalStateException("No agentBundle found");
+    }
+
     protected void resetAgentBundleState() throws Exception {
         Bundle agentBundle = getAgentBundle();
-        System.out.println("BaseAgentTest: Stopping agent bundle");
         File dataDir = agentBundle.getBundleContext().getDataFile("");
+
+//        System.out.println("BaseAgentTest: Stopping agent bundle");
         agentBundle.stop();
-        System.out.println("BaseAgentTest: Cleaning bundle data dir");
+//        System.out.println("BaseAgentTest: Cleaning bundle data dir (" + dataDir + ")");
         cleanDir(dataDir);
-        System.out.println("BaseAgentTest: Cleaning system properties");
+//        System.out.println("BaseAgentTest: Cleaning system properties");
         Set<String> keysBeRemoved = new HashSet<String>();
         for (Object key : System.getProperties().keySet()) {
             if (key instanceof String && ((String) key).startsWith(AgentConstants.CONFIG_KEY_NAMESPACE)) {
@@ -34,36 +99,20 @@ public abstract class BaseAgentTest exte
         for (String removeKey : keysBeRemoved) {
             System.clearProperty(removeKey);
         }
-        System.out.println("BaseAgentTest: Starting agent bundle");
+//        System.out.println("BaseAgentTest: Starting agent bundle");
         agentBundle.start();
     }
 
-    protected Bundle getAgentBundle() {
-        for (Bundle bundle : m_bundleContext.getBundles()) {
-            if (bundle.getSymbolicName().equals(AgentControl.class.getPackage().getName())) {
-                return bundle;
-            }
-        }
-        throw new IllegalStateException("No agentBundle found");
-    }
-
     private void cleanDir(File dir) {
         if (!dir.isDirectory()) {
             throw new IllegalStateException();
         }
-        Stack<File> dirs = new Stack<File>();
-        dirs.push(dir);
-        while (!dirs.isEmpty()) {
-            File currentDir = dirs.pop();
-            File[] files = currentDir.listFiles();
-            for (File file : files) {
-                if (file.isDirectory()) {
-                    dirs.push(file);
-                }
-                else {
-                    file.delete();
-                }
+        File[] files = dir.listFiles();
+        for (File file : files) {
+            if (file.isDirectory()) {
+                cleanDir(file);
             }
+            file.delete();
         }
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java Wed Sep 11 16:04:54 2013
@@ -18,7 +18,8 @@
  */
 package org.apache.ace.agent;
 
-import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 
@@ -36,13 +37,15 @@ public interface DownloadResult {
     DownloadState getState();
 
     /**
-     * Returns the download file.
+     * Returns an input stream to the downloaded result.
      * 
-     * @return The file, <code>null</code> if the download was unsuccessful
+     * @return an input stream, can be <code>null</code> if the download was unsuccessful.
      */
-    // TODO inputstream
-    File getFile();
+    InputStream getInputStream() throws IOException;
 
+    /**
+     * @return
+     */
     int getCode();
 
     Map<String, List<String>> getHeaders();

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java Wed Sep 11 16:04:54 2013
@@ -65,7 +65,7 @@ public class Activator implements Bundle
      */
     @Override
     public void start(BundleContext bundleContext) throws Exception {
-        m_executorService = Executors.newScheduledThreadPool(1, new InternalThreadFactory());
+        m_executorService = Executors.newScheduledThreadPool(5, new InternalThreadFactory());
 
         m_dependencyTracker = new DependencyTrackerImpl(bundleContext, this);
 

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java Wed Sep 11 16:04:54 2013
@@ -24,12 +24,15 @@ import static org.apache.ace.agent.Agent
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_STREAMING;
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCDELAY;
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCINTERVAL;
+import static org.apache.ace.agent.impl.ConnectionUtil.closeSilently;
 import static org.apache.ace.agent.impl.InternalConstants.AGENT_CONFIG_CHANGED;
-import static org.apache.ace.agent.impl.ConnectionUtil.*;
-import java.io.FileInputStream;
+import static org.apache.ace.agent.impl.InternalConstants.AGENT_DEPLOYMENT_COMPLETE;
+import static org.apache.ace.agent.impl.InternalConstants.AGENT_DEPLOYMENT_INSTALL;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -39,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.ace.agent.AgentUpdateHandler;
 import org.apache.ace.agent.DeploymentHandler;
 import org.apache.ace.agent.DownloadHandle;
 import org.apache.ace.agent.DownloadResult;
@@ -54,6 +58,242 @@ import org.osgi.service.deploymentadmin.
  */
 public class DefaultController extends ComponentBase implements Runnable, EventListener {
 
+    /**
+     * UpdateInstaller that provides download deployment package install. The install is non-blocking. Upon download
+     * completion this installer will reschedule the controller.
+     */
+    static class DownloadUpdateInstaller extends UpdateInstaller implements DownloadHandle.ProgressListener, DownloadHandle.ResultListener {
+        // active download state
+        private volatile DownloadHandle m_downloadHandle;
+        private volatile DownloadResult m_downloadResult = null;
+        private volatile Version m_downloadVersion;
+        private volatile long m_downloadLength = 0;
+        private volatile long m_downloadProgress = 0;
+
+        public DownloadUpdateInstaller(DefaultController controller) {
+            super(controller);
+        }
+
+        @Override
+        public void completed(DownloadResult result) {
+            int delay = 1; // seconds
+            m_downloadResult = result;
+            getController().logInfo("Deployment package download completed for version %s. Rescheduling the controller to run in %d seconds", m_downloadVersion, delay);
+            getController().scheduleRun(delay);
+        }
+
+        @Override
+        public void doInstallUpdate(Version fromVersion, Version toVersion, boolean fixPackage) throws RetryAfterException, DeploymentException, IOException {
+            DeploymentHandler deploymentHandler = getController().getDeploymentHandler();
+
+            if (m_downloadHandle != null && !m_downloadVersion.equals(toVersion)) {
+                getController().logInfo("Cancelling deployment package download for %s because a newer version is available...", m_downloadVersion);
+                m_downloadHandle.discard();
+                m_downloadHandle = null;
+            }
+
+            if (m_downloadHandle == null) {
+                getController().logInfo("Starting deployment package download %s => %s...", fromVersion, toVersion);
+
+                m_downloadVersion = toVersion;
+
+                m_downloadHandle = deploymentHandler.getDownloadHandle(toVersion, fixPackage);
+                m_downloadHandle.setProgressListener(this).setCompletionListener(this).start();
+                return;
+            }
+
+            if (m_downloadResult == null) {
+                getController().logInfo("Deployment package download for %s is in progress %d / %d", toVersion, m_downloadProgress, m_downloadLength);
+                return;
+            }
+
+            DownloadState state = m_downloadResult.getState();
+            if (DownloadState.FAILED == state) {
+                getController().logWarning("Deployment package download for %s is FAILED. Restarting download...");
+
+                m_downloadHandle.discard();
+                m_downloadHandle = null;
+
+                throw new IOException("Download failed for deployment update " + fromVersion + " => " + toVersion);
+            }
+            else if (DownloadState.STOPPED == state) {
+                getController().logInfo("Deployment package download for %s is STOPPED. Trying to resume download...");
+
+                m_downloadResult = null;
+                m_downloadHandle.start();
+            }
+            else if (DownloadState.SUCCESSFUL == state) {
+                getController().logInfo("Installing downloaded deployment update %s => %s...", fromVersion, toVersion);
+
+                getController().sendDeploymentInstallEvent(fromVersion, toVersion, fixPackage);
+                
+                InputStream inputStream = null;
+                boolean success = false;
+
+                try {
+                    inputStream = m_downloadResult.getInputStream();
+
+                    deploymentHandler.deployPackage(inputStream);
+
+                    success = true;
+                }
+                finally {
+                    m_downloadHandle.discard();
+                    m_downloadHandle = null;
+
+                    closeSilently(inputStream);
+
+                    getController().sendDeploymentCompletedEvent(fromVersion, toVersion, fixPackage, success);
+                }
+            }
+        }
+
+        @Override
+        public void doReset() {
+            if (m_downloadHandle != null) {
+                getController().logInfo("Cancelling deployment package download for version %s because of reset...", m_downloadVersion);
+                m_downloadHandle.discard();
+            }
+            clearDownloadState();
+        }
+
+        @Override
+        public void progress(long contentLength, long progress) {
+            m_downloadLength = contentLength;
+            m_downloadProgress = progress;
+        }
+
+        private void clearDownloadState() {
+            if (m_downloadHandle != null) {
+                m_downloadHandle.discard();
+            }
+            m_downloadHandle = null;
+            m_downloadResult = null;
+            m_downloadVersion = null;
+        }
+    }
+
+    /**
+     * UpdateInstaller that provides streaming deployment package install. The install is blocking.
+     */
+    static class StreamingUpdateInstaller extends UpdateInstaller {
+        public StreamingUpdateInstaller(DefaultController controller) {
+            super(controller);
+        }
+
+        @Override
+        public void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, DeploymentException, IOException {
+            getController().logInfo("Installing streaming deployment update %s => %s", from, to);
+
+            getController().sendDeploymentInstallEvent(from, to, fix);
+
+            DeploymentHandler deploymentHandler = getController().getDeploymentHandler();
+
+            InputStream inputStream = null;
+            boolean success = false;
+
+            try {
+                inputStream = deploymentHandler.getInputStream(to, fix);
+
+                deploymentHandler.deployPackage(inputStream);
+
+                success = true;
+            }
+            finally {
+                closeSilently(inputStream);
+
+                getController().sendDeploymentCompletedEvent(from, to, fix, success);
+            }
+        }
+
+        @Override
+        protected void doReset() {
+            // Nop
+        }
+    }
+
+    /**
+     * Base class for internal installer strategies. This implementation handles max update retry contraints and
+     * delegates the rest to concrete implementations.
+     */
+    abstract static class UpdateInstaller {
+        private final DefaultController m_controller;
+        private Version m_lastVersionTried = null;
+        private boolean m_lastVersionSuccessful = true;
+        private int m_failureCount = 0;
+
+        public UpdateInstaller(DefaultController controller) {
+            m_controller = controller;
+        }
+
+        public final boolean canInstallUpdate(Version fromVersion, Version toVersion, long maxRetries) {
+            if (toVersion.compareTo(fromVersion) > 0) {
+                // Possible newer version, lets check our administration whether we actually need to do something...
+                if (m_lastVersionTried != null && toVersion.equals(m_lastVersionTried)) {
+                    if (m_failureCount >= maxRetries) {
+                        m_controller.logDebug("Ignoring update %s => %s because max retries (%d) reached!", fromVersion, toVersion, maxRetries);
+                        return false;
+                    }
+                    if (!m_lastVersionSuccessful) {
+                        m_controller.logDebug("Ignoring update %s => %s because it failed previously!", fromVersion, toVersion);
+                        return false;
+                    }
+                }
+
+                m_controller.logDebug("Need to install update: newer deployment version available!");
+                return true;
+            }
+            else {
+                m_controller.logDebug("No need to install update: no newer deployment version available!");
+                return false;
+            }
+        }
+
+        public final void installUpdate(Version fromVersion, Version toVersion, boolean fixPackage) throws RetryAfterException {
+            if (m_lastVersionTried == null || !toVersion.equals(m_lastVersionTried)) {
+                m_lastVersionTried = toVersion;
+                m_lastVersionSuccessful = true;
+                m_failureCount = 0;
+            }
+
+            try {
+                doInstallUpdate(fromVersion, toVersion, fixPackage);
+                // As we've just successfully installed this update, reset the failure counter...
+                m_failureCount = 0;
+                m_lastVersionSuccessful = true;
+            }
+            catch (RetryAfterException e) {
+                // The server is busy. Re-throw so the controller can abort the sync and reschedule.
+                throw (e);
+            }
+            catch (DeploymentException e) {
+                getController().logWarning("Exception while deploying the package", e);
+                e.printStackTrace();
+                m_failureCount++;
+                m_lastVersionSuccessful = false;
+            }
+            catch (IOException e) {
+                getController().logWarning("Exception opening/streaming package inputstream", e);
+                e.printStackTrace();
+                m_failureCount++;
+            }
+        }
+
+        public final void reset() {
+            m_lastVersionTried = null;
+            m_failureCount = 0;
+            doReset();
+        }
+
+        protected abstract void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, DeploymentException, IOException;
+
+        protected abstract void doReset();
+
+        protected final DefaultController getController() {
+            return m_controller;
+        }
+    }
+
     private volatile ScheduledFuture<?> m_scheduledFuture;
     private volatile UpdateInstaller m_updateInstaller;
 
@@ -123,7 +363,7 @@ public class DefaultController extends C
                     // Ignore...
                 }
             }
-            
+
             logDebug("Config changed: disabled: %s, update: %s, fixPkg: %s, syncDelay: %d, syncInterval: %d, maxRetries: %d", m_disabled.get(), m_updateStreaming.get(), m_fixPackage.get(), m_syncDelay.get(), m_interval.get(), m_maxRetries.get());
         }
     }
@@ -188,36 +428,30 @@ public class DefaultController extends C
         unscheduleRun();
     }
 
-    private void runFeedback() throws RetryAfterException {
-        logDebug("Synchronizing feedback channels");
-
-        Set<String> names = getFeedbackChannelNames();
-        for (String name : names) {
-            FeedbackChannel channel = getFeedbackChannel(name);
-            if (channel != null) {
-                try {
-                    channel.sendFeedback();
-                    logDebug("Feedback send succesfully for channel: %s", name);
-                }
-                catch (IOException e) {
-                    // Hopefully temporary problem due to remote IO or configuration. No cause to abort the sync so we
-                    // just log it as a warning.
-                    logWarning("Exception while sending feedback on channel: %s", e, name);
-                }
-            }
-        }
+    protected void scheduleRun(long seconds) {
+        unscheduleRun();
+        m_scheduledFuture = getExecutorService().schedule(this, seconds, TimeUnit.SECONDS);
     }
 
-    private Set<String> getFeedbackChannelNames() {
-        try {
-            return getFeedbackHandler().getChannelNames();
-        }
-        catch (IOException e) {
-            // Probably a serious problem due to local IO related to feedback. No cause to abort the sync so we just log
-            // it as an error.
-            logError("Exception while looking up feedback channel names.");
-        }
-        return Collections.emptySet();
+    protected void sendDeploymentCompletedEvent(Version from, Version to, boolean fixPackage, boolean success) {
+        Map<String, String> eventProps = new HashMap<String, String>();
+        eventProps.put("name", getIdentificationHandler().getAgentId());
+        eventProps.put("fromVersion", from.toString());
+        eventProps.put("toVersion", to.toString());
+        eventProps.put("fixPackage", Boolean.toString(fixPackage));
+        eventProps.put("successful", Boolean.toString(success));
+
+        getEventsHandler().postEvent(AGENT_DEPLOYMENT_COMPLETE, eventProps);
+    }
+
+    protected void sendDeploymentInstallEvent(Version from, Version to, boolean fixPackage) {
+        Map<String, String> eventProps = new HashMap<String, String>();
+        eventProps.put("name", getIdentificationHandler().getAgentId());
+        eventProps.put("fromVersion", from.toString());
+        eventProps.put("toVersion", to.toString());
+        eventProps.put("fixPackage", Boolean.toString(fixPackage));
+
+        getEventsHandler().postEvent(AGENT_DEPLOYMENT_INSTALL, eventProps);
     }
 
     private FeedbackChannel getFeedbackChannel(String name) {
@@ -232,83 +466,52 @@ public class DefaultController extends C
         return null;
     }
 
-    private void runAgentUpdate() throws RetryAfterException {
-        logDebug("Checking for agent update");
-
-        Version current = getAgentUpdateHandler().getInstalledVersion();
-        SortedSet<Version> available = getAvailableAgentVersions();
-        Version highest = Version.emptyVersion;
-        if (available != null && !available.isEmpty()) {
-            highest = available.last();
-        }
-
-        if (highest.compareTo(current) < 1) {
-            logDebug("No agent update available for version %s", current);
-            return;
-        }
-
-        logInfo("Installing agent update %s => %s", current, highest);
-
-        InputStream inputStream = null;
+    private Set<String> getFeedbackChannelNames() {
         try {
-            inputStream = getAgentUpdateHandler().getInputStream(highest);
-            getAgentUpdateHandler().install(inputStream);
+            return getFeedbackHandler().getChannelNames();
         }
         catch (IOException e) {
-            // Hopefully temporary problem due to remote IO or configuration. No cause to abort the sync so we
-            // just log it as a warning.
-            // FIXME Does not cover failed updates and should handle retries
-            logWarning("Exception while installing agent update %s", e, highest);
-        }
-        finally {
-            closeSilently(inputStream);
+            // Probably a serious problem due to local IO related to feedback. No cause to abort the sync so we just log
+            // it as an error.
+            logError("Exception while looking up feedback channel names.");
         }
+        return Collections.emptySet();
     }
 
-    private SortedSet<Version> getAvailableAgentVersions() throws RetryAfterException {
+    private Version getHighestAvailableAgentVersion() throws RetryAfterException {
+        SortedSet<Version> available = new TreeSet<Version>();
         try {
-            return getAgentUpdateHandler().getAvailableVersions();
+            available = getAgentUpdateHandler().getAvailableVersions();
         }
         catch (IOException e) {
             // Hopefully temporary problem due to remote IO or configuration. No cause to abort the sync so we just
             // log it as a warning.
             logWarning("Exception while retrieving agent versions", e);
         }
-        return new TreeSet<Version>();
-    }
-
-    private void runDeploymentUpdate() throws RetryAfterException {
-        logDebug("Checking for deployment update");
-
-        Version current = getDeploymentHandler().getInstalledVersion();
-        SortedSet<Version> available = getAvailableDeploymentVersions();
-        Version highest = Version.emptyVersion;
-        if (available != null && !available.isEmpty()) {
-            highest = available.last();
-        }
-
-        if (highest.compareTo(current) < 1) {
-            logDebug("No deployment update available for version %s", current);
-            return;
-        }
-
-        boolean updateStreaming = m_updateStreaming.get();
-        boolean fixPackage = m_fixPackage.get();
-        long maxRetries = m_maxRetries.get();
 
-        getUpdateInstaller(updateStreaming).installUpdate(current, highest, fixPackage, maxRetries);
+        return getHighestVersion(available);
     }
 
-    private SortedSet<Version> getAvailableDeploymentVersions() throws RetryAfterException {
+    private Version getHighestAvailableDeploymentVersion() throws RetryAfterException {
+        SortedSet<Version> available = new TreeSet<Version>();
         try {
-            return getDeploymentHandler().getAvailableVersions();
+            available = getDeploymentHandler().getAvailableVersions();
         }
         catch (IOException e) {
             // Hopefully temporary problem due to remote IO or configuration. No cause to abort the sync so we just
             // log it as a warning.
             logWarning("Exception while retrieving deployment versions", e);
         }
-        return new TreeSet<Version>();
+
+        return getHighestVersion(available);
+    }
+
+    private Version getHighestVersion(SortedSet<Version> available) {
+        Version highest = Version.emptyVersion;
+        if (available != null && !available.isEmpty()) {
+            highest = available.last();
+        }
+        return highest;
     }
 
     private UpdateInstaller getUpdateInstaller(boolean streaming) {
@@ -333,204 +536,78 @@ public class DefaultController extends C
         return m_updateInstaller;
     }
 
-    private void scheduleRun(long seconds) {
-        unscheduleRun();
-        m_scheduledFuture = getExecutorService().schedule(this, seconds, TimeUnit.SECONDS);
-    }
-
-    private void unscheduleRun() {
-        if (m_scheduledFuture != null)
-            m_scheduledFuture.cancel(false /* mayInterruptWhileRunning */);
-    }
+    private void runAgentUpdate() throws RetryAfterException {
+        logDebug("Checking for agent updates...");
 
-    /**
-     * Base class for internal installer strategies. This implementation handles max update retry contraints and
-     * delegates the rest to concrete implementations.
-     */
-    abstract static class UpdateInstaller {
-        private final DefaultController m_controller;
-        private Version m_lastVersion = null;
-        private int m_failureCount = 0;
+        AgentUpdateHandler agentUpdateHandler = getAgentUpdateHandler();
 
-        public UpdateInstaller(DefaultController controller) {
-            m_controller = controller;
-        }
+        Version current = agentUpdateHandler.getInstalledVersion();
+        Version highest = getHighestAvailableAgentVersion();
 
-        protected final DefaultController getController() {
-            return m_controller;
+        if (highest.compareTo(current) < 1) {
+            logDebug("No agent update available for version %s", current);
+            return;
         }
 
-        public final void installUpdate(Version fromVersion, Version toVersion, boolean fixPackage, long maxRetries) throws RetryAfterException {
-            if (m_lastVersion != null && toVersion.equals(m_lastVersion)) {
-                if (m_failureCount >= maxRetries) {
-                    getController().logInfo("Ignoring deployment update %s => %s because max retries reached %d", fromVersion, toVersion, maxRetries);
-                    return;
-                }
-            }
-            else {
-                m_lastVersion = toVersion;
-                m_failureCount = 0;
-            }
-            try {
-                doInstallUpdate(fromVersion, toVersion, fixPackage);
-            }
-            catch (RetryAfterException e) {
-                // The server is busy. Re-throw so the controller can abort the sync and reschedule.
-                throw (e);
+        logInfo("Installing agent update %s => %s", current, highest);
 
-            }
-            catch (DeploymentException e) {
-                getController().logWarning("Exception while deploying the package", e);
-                e.printStackTrace();
-                m_failureCount++;
-            }
-            catch (IOException e) {
-                getController().logWarning("Exception opening/streaming package inputstream", e);
-                e.printStackTrace();
-                m_failureCount++;
-            }
+        InputStream inputStream = null;
+        try {
+            inputStream = agentUpdateHandler.getInputStream(highest);
+            agentUpdateHandler.install(inputStream);
         }
-
-        public final void reset() {
-            m_lastVersion = null;
-            m_failureCount = 0;
-            doReset();
+        catch (IOException e) {
+            // Hopefully temporary problem due to remote IO or configuration. No cause to abort the sync so we
+            // just log it as a warning.
+            // FIXME Does not cover failed updates and should handle retries
+            logWarning("Exception while installing agent update %s", e, highest);
         }
-
-        protected abstract void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, DeploymentException, IOException;
-
-        protected abstract void doReset();
-    }
-
-    /**
-     * UpdateInstaller that provides streaming deployment package install. The install is blocking.
-     */
-    static class StreamingUpdateInstaller extends UpdateInstaller {
-        public StreamingUpdateInstaller(DefaultController controller) {
-            super(controller);
+        finally {
+            closeSilently(inputStream);
         }
+    }
 
-        @Override
-        public void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, DeploymentException, IOException {
-            getController().logInfo("Installing streaming deployment update %s => %s", from, to);
+    private void runDeploymentUpdate() throws RetryAfterException {
+        logDebug("Checking for deployment updates...");
 
-            DeploymentHandler deploymentHandler = getController().getDeploymentHandler();
-            InputStream inputStream = null;
-            try {
-                inputStream = deploymentHandler.getInputStream(to, fix);
-                deploymentHandler.deployPackage(inputStream);
-                return;
-            }
-            finally {
-                if (inputStream != null) {
-                    try {
-                        inputStream.close();
-                    }
-                    catch (Exception e) {
-                        getController().logWarning("Exception while closing streaming package inputstream", e);
-                    }
-                }
-            }
-        }
+        DeploymentHandler deploymentHandler = getDeploymentHandler();
 
-        @Override
-        protected void doReset() {
-            // Nop
-        }
-    }
+        Version current = deploymentHandler.getInstalledVersion();
+        Version highest = getHighestAvailableDeploymentVersion();
 
-    /**
-     * UpdateInstaller that provides download deployment package install. The install is non-blocking. Upon download
-     * completion this installer will reschedule the controller.
-     */
-    static class DownloadUpdateInstaller extends UpdateInstaller implements DownloadHandle.ProgressListener, DownloadHandle.ResultListener {
-        // active download state
-        private volatile DownloadHandle m_downloadHandle;
-        private volatile DownloadResult m_downloadResult = null;
-        private volatile Version m_downloadVersion;
-        private volatile long m_downloadLength = 0;
-        private volatile long m_downloadProgress = 0;
+        long maxRetries = m_maxRetries.get();
+        boolean updateUsingStreams = m_updateStreaming.get();
+        
+        UpdateInstaller updateInstaller = getUpdateInstaller(updateUsingStreams);
+        if (updateInstaller.canInstallUpdate(current, highest, maxRetries)) {
+            boolean fixPackage = m_fixPackage.get();
 
-        public DownloadUpdateInstaller(DefaultController controller) {
-            super(controller);
+            updateInstaller.installUpdate(current, highest, fixPackage);
         }
+    }
 
-        @Override
-        public void doInstallUpdate(Version fromVersion, Version toVersion, boolean fixPackage) throws RetryAfterException, DeploymentException, IOException {
-
-            DeploymentHandler deploymentHandler = getController().getDeploymentHandler();
-            if (m_downloadHandle != null && !m_downloadVersion.equals(toVersion)) {
-                getController().logInfo("Cancelling deployment package download for %s because a newer version is available", m_downloadVersion);
-                m_downloadHandle.discard();
-                m_downloadHandle = null;
-            }
+    private void runFeedback() throws RetryAfterException {
+        logDebug("Synchronizing feedback channels");
 
-            if (m_downloadHandle == null) {
-                getController().logInfo("Starting deployment package download %s => %s", fromVersion, toVersion);
-                m_downloadVersion = toVersion;
-                m_downloadHandle = deploymentHandler.getDownloadHandle(toVersion, fixPackage)
-                    .setProgressListener(this).setCompletionListener(this).start();
-            }
-            else {
-                if (m_downloadResult == null) {
-                    getController().logInfo("Deployment package download for %s is in progress %d / %d", toVersion, m_downloadProgress, m_downloadLength);
-                }
-                else if (m_downloadResult.getState() == DownloadState.FAILED) {
-                    getController().logWarning("Deployment package download for %s is FAILED. Clearing for retry");
-                    m_downloadHandle.discard();
-                    m_downloadHandle = null;
-                    throw new IOException("Download failed");
+        Set<String> names = getFeedbackChannelNames();
+        for (String name : names) {
+            FeedbackChannel channel = getFeedbackChannel(name);
+            if (channel != null) {
+                try {
+                    channel.sendFeedback();
+                    logDebug("Feedback send succesfully for channel: %s", name);
                 }
-                else if (m_downloadResult.getState() == DownloadState.STOPPED) {
-                    getController().logWarning("Deployment package download for %s is STOPPED. Trying to resume");
-                    m_downloadResult = null;
-                    m_downloadHandle.start();
-                }
-                else if (m_downloadResult.getState() == DownloadState.SUCCESSFUL) {
-                    getController().logInfo("Installing downloaded deployment update %s => %s", fromVersion, toVersion);
-                    InputStream inputStream = new FileInputStream(m_downloadResult.getFile());
-                    System.out.println(m_downloadResult.getFile().getAbsolutePath());
-                    try {
-                        deploymentHandler.deployPackage(inputStream);
-                    }
-                    finally {
-                        // m_downloadHandle.discard();
-                        m_downloadHandle = null;
-                        inputStream.close();
-                    }
+                catch (IOException e) {
+                    // Hopefully temporary problem due to remote IO or configuration. No cause to abort the sync so we
+                    // just log it as a warning.
+                    logWarning("Exception while sending feedback on channel: %s", e, name);
                 }
             }
         }
+    }
 
-        @Override
-        public void doReset() {
-            if (m_downloadHandle != null) {
-                getController().logInfo("Cancelling deployment package download for version %s because of reset", m_downloadVersion);
-                m_downloadHandle.discard();
-            }
-            clearDownloadState();
-        }
-
-        @Override
-        public void progress(long contentLength, long progress) {
-            m_downloadLength = contentLength;
-            m_downloadProgress = progress;
-        }
-
-        @Override
-        public void completed(DownloadResult result) {
-            m_downloadResult = result;
-            getController().logInfo("Deployment package download completed for version %s. Rescheduling the controller to run in %d seconds", m_downloadVersion, 1);
-            getController().scheduleRun(1);
-        }
-
-        private void clearDownloadState() {
-            if (m_downloadHandle != null) {
-                m_downloadHandle.discard();
-            }
-            m_downloadHandle = null;
-            m_downloadResult = null;
-            m_downloadVersion = null;
-        }
+    private void unscheduleRun() {
+        if (m_scheduledFuture != null)
+            m_scheduledFuture.cancel(false /* mayInterruptWhileRunning */);
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java Wed Sep 11 16:04:54 2013
@@ -19,6 +19,9 @@
 package org.apache.ace.agent.impl;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 
@@ -47,8 +50,9 @@ public class DownloadResultImpl implemen
     }
 
     @Override
-    public File getFile() {
-        return m_file;
+    @SuppressWarnings("resource")
+    public InputStream getInputStream() throws IOException {
+        return m_file != null ? new FileInputStream(m_file) : null;
     }
 
     @Override

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java Wed Sep 11 16:04:54 2013
@@ -231,7 +231,7 @@ public class EventLoggerImpl extends Com
                 channel.write(eventType, payload);
             }
             else {
-                logWarning("Feedback event *not* written as no channel is available!");
+//                logDebug("Feedback event *not* written as no channel is available!");
             }
         }
         catch (IOException e) {

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java Wed Sep 11 16:04:54 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.ace.agent.impl;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -51,13 +52,15 @@ public class EventsHandlerImpl extends C
     }
 
     @Override
-    public void postEvent(final String topic, final Map<String, String> payload) {
+    public void postEvent(final String topic, Map<String, String> payload) {
+        // Make sure that the payload isn't changed while posting events...
+        final Map<String, String> eventPayload = new HashMap<String, String>(payload);
         for (final EventListener listener : m_listeners) {
             getExecutorService().submit(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        listener.handle(topic, payload);
+                        listener.handle(topic, eventPayload);
                     }
                     catch (Exception e) {
                         logWarning("Exception while posting event", e);
@@ -74,9 +77,11 @@ public class EventsHandlerImpl extends C
 
     @Override
     public void sendEvent(String topic, Map<String, String> payload) {
+        // Make sure that the payload isn't changed while sending events...
+        final Map<String, String> eventPayload = new HashMap<String, String>(payload);
         for (EventListener listener : m_listeners) {
             try {
-                listener.handle(topic, payload);
+                listener.handle(topic, eventPayload);
             }
             catch (Exception e) {
                 logWarning("Exception while sending event", e);

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/InternalConstants.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/InternalConstants.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/InternalConstants.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/InternalConstants.java Wed Sep 11 16:04:54 2013
@@ -27,5 +27,12 @@ public interface InternalConstants {
      * Event topic used to report changes in the agent's configuration.
      */
     String AGENT_CONFIG_CHANGED = "agent/config/CHANGED";
-
+    /**
+     * Internal event topic used when a deployment is started.
+     */
+    String AGENT_DEPLOYMENT_INSTALL = "agent/deployment/INSTALL";
+    /**
+     * Internal event topic used when a deployment is complete (either or not successful).
+     */
+    String AGENT_DEPLOYMENT_COMPLETE = "agent/deployment/COMPLETE";
 }

Modified: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/CustomControllerTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/CustomControllerTest.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/CustomControllerTest.java (original)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/CustomControllerTest.java Wed Sep 11 16:04:54 2013
@@ -82,7 +82,7 @@ public class CustomControllerTest extend
 
         DownloadResult downloadResult = addTestMock(DownloadResult.class);
         expect(downloadResult.getState()).andReturn(DownloadState.SUCCESSFUL).anyTimes();
-        expect(downloadResult.getFile()).andReturn(m_dummyFile).anyTimes();
+        expect(downloadResult.getInputStream()).andReturn(m_dummyInputStream).anyTimes();
 
         DownloadHandle downloadHandle = addTestMock(DownloadHandle.class);
         expect(downloadHandle.start()).andReturn(downloadHandle).anyTimes();
@@ -122,7 +122,7 @@ public class CustomControllerTest extend
             DownloadResult result = handle.start().result();
 
             if (result.getState() == DownloadState.SUCCESSFUL) {
-                InputStream inputStream = new FileInputStream(result.getFile());
+                InputStream inputStream = result.getInputStream();
                 try {
                     m_agentControl.getDeploymentHandler().deployPackage(inputStream);
                 }

Modified: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java?rev=1521918&r1=1521917&r2=1521918&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java (original)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java Wed Sep 11 16:04:54 2013
@@ -24,9 +24,9 @@ import static org.testng.Assert.assertNu
 import static org.testng.Assert.assertSame;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigInteger;
 import java.net.URL;
 import java.security.DigestInputStream;
@@ -219,28 +219,28 @@ public class DownloadHandlerTest extends
     private static void assertSuccessFul(final DownloadResult result, int statusCode, String digest) throws Exception {
         assertEquals(result.getState(), DownloadState.SUCCESSFUL, "Expected state SUCCESSFUL after succesful completion");
         assertEquals(result.getCode(), statusCode, "Expected statusCode " + statusCode + " after successful completion");
-        assertNotNull(result.getFile(), "Expected non null file after successful completion");
+        assertNotNull(result.getInputStream(), "Expected non null file after successful completion");
         assertNotNull(result.getHeaders(), "Expected non null headers after successful completion");
         assertNull(result.getCause(), "Excpected null cause after successful completion");
-        assertEquals(getDigest(result.getFile()), digest, "Expected same digest after successful completion");
+        assertEquals(getDigest(result.getInputStream()), digest, "Expected same digest after successful completion");
     }
 
     private static void assertFailed(final DownloadResult result, int statusCode) throws Exception {
         assertEquals(result.getState(), DownloadState.FAILED, "DownloadState must be FAILED after failed completion");
         assertEquals(result.getCode(), statusCode, "Expected statusCode " + statusCode + " after failed completion");
-        assertNull(result.getFile(), "File must not be null after failed completion");
+        assertNull(result.getInputStream(), "File must not be null after failed completion");
     }
 
     private static void assertStopped(final DownloadResult result, int statusCode) throws Exception {
         assertEquals(result.getState(), DownloadState.STOPPED, "DownloadState must be STOPPED after stopped completion");
         assertEquals(result.getCode(), statusCode, "Expected statusCode " + statusCode + " after stopped completion");
         assertNotNull(result.getHeaders(), "Expected headers not to be null after stopped completion");
-        assertNull(result.getFile(), "File must not be null after failed download");
+        assertNull(result.getInputStream(), "File must not be null after failed download");
         assertNull(result.getCause(), "Excpected cause to null null after stopped completion");
     }
 
-    private static String getDigest(File file) throws Exception {
-        DigestInputStream dis = new DigestInputStream(new FileInputStream(file), MessageDigest.getInstance("MD5"));
+    private static String getDigest(InputStream is) throws Exception {
+        DigestInputStream dis = new DigestInputStream(is, MessageDigest.getInstance("MD5"));
         while (dis.read() != -1) {
         }
         dis.close();



Mime
View raw message