ace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r1524032 [1/2] - in /ace/trunk: org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/ org.apache.ace.agent.update.itest/src/org/apache/ace/agent/itest/ org.apache.ace.agent/src/org/apache/ace/agent/ org.apache.ace.agent/src/org/apache/...
Date Tue, 17 Sep 2013 13:14:33 GMT
Author: jawi
Date: Tue Sep 17 13:14:32 2013
New Revision: 1524032

URL: http://svn.apache.org/r1524032
Log:
ACA-323 - improve MA update logic:

- align the logic for handling content-range headers in both the 
  streaming as downloading variant by pushing all of this logic
  down to a separate input stream implementation;
- added some test cases for the various content-range scenarios;
- refactored the agent update itest to use latches instead of busy-
  wait loops.


Added:
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java   (with props)
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java   (with props)
Modified:
    ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
    ace/trunk/org.apache.ace.agent.update.itest/src/org/apache/ace/agent/itest/AgentUpdateTest.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadState.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/RetryAfterException.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/UpdateHandlerBase.java
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/CustomControllerTest.java
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/DownloadHandlerTest.java

Modified: ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java (original)
+++ ace/trunk/org.apache.ace.agent.itest/src/org/apache/ace/agent/itest/AgentDeploymentTest.java Tue Sep 17 13:14:32 2013
@@ -19,11 +19,11 @@
 package org.apache.ace.agent.itest;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -61,7 +61,7 @@ import org.osgi.service.http.HttpService
 public class AgentDeploymentTest extends BaseAgentTest {
 
     private enum Failure {
-        EMPTY_STREAM, CORRUPT_STREAM, ABORT_STREAM, VERSIONS_RETRY_AFTER, DEPLOYMENT_RETRY_AFTER
+        EMPTY_STREAM, CORRUPT_STREAM, ABORT_STREAM, VERSIONS_RETRY_AFTER, DEPLOYMENT_RETRY_AFTER, CONTENT_RANGE
     }
 
     private static class TestAuditlogServlet extends HttpServlet {
@@ -136,11 +136,11 @@ public class AgentDeploymentTest extends
                 if (dpackage == null) {
                     throw new IllegalStateException("Test error! Should never happen... " + pathinfoTail);
                 }
-                sendPackage(dpackage, resp);
+                sendPackage(dpackage, req, resp);
             }
         }
 
-        private void sendPackage(TestPackage dpackage, HttpServletResponse resp) throws IOException {
+        private void sendPackage(TestPackage dpackage, HttpServletRequest req, HttpServletResponse resp) throws IOException {
             if (m_failure == Failure.DEPLOYMENT_RETRY_AFTER) {
                 resp.addHeader("Retry-After", BACKOFF_TIME);
                 resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Simulated server overload");
@@ -148,11 +148,34 @@ public class AgentDeploymentTest extends
                 return;
             }
 
-            long middle = dpackage.getFile().length() / 2;
-            FileInputStream fis = null;
+            final long fileLength = dpackage.getFile().length();
+            final long middle = fileLength / 2;
+
+            long start = 0L;
+            long end = fileLength;
+
+            if (m_failure == Failure.CONTENT_RANGE) {
+                String rangeHdr = req.getHeader("Range");
+                if (rangeHdr != null && rangeHdr.startsWith("bytes=")) {
+                    // Continuation...
+                    String[] range = rangeHdr.substring(6).split("-");
+
+                    start = Long.parseLong(range[0]);
+                }
+                else {
+                    // Initial chuck...
+                    end = fileLength / 2;
+                }
+
+                resp.addHeader("Content-Range", String.format("bytes %d-%d/%d", start, end, fileLength));
+
+                resp.setStatus(206); // partial
+            }
+
+            RandomAccessFile raf = null;
             OutputStream os = null;
             try {
-                fis = new FileInputStream(dpackage.getFile());
+                raf = new RandomAccessFile(dpackage.getFile(), "r");
                 os = resp.getOutputStream();
 
                 if (m_failure == Failure.EMPTY_STREAM) {
@@ -163,18 +186,21 @@ public class AgentDeploymentTest extends
                     os.write("garbage".getBytes());
                 }
 
+                if (m_failure == Failure.CONTENT_RANGE) {
+                    raf.seek(start);
+                }
+
                 int b;
                 int count = 0;
-                while ((b = fis.read()) != -1) {
+                while (count < (end - start) && (b = raf.read()) != -1) {
                     os.write(b);
                     if (count++ == middle && m_failure == Failure.ABORT_STREAM) {
                         break;
                     }
                 }
-
             }
             finally {
-                fis.close();
+                raf.close();
                 if (os != null) {
                     os.close();
                 }
@@ -423,6 +449,15 @@ public class AgentDeploymentTest extends
     }
 
     /**
+     * Tests the deployment of "non-streamed" deployment packages in various situations.
+     */
+    public void testNonStreamingDeployment_ChunkedContentRange() throws Exception {
+        setupAgentForNonStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package6, Failure.CONTENT_RANGE);
+    }
+
+    /**
      * Tests the deployment of "streamed" deployment packages in various situations.
      */
     public void testStreamingDeployment() throws Exception {
@@ -477,13 +512,22 @@ public class AgentDeploymentTest extends
     }
 
     /**
+     * Tests the deployment of "streamed" deployment packages in various situations.
+     */
+    public void testStreamingDeployment_ChunkedContentRange() throws Exception {
+        setupAgentForStreamingDeployment();
+
+        expectSuccessfulDeployment(m_package1, Failure.CONTENT_RANGE);
+    }
+
+    /**
      * Tests the deployment of "streamed" deployment packages simulating an "unstable" connection.
      */
     public void testStreamingDeploymentWithUnstableConnection() throws Exception {
         setupAgentForStreamingDeployment();
 
         expectSuccessfulDeployment(m_package1, null);
-        
+
         expectFailedDeployment(m_package6, Failure.EMPTY_STREAM);
         waitForInstalledVersion(V1_0_0);
 
@@ -615,7 +659,7 @@ public class AgentDeploymentTest extends
     }
 
     private void waitForEventReceived(String topic) throws Exception {
-        int timeout = 100;
+        int timeout = 10000;
         while (!m_listener.containsTopic(topic)) {
             Thread.sleep(100);
             if (timeout-- <= 0) {

Modified: ace/trunk/org.apache.ace.agent.update.itest/src/org/apache/ace/agent/itest/AgentUpdateTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent.update.itest/src/org/apache/ace/agent/itest/AgentUpdateTest.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent.update.itest/src/org/apache/ace/agent/itest/AgentUpdateTest.java (original)
+++ ace/trunk/org.apache.ace.agent.update.itest/src/org/apache/ace/agent/itest/AgentUpdateTest.java Tue Sep 17 13:14:32 2013
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.jar.Attributes;
 import java.util.jar.JarEntry;
 import java.util.jar.JarInputStream;
@@ -52,7 +54,6 @@ import org.osgi.service.http.HttpService
  * </ul>
  */
 public class AgentUpdateTest extends IntegrationTestBase {
-
     private volatile HttpService m_http;
     private volatile AgentUpdateOBRServlet m_servlet;
 
@@ -60,10 +61,6 @@ public class AgentUpdateTest extends Int
         CORRUPT_STREAM, BUNDLE_DOES_NOT_RESOLVE, BUNDLE_DOES_NOT_START, BUNDLE_WORKS
     }
 
-    private enum PhaseStatus {
-        ACTIVE, DONE
-    }
-
     @Override
     protected Component[] getDependencies() {
         return new Component[] {
@@ -85,33 +82,23 @@ public class AgentUpdateTest extends Int
     }
 
     public void testAgentUpdate() throws Exception {
+        final int defaultTimeout = 15;
 
-        int timeout = 50;
-        m_servlet.setPhase(Phase.CORRUPT_STREAM);
-        while (m_servlet.getPhaseStatus() == PhaseStatus.ACTIVE) {
-            Thread.sleep(200);
-            if (timeout-- <= 0) {
-                fail("Timed out while recovering from update with broken stream.");
-            }
-        }
-        timeout = 50;
-        m_servlet.setPhase(Phase.BUNDLE_DOES_NOT_RESOLVE);
-        while (m_servlet.getPhaseStatus() == PhaseStatus.ACTIVE) {
-            Thread.sleep(200);
-            if (timeout-- <= 0) {
-                fail("Timed out while recovering from update with agent that does not resolve.");
-            }
-        }
-        timeout = 50;
-        m_servlet.setPhase(Phase.BUNDLE_DOES_NOT_START);
-        while (m_servlet.getPhaseStatus() == PhaseStatus.ACTIVE) {
-            Thread.sleep(200);
-            if (timeout-- <= 0) {
-                fail("Timed out while recovering from update with agent that does not start.");
-            }
-        }
-        timeout = 50;
-        m_servlet.setPhase(Phase.BUNDLE_WORKS);
+        CountDownLatch latch;
+
+        latch = m_servlet.setPhase(Phase.CORRUPT_STREAM, new CountDownLatch(1));
+        assertTrue("Timed out while recovering from update with broken stream.", latch.await(defaultTimeout, TimeUnit.SECONDS));
+
+        latch = m_servlet.setPhase(Phase.BUNDLE_DOES_NOT_RESOLVE, new CountDownLatch(1));
+        assertTrue("Timed out while recovering from update with agent that does not resolve.", latch.await(defaultTimeout, TimeUnit.SECONDS));
+
+        latch = m_servlet.setPhase(Phase.BUNDLE_DOES_NOT_START, new CountDownLatch(1));
+        assertTrue("Timed out while recovering from update with agent that does not start.", latch.await(defaultTimeout, TimeUnit.SECONDS));
+
+        latch = m_servlet.setPhase(Phase.BUNDLE_WORKS, new CountDownLatch(1));
+        assertTrue("Timed out while starting working bundle?!", latch.await(defaultTimeout, TimeUnit.SECONDS));
+
+        int timeout = defaultTimeout;
         while (timeout-- > 0) {
             Thread.sleep(200);
             for (Bundle b : m_bundleContext.getBundles()) {
@@ -126,10 +113,9 @@ public class AgentUpdateTest extends Int
     }
 
     private static class AgentUpdateOBRServlet extends HttpServlet {
-
         private static final long serialVersionUID = 1L;
         private Phase m_phase;
-        private PhaseStatus m_phaseStatus;
+        private CountDownLatch m_latch;
 
         @Override
         protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
@@ -154,13 +140,11 @@ public class AgentUpdateTest extends Int
             }
         }
 
-        public synchronized void setPhase(Phase phase) {
+        public synchronized CountDownLatch setPhase(Phase phase, CountDownLatch latch) {
             m_phase = phase;
-            m_phaseStatus = PhaseStatus.ACTIVE;
-        }
-
-        public synchronized PhaseStatus getPhaseStatus() {
-            return m_phaseStatus;
+            m_latch = latch;
+            System.out.println("Updating in phase: " + phase);
+            return latch;
         }
 
         private InputStream getBundle() throws IOException {
@@ -195,10 +179,10 @@ public class AgentUpdateTest extends Int
             jis.close();
             jos.close();
             if (m_phase == Phase.BUNDLE_WORKS && "2.0.0".equals(version)) {
-                m_phaseStatus = PhaseStatus.DONE;
+                m_latch.countDown();
             }
             if (m_phase != Phase.BUNDLE_WORKS && "1.0.0".equals(version)) {
-                m_phaseStatus = PhaseStatus.DONE;
+                m_latch.countDown();
             }
         }
     }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadHandle.java Tue Sep 17 13:14:32 2013
@@ -18,7 +18,7 @@
  */
 package org.apache.ace.agent;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
 /**
  * A {@link DownloadHandle} provides control over an asynchronous download and access to the resulting file when the it
@@ -46,31 +46,14 @@ public interface DownloadHandle {
          *            The total length of the content or -1 if unknown.
          */
         void progress(long bytesRead, long totalBytes);
-
-        /**
-         * Called when a download terminates.
-         * 
-         * @param result
-         *            The result of the download.
-         */
-        void completed(DownloadResult result);
     }
 
     /**
      * Starts the download, reporting the result and progress to the supplied listeners.
-     */
-    void start(DownloadProgressListener listener);
-
-    /**
-     * Convenience method to start the download and block until it is finished.
      * 
-     * @param timeout
-     *            the timeout to wait for a result;
-     * @param unit
-     *            the unit of the timeout to wait for a result.
-     * @return the download result, never <code>null</code>.
+     * @return a future promise for the download result, never <code>null</code>.
      */
-    DownloadResult startAndAwaitResult(long timeout, TimeUnit unit) throws InterruptedException;
+    Future<DownloadResult> start(DownloadProgressListener listener);
 
     /**
      * Pauses the download.

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadResult.java Tue Sep 17 13:14:32 2013
@@ -26,14 +26,6 @@ import java.io.InputStream;
  * 
  */
 public interface DownloadResult {
-
-    /**
-     * Returns the state of the result.
-     * 
-     * @return The state
-     */
-    DownloadState getState();
-
     /**
      * Returns an input stream to the downloaded result.
      * 
@@ -42,14 +34,7 @@ public interface DownloadResult {
     InputStream getInputStream() throws IOException;
 
     /**
-     * @return the result code
-     */
-    int getCode();
-
-    /**
-     * Return the cause of an unsuccessful download.
-     * 
-     * @return The cause, <code>null</code> if the download was successful
+     * @return <code>true</code> if the download is complete, <code>false</code> if not.
      */
-    Throwable getCause();
+    boolean isComplete();
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadState.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadState.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadState.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/DownloadState.java Tue Sep 17 13:14:32 2013
@@ -29,9 +29,5 @@ public enum DownloadState {
     /**
      * The handle completed because it was stopped.
      */
-    STOPPED,
-    /**
-     * The handle completed due to an unrecoverable error. It can not be resumed.
-     */
-    FAILED
+    STOPPED;
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/RetryAfterException.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/RetryAfterException.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/RetryAfterException.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/RetryAfterException.java Tue Sep 17 13:14:32 2013
@@ -18,12 +18,14 @@
  */
 package org.apache.ace.agent;
 
+import java.io.IOException;
+
 /**
  * Exception that indicates that the upstream server responded with a Retry-After.
  */
-public class RetryAfterException extends Exception {
-
+public class RetryAfterException extends IOException {
     private static final long serialVersionUID = 1L;
+
     private final int m_seconds;
 
     public RetryAfterException(int seconds) {
@@ -31,7 +33,12 @@ public class RetryAfterException extends
         m_seconds = seconds;
     }
 
-    public int getSeconds() {
+    /**
+     * Returns the time to "back off" from accessing the server.
+     * 
+     * @return a back off time, in seconds.
+     */
+    public int getBackoffTime() {
         return m_seconds;
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java Tue Sep 17 13:14:32 2013
@@ -34,11 +34,12 @@ class ConnectionUtil {
     /**
      * The HTTP header indicating the 'backoff' time to use. See section 14.37 of HTTP1.1 spec (RFC2616).
      */
-    private static final String HTTP_RETRY_AFTER = "Retry-After";
+    public static final String HTTP_RETRY_AFTER = "Retry-After";
     /**
      * Default backoff time, in seconds.
      */
-    private static final int DEFAULT_RETRY_TIME = 30;
+    public static final int DEFAULT_RETRY_TIME = 30;
+
     /** Default buffer size for use in stream-copying, in bytes. */
     private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
 
@@ -57,12 +58,13 @@ class ConnectionUtil {
         int responseCode = getResponseCode(connection);
         switch (responseCode) {
             case 200:
+            case 206:
                 return;
             case 503:
                 int retry = ((HttpURLConnection) connection).getHeaderFieldInt(HTTP_RETRY_AFTER, DEFAULT_RETRY_TIME);
                 throw new RetryAfterException(retry);
             default:
-                throw new IOException("Unable to handle server responsecode: " + responseCode);
+                throw new IOException("Unable to handle server responsecode: " + responseCode + ", for " + connection.getURL());
         }
     }
 
@@ -71,14 +73,20 @@ class ConnectionUtil {
      * 
      * @param connection
      *            the URL connection to close, can be <code>null</code> in which case this method does nothing.
+     * @return always <code>null</code>, for easy chaining.
      */
-    public static void close(URLConnection connection) {
+    public static URLConnection close(URLConnection connection) {
         if (connection instanceof HttpURLConnection) {
             ((HttpURLConnection) connection).disconnect();
         }
+        return null;
     }
 
-    public static void closeSilently(Closeable closeable) {
+    /**
+     * @param closeable
+     * @return always <code>null</code>, for easy chaining.
+     */
+    public static Closeable closeSilently(Closeable closeable) {
         try {
             if (closeable != null) {
                 closeable.close();
@@ -87,6 +95,7 @@ class ConnectionUtil {
         catch (IOException exception) {
             // Ignore...
         }
+        return null;
     }
 
     public static void copy(InputStream is, OutputStream os) throws IOException {

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java?rev=1524032&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java Tue Sep 17 13:14:32 2013
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import static org.apache.ace.agent.impl.ConnectionUtil.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.ace.agent.ConnectionHandler;
+import org.apache.ace.agent.RetryAfterException;
+
+/**
+ * Abstraction for {@link HttpURLConnection}s that might use content range headers, or partial responses, to return the
+ * contents of an URL.
+ */
+class ContentRangeInputStream extends InputStream {
+    private static final String HDR_CONTENT_RANGE = "Content-Range";
+    private static final String HDR_RANGE = "Range";
+
+    private static final String BYTES = "bytes";
+    private static final String BYTES_ = BYTES.concat(" ");
+
+    private static final int SC_OK = 200;
+    private static final int SC_PARTIAL_CONTENT = 206;
+    private static final int SC_SERVICE_UNAVAILABLE = 503;
+
+    private static final int ST_EOF = -1;
+    private static final int ST_INITIAL = 0;
+    private static final int ST_OPEN = 1;
+    private static final int ST_CLOSED = 2;
+
+    private final ConnectionHandler m_handler;
+    private final URL m_url;
+    private final int m_chunkSize;
+
+    // see ST_* constants...
+    private volatile int m_state;
+    private volatile HttpURLConnection m_conn;
+    // administration...
+    private volatile long m_readTotal;
+    private volatile long m_readChunk;
+    private volatile long[] m_contentInfo;
+
+    /**
+     * Creates a new {@link ContentRangeInputStream} instance.
+     * 
+     * @param handler
+     *            the connection handler to use, cannot be <code>null</code>;
+     * @param url
+     *            the URL to connect to, cannot be <code>null</code>.
+     */
+    public ContentRangeInputStream(ConnectionHandler handler, URL url) {
+        this(handler, url, 0L);
+    }
+
+    /**
+     * Creates a new {@link ContentRangeInputStream} instance.
+     * 
+     * @param handler
+     *            the connection handler to use, cannot be <code>null</code>;
+     * @param url
+     *            the URL to connect to, cannot be <code>null</code>;
+     * @param startOffset
+     *            the starting offset to start reading from, >= 0.
+     */
+    public ContentRangeInputStream(ConnectionHandler handler, URL url, long startOffset) {
+        this(handler, url, startOffset, -1);
+    }
+
+    /**
+     * Creates a new {@link ContentRangeInputStream} instance.
+     * 
+     * @param handler
+     *            the connection handler to use, cannot be <code>null</code>;
+     * @param url
+     *            the URL to connect to, cannot be <code>null</code>;
+     * @param startOffset
+     *            the starting offset to start reading from, >= 0;
+     * @param chunkSize
+     *            the number of bytes to request in each chunk, use <tt>-1</tt> to read as many bytes as possible in
+     *            each chunk.
+     */
+    public ContentRangeInputStream(ConnectionHandler handler, URL url, long startOffset, int chunkSize) {
+        if (handler == null) {
+            throw new IllegalArgumentException("Handler cannot be null!");
+        }
+        if (url == null) {
+            throw new IllegalArgumentException("URL cannot be null!");
+        }
+        m_handler = handler;
+        m_url = url;
+
+        m_state = ST_INITIAL;
+        m_readTotal = startOffset;
+        m_chunkSize = chunkSize;
+    }
+
+    @Override
+    public int available() throws IOException {
+        assertOpen();
+
+        if (!prepareNextChunk()) {
+            return 0;
+        }
+
+        long chunkSize = m_contentInfo[0];
+        if (chunkSize < 0) {
+            // size not available (yet)...
+            return 0;
+        }
+        return (int) ((chunkSize - m_readChunk) & 0xFFFFFFFFL);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (m_state != ST_CLOSED) {
+            m_state = ST_CLOSED;
+
+            closeChunk();
+        }
+    }
+
+    /**
+     * Returns the total content size, if available.
+     * 
+     * @return the total length (in bytes) of the content, or <tt>-1</tt> if no content size is known.
+     * @throws IOException
+     *             in case of I/O errors, such as when this stream is already closed.
+     */
+    public long getContentSize() throws IOException {
+        assertOpen();
+
+        if (m_contentInfo == null || m_contentInfo.length < 2) {
+            return -1L;
+        }
+        return m_contentInfo[1];
+    }
+
+    @Override
+    public int read() throws IOException {
+        assertOpen();
+
+        if (!prepareNextChunk()) {
+            return -1;
+        }
+
+        InputStream is = m_conn.getInputStream();
+
+        int result = is.read();
+        if (result > 0) {
+            m_readChunk++;
+            m_readTotal++;
+        }
+        // End of chunk?!
+        if ((m_contentInfo[0] > 0) && (m_readChunk >= m_contentInfo[0])) {
+            closeChunk();
+        }
+
+        return result;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Overridden for efficiency reasons.
+     * </p>
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        assertOpen();
+
+        if (!prepareNextChunk()) {
+            return -1;
+        }
+
+        InputStream is = m_conn.getInputStream();
+
+        int read = is.read(b, off, len);
+        if (read >= 0) {
+            m_readChunk += read;
+            m_readTotal += read;
+        }
+        // End of chunk?!
+        if ((m_contentInfo[0] > 0) && (m_readChunk >= m_contentInfo[0])) {
+            closeChunk();
+        }
+        return read;
+    }
+
+    /**
+     * Adds the HTTP-Range header to a given URL connection.
+     * 
+     * @param conn
+     *            the URL connection to add the HTTP-Range header to, cannot be <code>null</code>.
+     */
+    private void applyRangeHeader(HttpURLConnection conn) {
+        if (m_readTotal > 0L || m_chunkSize > 0) {
+            String rangeHeader;
+            if (m_chunkSize > 0) {
+                rangeHeader = String.format("%s=%d-%d", BYTES, m_readTotal, (m_readTotal + m_chunkSize));
+            }
+            else {
+                rangeHeader = String.format("%s=%d-", BYTES, m_readTotal);
+            }
+            conn.setRequestProperty(HDR_RANGE, rangeHeader);
+        }
+    }
+
+    /**
+     * Verifies that this stream (and the underlying URL connection) is open.
+     * 
+     * @throws IOException
+     *             in case this stream is already closed.
+     */
+    private void assertOpen() throws IOException {
+        if (m_state == ST_CLOSED) {
+            throw new IOException("Trying to read from closed stream!");
+        }
+        else if (m_state != ST_EOF) {
+            m_state = ST_OPEN;
+        }
+    }
+
+    /**
+     * Closes the current chunk-connection, if necessary.
+     */
+    private void closeChunk() {
+        if (m_conn != null) {
+            ConnectionUtil.close(m_conn);
+            m_conn = null;
+            m_readChunk = 0;
+        }
+    }
+
+    /**
+     * @return <code>true</code> if there is content remaining to be read, <code>false</code> otherwise.
+     */
+    private boolean contentRemaining() {
+        int state = m_state;
+        if (state == ST_EOF) {
+            return false;
+        }
+        else if (m_contentInfo == null) {
+            // no information yet about the content, so we must read it first...
+            return true;
+        }
+        long totalSize = m_contentInfo[1];
+        if ((totalSize > 0L) && (m_readTotal >= totalSize)) {
+            m_state = ST_EOF;
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * @param conn
+     *            the URL connection to get the range information from, cannot be <code>null</code>.
+     * @return an array of two elements containing: current chunk size & total size (in bytes).
+     * @throws IOException
+     *             in case of I/O problems or unexpected content.
+     */
+    private long[] getContentRangeInfo(HttpURLConnection conn) throws IOException {
+        int rc = conn.getResponseCode();
+        if (rc == SC_OK) {
+            // Non-chunked response...
+            if (m_readTotal > 0) {
+                // this is "bad", as we've read some parts and we cannot tell the consumer of this stream that this is
+                // happening...
+                throw new IOException("Server returned complete content instead of (requested) partial.");
+            }
+
+            long totalBytes = conn.getContentLength();
+
+            return new long[] { totalBytes, totalBytes };
+        }
+        else if (rc == SC_PARTIAL_CONTENT) {
+            String contentRange = conn.getHeaderField(HDR_CONTENT_RANGE);
+            if (contentRange == null) {
+                throw new IOException("Server returned no Content-Range for partial content");
+            }
+            if (!contentRange.startsWith(BYTES_)) {
+                throw new IOException("Server returned non-byte Content-Range " + contentRange);
+            }
+
+            String[] parts = contentRange.substring(6).split("/");
+            String[] rangeDef = parts[0].split("-");
+
+            long start = Long.parseLong(rangeDef[0]);
+            long end = Long.parseLong(rangeDef[1]);
+
+            long totalBytes;
+            if ("*".equals(parts[1])) {
+                totalBytes = -1L;
+            }
+            else {
+                totalBytes = Long.parseLong(parts[1]);
+            }
+
+            return new long[] { (end - start), totalBytes };
+        }
+        else if (rc == SC_SERVICE_UNAVAILABLE) {
+            // Service is unavailable, throw an exception to try it again later...
+            int retry = ((HttpURLConnection) conn).getHeaderFieldInt(HTTP_RETRY_AFTER, DEFAULT_RETRY_TIME);
+
+            throw new RetryAfterException(retry);
+        }
+        else {
+            throw new IOException("Unknown/unexpected status code: " + rc);
+        }
+    }
+
+    /**
+     * Prepares the connection for the next chunk, if needed.
+     * 
+     * @return <code>true</code> if the prepare was successful (there was a next chunk to be read), <code>false</code>
+     *         otherwise.
+     * @throws IOException
+     *             in case of I/O exception.
+     */
+    private boolean prepareNextChunk() throws IOException {
+        if ((m_conn == null) && contentRemaining()) {
+            m_conn = (HttpURLConnection) m_handler.getConnection(m_url);
+
+            applyRangeHeader(m_conn);
+
+            long[] contentInfo = getContentRangeInfo(m_conn);
+
+            // No, not yet, update our local administration...
+            if (m_contentInfo != null) {
+                // verify the total size (paranoia sanity check)...
+                if (m_contentInfo[1] >= 0 && contentInfo[1] >= 0 && m_contentInfo[1] != contentInfo[1]) {
+                    throw new IOException("Stream size mismatch between different chunks!");
+                }
+            }
+
+            m_contentInfo = contentInfo;
+        }
+
+        return (m_conn != null);
+    }
+}

Propchange: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java Tue Sep 17 13:14:32 2013
@@ -25,7 +25,9 @@ import static org.apache.ace.agent.Agent
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCDELAY;
 import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCINTERVAL;
 import static org.apache.ace.agent.impl.ConnectionUtil.closeSilently;
-import static org.apache.ace.agent.impl.InternalConstants.*;
+import static org.apache.ace.agent.impl.InternalConstants.AGENT_CONFIG_CHANGED;
+import static org.apache.ace.agent.impl.InternalConstants.AGENT_INSTALLATION_COMPLETE;
+import static org.apache.ace.agent.impl.InternalConstants.AGENT_INSTALLATION_START;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,6 +35,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +45,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.ace.agent.DownloadHandle;
 import org.apache.ace.agent.DownloadHandle.DownloadProgressListener;
 import org.apache.ace.agent.DownloadResult;
-import org.apache.ace.agent.DownloadState;
 import org.apache.ace.agent.EventListener;
 import org.apache.ace.agent.FeedbackChannel;
 import org.apache.ace.agent.InstallationFailedException;
@@ -57,9 +60,10 @@ public class DefaultController extends C
      * UpdateInstaller that provides download deployment package install. The install is non-blocking. Upon download
      * completion this installer will reschedule the controller.
      */
-    static class DownloadUpdateInstaller extends UpdateInstaller {
+    static class DownloadUpdateInstaller extends UpdateInstaller implements DownloadProgressListener {
         private volatile DownloadHandle m_downloadHandle;
         private volatile UpdateInfo m_updateInfo;
+        private volatile Future<DownloadResult> m_future;
 
         public DownloadUpdateInstaller(DefaultController controller) {
             super(controller);
@@ -69,72 +73,101 @@ public class DefaultController extends C
         public void doInstallUpdate(final UpdateHandler delegate, final UpdateInfo updateInfo) throws RetryAfterException {
             DefaultController controller = getController();
 
-            if (m_downloadHandle != null && m_updateInfo != null && !m_updateInfo.m_to.equals(updateInfo.m_to)) {
-                controller.logInfo("Cancelling download of %s update for %s because a newer version is available...", m_updateInfo.m_type, m_updateInfo.m_to);
-                m_downloadHandle.discard();
-                m_downloadHandle = null;
-            }
+            String type = updateInfo.m_type;
+            Version fromVersion = updateInfo.m_from;
+            Version toVersion = updateInfo.m_to;
 
-            if (m_downloadHandle == null) {
+            if (m_downloadHandle != null) {
+                // Ongoing download?
+                if (m_updateInfo != null && !m_updateInfo.m_to.equals(toVersion)) {
+                    controller.logInfo("Cancelling download of %s update for %s because a newer version is available...", m_updateInfo.m_type, m_updateInfo.m_to);
+
+                    clearDownloadState();
+                }
+            }
+            else {
                 controller.logInfo("Starting download of %s update, %s => %s...", updateInfo.m_type, updateInfo.m_from, updateInfo.m_to);
 
                 m_updateInfo = updateInfo;
+                m_future = null;
 
                 m_downloadHandle = delegate.getDownloadHandle(updateInfo.m_to, updateInfo.m_fixPackage);
-                m_downloadHandle.start(new DownloadProgressListener() {
-                    @Override
-                    public void completed(DownloadResult result) {
-                        DefaultController controller = getController();
-                        DownloadState state = result.getState();
-
-                        String type = updateInfo.m_type;
-                        Version fromVersion = updateInfo.m_from;
-                        Version toVersion = updateInfo.m_to;
+            }
 
-                        if (DownloadState.FAILED == state) {
-                            controller.logInfo("Download of %s update is FAILED. Restarting download...", type);
+            if (m_future == null) {
+                m_future = m_downloadHandle.start(this);
+            }
+            else {
+                if (!m_future.isDone()) {
+                    controller.logDebug("Still awaiting completion of download...");
+                    return;
+                }
+                else if (m_future.isCancelled()) {
+                    controller.logInfo("Download of %s update is CANCELLED. Resuming download...", type);
 
-                            m_downloadHandle.discard();
-                            m_downloadHandle = null;
-                        }
-                        else if (DownloadState.STOPPED == state) {
-                            controller.logInfo("Download of %s update is STOPPED. Resuming download...", type);
+                    // We're stopped early...
+                    m_future = m_downloadHandle.start(this);
+                    return;
+                }
 
-                            m_downloadHandle.start(this);
-                        }
-                        else if (DownloadState.SUCCESSFUL == state) {
+                try {
+                    try {
+                        DownloadResult downloadResult = m_future.get();
+
+                        if (downloadResult.isComplete()) {
                             controller.logInfo("Installing %s update %s => %s...", type, fromVersion, toVersion);
 
-                            boolean success = false;
-                            Exception cause = null;
+                            startInstallation(updateInfo);
 
-                            try {
-                                startInstallation(updateInfo);
+                            delegate.install(downloadResult.getInputStream());
 
-                                delegate.install(result.getInputStream());
+                            endInstallation(updateInfo, true /* success */, null);
 
-                                success = true;
-                            }
-                            catch (Exception exception) {
-                                cause = exception;
-                                controller.logWarning("Installation of %s update failed!", exception, type);
-                            }
-                            finally {
-                                m_downloadHandle.discard();
-                                m_downloadHandle = null;
+                            clearDownloadState();
+                        }
+                        else {
+                            controller.logInfo("Download of %s update is STOPPED. Resuming download...", type);
 
-                                endInstallation(updateInfo, success, cause);
-                            }
+                            // We're stopped early...
+                            m_future = m_downloadHandle.start(this);
                         }
                     }
+                    catch (InterruptedException exception) {
+                        controller.logInfo("Download of %s update is INTERRUPTED. Resuming download...", type);
 
-                    @Override
-                    public void progress(long contentLength, long progress) {
-                        if (updateInfo != null) {
-                            getController().logInfo("Progress of %s update download: %d of %d bytes...", updateInfo.m_type, progress, contentLength);
+                        // We're stopped early...
+                        m_future = m_downloadHandle.start(this);
+                    }
+                    catch (ExecutionException exception) {
+                        clearDownloadState();
+
+                        Throwable cause = exception.getCause();
+                        if (cause instanceof RetryAfterException) {
+                            throw (RetryAfterException) cause;
+                        }
+                        else if (cause instanceof Exception) {
+                            throw (Exception) cause;
+                        }
+                        else {
+                            throw new RuntimeException("Failed to handle cause!", cause);
                         }
                     }
-                });
+                }
+                catch (RetryAfterException ex) {
+                    // Does not cause the installation to end...
+                    throw ex;
+                }
+                catch (Exception ex) {
+                    // All other exceptions cause the installation to end/fail...
+                    endInstallation(updateInfo, false /* success */, ex);
+                }
+            }
+        }
+
+        @Override
+        public void progress(long bytesRead, long totalBytes) {
+            if (m_updateInfo != null) {
+                getController().logInfo("Progress of %s update download: %d of %d bytes...", m_updateInfo.m_type, bytesRead, totalBytes);
             }
         }
 
@@ -152,6 +185,10 @@ public class DefaultController extends C
                 m_downloadHandle.discard();
             }
             m_downloadHandle = null;
+            if (m_future != null && !m_future.isDone()) {
+                m_future.cancel(true /* mayInterruptIfRunning */);
+            }
+            m_future = null;
             m_updateInfo = null;
         }
     }
@@ -185,10 +222,7 @@ public class DefaultController extends C
                 // We aren't ready yet...
                 throw ex;
             }
-            catch (InstallationFailedException ex) {
-                endInstallation(updateInfo, false /* success */, ex);
-            }
-            catch (IOException ex) {
+            catch (Exception ex) {
                 endInstallation(updateInfo, false /* success */, ex);
             }
             finally {
@@ -306,7 +340,7 @@ public class DefaultController extends C
          * @param cause
          *            the (optional) cause why the installation failed.
          */
-        protected final void endInstallation(UpdateInfo updateInfo, boolean success, Exception cause) {
+        protected final void endInstallation(UpdateInfo updateInfo, boolean success, Exception cause) throws RetryAfterException {
             m_lastVersionSuccessful = success;
             if (cause instanceof InstallationFailedException || cause instanceof IOException) {
                 m_failureCount++;
@@ -465,8 +499,8 @@ public class DefaultController extends C
         catch (RetryAfterException e) {
             // any method may throw this causing the sync to abort. The server is busy so no sense in trying
             // anything else until the retry window has passed.
-            interval = e.getSeconds();
-            logWarning("Sync received retry exception from server. Rescheduled in %d seconds", e.getSeconds());
+            interval = e.getBackoffTime();
+            logWarning("Sync received retry exception from server. Rescheduled in %d seconds", e.getBackoffTime());
         }
         catch (Exception e) {
             // serious problem throw by a method that decides this is cause enough to abort the sync. Not much

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java Tue Sep 17 13:14:32 2013
@@ -69,6 +69,7 @@ public class DeploymentHandlerImpl exten
             return payload;
         }
     }
+
     /**
      * Internal LogService that wraps delegates to actual InternalLogger. Used to inject into the DeploymentAdmin only.
      */
@@ -175,6 +176,11 @@ public class DeploymentHandlerImpl exten
             m_deploymentAdmin.installDeploymentPackage(inputStream);
         }
         catch (DeploymentException exception) {
+            Throwable cause = exception.getCause();
+            // Properly handle possible server overload...
+            if (cause instanceof RetryAfterException) {
+                throw (RetryAfterException) cause;
+            }
             throw new InstallationFailedException("Installation of deployment package failed!", exception);
         }
     }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadCallableImpl.java Tue Sep 17 13:14:32 2013
@@ -18,28 +18,23 @@
  */
 package org.apache.ace.agent.impl;
 
-import static org.apache.ace.agent.impl.ConnectionUtil.close;
 import static org.apache.ace.agent.impl.ConnectionUtil.closeSilently;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.HttpURLConnection;
 import java.util.concurrent.Callable;
 
 import org.apache.ace.agent.DownloadHandle.DownloadProgressListener;
+import org.apache.ace.agent.DownloadResult;
 import org.apache.ace.agent.DownloadState;
 
 /**
  * Responsible for actually downloading content from a download handle.
  */
-final class DownloadCallableImpl implements Callable<Void> {
-    private static final int SC_OK = 200;
-    private static final int SC_PARTIAL_CONTENT = 206;
-
+final class DownloadCallableImpl implements Callable<DownloadResult> {
     private final DownloadHandleImpl m_handle;
     private final DownloadProgressListener m_listener;
     private final File m_target;
@@ -53,108 +48,46 @@ final class DownloadCallableImpl impleme
     }
 
     @Override
-    public Void call() throws Exception {
-        int statusCode = 0;
-        HttpURLConnection httpUrlConnection = null;
-
-        try {
-            boolean partialContent = false;
-            boolean appendTarget = false;
-
-            httpUrlConnection = m_handle.openConnection();
-
-            long targetSize = m_target.length();
-            if (targetSize > 0) {
-                String rangeHeader = String.format("bytes=%d-", targetSize);
-
-                m_handle.logDebug("Requesting Range %s", rangeHeader);
-
-                httpUrlConnection.setRequestProperty("Range", rangeHeader);
-            }
+    public DownloadResult call() throws Exception {
+        ContentRangeInputStream is = null;
+        OutputStream os = null;
 
-            statusCode = httpUrlConnection.getResponseCode();
-            if (statusCode == SC_OK) {
-                partialContent = false;
-            }
-            else if (statusCode == SC_PARTIAL_CONTENT) {
-                partialContent = true;
-            }
-            else {
-                // TODO handle retry-after?!
-                throw new IOException("Unable to handle server response code " + statusCode);
-            }
-
-            long totalBytes = httpUrlConnection.getContentLength();
-            if (partialContent) {
-                String contentRange = httpUrlConnection.getHeaderField("Content-Range");
-                if (contentRange == null) {
-                    throw new IOException("Server returned no Content-Range for partial content");
-                }
-                if (!contentRange.startsWith("bytes ")) {
-                    throw new IOException("Server returned non bytes Content-Range " + contentRange);
-                }
+        long targetLength = m_target.length();
+        boolean appendTarget = (targetLength > 0);
 
-                String tmp = contentRange;
-                tmp = tmp.replace("byes ", "");
-                String[] parts = tmp.split("/");
-                String start = parts[0].split("-")[0];
-                String end = parts[0].split("-")[1];
-
-                m_handle.logDebug("Size: %d, range from %d to %d.", parts[1], start, end);
-
-                if ("*".equals(parts[1])) {
-                    totalBytes = -1;
-                }
-                else {
-                    totalBytes = Long.parseLong(parts[1]);
-                }
-            }
-
-            long bytesRead = 0l;
-            if (partialContent) {
-                bytesRead = targetSize;
-                appendTarget = true;
-            }
-
-            InputStream inputStream = httpUrlConnection.getInputStream();
-            OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(m_target, appendTarget));
+        try {
+            is = new ContentRangeInputStream(m_handle.getConnectionHandler(), m_handle.getURL(), targetLength);
+            os = new BufferedOutputStream(new FileOutputStream(m_target, appendTarget));
 
             byte buffer[] = new byte[m_readBufferSize];
+            long bytesRead = targetLength, totalBytes = -1L;
             int read;
 
-            try {
-                while (!Thread.currentThread().isInterrupted() && (read = inputStream.read(buffer)) >= 0) {
-                    outputStream.write(buffer, 0, read);
-                    bytesRead += read;
+            while (!Thread.currentThread().isInterrupted() && (read = is.read(buffer)) >= 0) {
+                os.write(buffer, 0, read);
+                // update local administration...
+                bytesRead += read;
+                totalBytes = is.getContentSize();
 
+                if (m_listener != null) {
                     m_listener.progress(bytesRead, totalBytes);
                 }
             }
-            finally {
-                closeSilently(outputStream);
-                closeSilently(inputStream);
-            }
 
-            boolean stoppedEarly = (totalBytes > 0L && bytesRead < totalBytes);
+            boolean stoppedEarly = Thread.currentThread().isInterrupted() || (totalBytes > 0L && bytesRead < totalBytes);
             if (stoppedEarly) {
-                m_handle.logDebug("Download stopped early: %d of %d bytes downloaded...", bytesRead, totalBytes);
+                m_handle.logDebug("Download stopped early: %d of %d bytes downloaded... (%d)", bytesRead, totalBytes, targetLength);
 
-                m_listener.completed(new DownloadResultImpl(DownloadState.STOPPED, (File) null, statusCode));
-            } else {
-                m_handle.logDebug("Download completed: %d bytes downloaded...", totalBytes);
-                
-                m_listener.completed(new DownloadResultImpl(DownloadState.SUCCESSFUL, m_target, statusCode));
+                return new DownloadResultImpl(DownloadState.STOPPED);
             }
-        }
-        catch (Exception e) {
-            m_handle.logWarning("Download failed!", e);
 
-            m_listener.completed(new DownloadResultImpl(DownloadState.FAILED, e, statusCode));
+            m_handle.logDebug("Download completed: %d bytes downloaded...", totalBytes);
+
+            return new DownloadResultImpl(DownloadState.SUCCESSFUL, new FileInputStream(m_target));
         }
         finally {
-            close(httpUrlConnection);
+            closeSilently(os);
+            closeSilently(is);
         }
-
-        return null;
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java Tue Sep 17 13:14:32 2013
@@ -20,18 +20,13 @@ package org.apache.ace.agent.impl;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.ace.agent.ConnectionHandler;
 import org.apache.ace.agent.DownloadHandle;
 import org.apache.ace.agent.DownloadResult;
-import org.apache.ace.agent.DownloadState;
 
 /**
  * A {@link DownloadHandle} implementation that supports pause/resume semantics based on HTTP Range headers assuming the
@@ -47,7 +42,7 @@ class DownloadHandleImpl implements Down
     private final URL m_url;
     private final int m_readBufferSize;
 
-    private volatile Future<Void> m_future;
+    private volatile Future<DownloadResult> m_future;
     private volatile File m_file;
 
     DownloadHandleImpl(DownloadHandlerImpl handler, URL url) {
@@ -66,64 +61,38 @@ class DownloadHandleImpl implements Down
             stop();
         }
         finally {
-            m_file.delete();
+            if (m_file != null) {
+                m_file.delete();
+            }
         }
     }
 
     @Override
-    public void start(DownloadProgressListener listener) {
-        if (listener == null) {
-            throw new IllegalArgumentException("Listener cannot be null!");
-        }
-
+    public Future<DownloadResult> start(DownloadProgressListener listener) {
         if (m_future != null && !m_future.isDone()) {
             throw new IllegalStateException("Can not call start on a handle that is already started!");
         }
+        m_future = null;
 
         if (m_file == null) {
             try {
                 m_file = File.createTempFile("download", ".bin", m_handler.getDataLocation());
             }
             catch (IOException e) {
-                listener.completed(new DownloadResultImpl(DownloadState.FAILED, e, -1));
+                throw new RuntimeException("Failed to create temporary file!", e);
             }
         }
 
-        m_future = getExecutor().submit(new DownloadCallableImpl(this, listener, m_file, m_readBufferSize));
-    }
-
-    @Override
-    public DownloadResult startAndAwaitResult(long timeout, TimeUnit unit) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        final AtomicReference<DownloadResult> result = new AtomicReference<DownloadResult>();
-
-        start(new DownloadProgressListener() {
-            @Override
-            public void progress(long bytesRead, long totalBytes) {
-                // Nop
-            }
-
-            @Override
-            public void completed(DownloadResult downloadResult) {
-                result.set(downloadResult);
-                latch.countDown();
-            }
-        });
-        if (!latch.await(timeout, unit)) {
-            throw new InterruptedException("Failed to obtain result within given time constaints!");
-        }
-        return result.get();
+        return m_future = getExecutor().submit(new DownloadCallableImpl(this, listener, m_file, m_readBufferSize));
     }
 
     @Override
     public void stop() {
-        Future<Void> future = m_future;
+        Future<?> future = m_future;
         if (future != null) {
-            if (future.isDone()) {
-                throw new IllegalStateException("Can not call stop on a handle that is not yet started or completed!");
+            if (!future.isDone()) {
+                future.cancel(true /* mayInterruptIfRunning */);
             }
-
-            future.cancel(true /* mayInterruptIfRunning */);
         }
         m_future = null;
     }
@@ -136,13 +105,13 @@ class DownloadHandleImpl implements Down
         m_handler.logWarning(message, cause, args);
     }
 
-    final HttpURLConnection openConnection() throws IOException {
-        return (HttpURLConnection) getConnectionHandler().getConnection(m_url);
-    }
-
-    private ConnectionHandler getConnectionHandler() {
+    final ConnectionHandler getConnectionHandler() {
         return m_handler.getConnectionHandler();
     }
+    
+    final URL getURL() {
+        return m_url;
+    }
 
     private ExecutorService getExecutor() {
         return m_handler.getExecutor();

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadResultImpl.java Tue Sep 17 13:14:32 2013
@@ -18,52 +18,39 @@
  */
 package org.apache.ace.agent.impl;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.ace.agent.DownloadResult;
 import org.apache.ace.agent.DownloadState;
 
+/**
+ * Default, non thread-safe, implementation of {@link DownloadResult}.
+ */
 public class DownloadResultImpl implements DownloadResult {
     private final DownloadState m_state;
-    private final File m_file;
-    private final int m_code;
-    private final Throwable m_cause;
+    private final InputStream m_inputStream;
 
-    DownloadResultImpl(DownloadState state, Throwable cause, int code) {
+    DownloadResultImpl(DownloadState state) {
         m_state = state;
-        m_file = null;
-        m_code = code;
-        m_cause = cause;
+        m_inputStream = null;
     }
 
-    DownloadResultImpl(DownloadState state, File file, int code) {
+    DownloadResultImpl(DownloadState state, InputStream is) {
         m_state = state;
-        m_file = file;
-        m_code = code;
-        m_cause = null;
-    }
-
-    @Override
-    public DownloadState getState() {
-        return m_state;
+        m_inputStream = is;
     }
 
     @Override
-    @SuppressWarnings("resource")
     public InputStream getInputStream() throws IOException {
-        return m_file != null ? new FileInputStream(m_file) : null;
-    }
-
-    @Override
-    public int getCode() {
-        return m_code;
+        if (!isComplete()) {
+            throw new IllegalStateException("Cannot access incomplete download result!");
+        }
+        return m_inputStream;
     }
 
     @Override
-    public Throwable getCause() {
-        return m_cause;
+    public boolean isComplete() {
+        return DownloadState.SUCCESSFUL == m_state;
     }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/UpdateHandlerBase.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/UpdateHandlerBase.java?rev=1524032&r1=1524031&r2=1524032&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/UpdateHandlerBase.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/UpdateHandlerBase.java Tue Sep 17 13:14:32 2013
@@ -96,19 +96,7 @@ abstract class UpdateHandlerBase extends
     }
 
     protected InputStream getInputStream(URL packageURL) throws RetryAfterException, IOException {
-        URLConnection connection = null;
-        // TODO handle problems and retries
-        try {
-            connection = getConnection(packageURL);
-
-            checkConnectionResponse(connection);
-
-            return connection.getInputStream();
-        }
-        catch (IOException e) {
-            close(connection);
-            throw e;
-        }
+        return new ContentRangeInputStream(getConnectionHandler(), packageURL);
     }
 
     protected long getPackageSize(URL url) throws RetryAfterException, IOException {

Added: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java?rev=1524032&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java (added)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java Tue Sep 17 13:14:32 2013
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+
+import org.apache.ace.agent.ConnectionHandler;
+import org.apache.ace.agent.RetryAfterException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test cases for {@link ContentRangeInputStream}.
+ */
+public class ContentRangeInputStreamTest {
+    static enum Failure {
+        CONTENT_NOT_FOUND, SERVER_UNAVAILABLE, PARTIAL_NO_CONTENT_RANGE, PARTIAL_NON_BYTE_RANGE, PARTIAL_COMPLETE_BODY, PARTIAL_CHANGING_CONTENT_LENGTH;
+    }
+
+    /**
+     * Stub implementation of an {@link HttpURLConnection} that returns all content in one big chunk.
+     */
+    private static class CompleteContentConnection extends TestHttpURLConnection {
+        private final boolean m_includeLength;
+        private InputStream m_stream;
+
+        public CompleteContentConnection(String content, boolean includeLength) {
+            super(content);
+            m_includeLength = includeLength;
+        }
+
+        @Override
+        public void disconnect() {
+            m_stream = null;
+        }
+
+        @Override
+        public int getContentLength() {
+            return m_includeLength ? m_length : -1;
+        }
+
+        @Override
+        public InputStream getInputStream() throws IOException {
+            if (m_stream == null) {
+                m_stream = new ByteArrayInputStream(m_content.getBytes());
+            }
+            return m_stream;
+        }
+
+        @Override
+        public int getResponseCode() throws IOException {
+            return 200;
+        }
+    }
+
+    /**
+     * Stub implementation of an {@link HttpURLConnection} that yields various failures.
+     */
+    private static class FailingContentConnection extends TestHttpURLConnection {
+        private final Failure m_failure;
+        private InputStream m_stream;
+
+        public FailingContentConnection(String content, Failure failure) {
+            super(content);
+            m_failure = failure;
+        }
+
+        @Override
+        public void disconnect() {
+            m_stream = null;
+        }
+
+        @Override
+        public int getContentLength() {
+            return -1;
+        }
+
+        @Override
+        public String getHeaderField(String name) {
+            int rc = getResponseCode();
+            if (rc == 206) {
+                if (m_failure == Failure.PARTIAL_NON_BYTE_RANGE) {
+                    return String.format("octets %d-%d/%d", 48, 96, m_length);
+                }
+                else if (m_failure == Failure.PARTIAL_CHANGING_CONTENT_LENGTH) {
+                    int cl = (getRequestProperty("Range") != null) ? 1024 : m_length;
+                    return String.format("bytes %d-%d/%d", 48, 96, cl);
+                }
+                else if (m_failure != Failure.PARTIAL_NO_CONTENT_RANGE) {
+                    return String.format("bytes %d-%d/%d", 48, 96, m_length);
+                }
+            }
+            return super.getHeaderField(name);
+        }
+
+        @Override
+        public InputStream getInputStream() throws IOException {
+            if (m_stream == null) {
+                m_stream = new ByteArrayInputStream(m_content.substring(48, 96).getBytes());
+            }
+            return m_stream;
+        }
+
+        @Override
+        public int getResponseCode() {
+            if (m_failure == Failure.PARTIAL_NO_CONTENT_RANGE || m_failure == Failure.PARTIAL_NON_BYTE_RANGE || m_failure == Failure.PARTIAL_CHANGING_CONTENT_LENGTH) {
+                return 206;
+            }
+            else if (m_failure == Failure.PARTIAL_COMPLETE_BODY) {
+                if (getRequestProperty("Range") != null) {
+                    return 200;
+                }
+                return 206;
+            }
+            else if (m_failure == Failure.CONTENT_NOT_FOUND) {
+                return 404;
+            }
+            else if (m_failure == Failure.SERVER_UNAVAILABLE) {
+                return 503;
+            }
+            return 200;
+        }
+    }
+
+    /**
+     * Stub implementation of an {@link HttpURLConnection} that returns all content in one big chunk.
+     */
+    private static class PartialContentConnection extends TestHttpURLConnection {
+        private final int m_chunkSize;
+        private final boolean m_deferSendingTotalLength;
+
+        private int[] m_connInfo;
+        private InputStream m_partialContentStream;
+
+        public PartialContentConnection(String content, boolean deferSendingTotalLength) {
+            super(content);
+            // use an odd divisor to ensure that not all chunks are the same...
+            m_chunkSize = (m_length / 7);
+            m_deferSendingTotalLength = deferSendingTotalLength;
+        }
+
+        @Override
+        public void disconnect() {
+            m_connInfo = null;
+            m_partialContentStream = null;
+        }
+
+        @Override
+        public int getContentLength() {
+            int[] connInfo = determineNextContent();
+            return (connInfo != null) ? connInfo[1] - connInfo[0] : -1;
+        }
+
+        @Override
+        public String getHeaderField(String name) {
+            int[] connInfo = determineNextContent();
+
+            if ("Content-Range".equals(name)) {
+                String contentLength;
+                if (m_deferSendingTotalLength) {
+                    contentLength = "*";
+                    if ((connInfo[1] - connInfo[0]) < m_chunkSize) {
+                        contentLength = Integer.toString(m_length);
+                    }
+                }
+                else {
+                    contentLength = Integer.toString(m_length);
+                }
+                return String.format("bytes %d-%d/%s", m_connInfo[0], m_connInfo[1], contentLength);
+            }
+            return super.getHeaderField(name);
+        }
+
+        @Override
+        public InputStream getInputStream() throws IOException {
+            determineNextContent();
+            return m_partialContentStream;
+        }
+
+        @Override
+        public int getResponseCode() throws IOException {
+            return 206;
+        }
+
+        private int[] determineNextContent() {
+            if (m_connInfo == null) {
+                int start = 0;
+                int end = m_chunkSize;
+
+                String range = getRequestProperty("Range");
+                if (range != null && range.startsWith("bytes=")) {
+                    String[] parts = range.substring(6).split("-");
+
+                    start = Integer.parseInt(parts[0]);
+                    if (parts.length > 1) {
+                        end = Integer.parseInt(parts[1]);
+                    }
+                    else {
+                        end = start + m_chunkSize;
+                    }
+                }
+
+                m_connInfo = new int[] { Math.min(m_length, start), Math.min(m_length, end) };
+                m_partialContentStream = new ByteArrayInputStream(m_content.substring(m_connInfo[0], m_connInfo[1]).getBytes());
+            }
+            return m_connInfo;
+        }
+    }
+
+    /** Stub implementation that simply opens all given URLs. */
+    private static class TestConnectionHandler implements ConnectionHandler {
+        private final HttpURLConnection m_conn;
+
+        public TestConnectionHandler(HttpURLConnection conn) {
+            m_conn = conn;
+        }
+
+        @Override
+        public URLConnection getConnection(URL url) throws IOException {
+            return m_conn;
+        }
+    }
+
+    private static abstract class TestHttpURLConnection extends HttpURLConnection {
+        protected final String m_content;
+        protected final int m_length;
+
+        protected TestHttpURLConnection(String content) {
+            super(null /* url, not used. */);
+            m_content = content;
+            m_length = content.length();
+        }
+
+        @Override
+        public void connect() throws IOException {
+            // Nop
+        }
+
+        @Override
+        public boolean usingProxy() {
+            return false;
+        }
+    }
+
+    private static URL m_testURL;
+    private static String m_content;
+
+    @BeforeClass
+    protected static void setUpSuite() throws Exception {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 100; i++) {
+            sb.append(String.format("%03d. Hello World of Content-Range Input Stream!\n", i + 1));
+        }
+        m_content = sb.toString();
+        m_testURL = new URL("file://nonExistingFile.txt");
+    }
+
+    private static String slurpAsStringByteForByte(InputStream is) throws IOException {
+        StringBuilder sb = new StringBuilder();
+
+        int read = 0;
+        do {
+            read = is.read();
+            if (read > 0) {
+                sb.append((char) read);
+            }
+        }
+        while (read > 0);
+        return sb.toString();
+    }
+
+    private static String slurpAsStringWithBuffer(InputStream is) throws IOException {
+        StringBuilder sb = new StringBuilder();
+
+        byte[] buf = new byte[64];
+        int read = 0;
+        do {
+            read = is.read(buf);
+            if (read > 0) {
+                sb.append(new String(buf, 0, read));
+            }
+        }
+        while (read > 0);
+        return sb.toString();
+    }
+
+    /**
+     * Tests that the "Range" header is correctly set.
+     */
+    @Test
+    public void testRangeHeadersCorrectlySetOk() throws Exception {
+        String content = m_content;
+        PartialContentConnection conn;
+        ContentRangeInputStream is;
+
+        conn = new PartialContentConnection(content, false);
+        // no offset causes no Range header to be set (initially)...
+        is = new ContentRangeInputStream(new TestConnectionHandler(conn), m_testURL);
+
+        is.read(); // read one byte...
+        is.close();
+
+        // Make sure the proper request header is NOT set...
+        assertRequestHeader(conn, "Range", null);
+
+        conn = new PartialContentConnection(content, false);
+        // start at 48 bytes and return the next complete chunk...
+        is = new ContentRangeInputStream(new TestConnectionHandler(conn), m_testURL, 48);
+
+        is.read(); // read one byte...
+        is.close();
+
+        // Make sure the proper request header is set...
+        assertRequestHeader(conn, "Range", "bytes=48-");
+
+        conn = new PartialContentConnection(content, false);
+        // 4752 + 48 = 4800, causing only one chunk to be returned...
+        is = new ContentRangeInputStream(new TestConnectionHandler(conn), m_testURL, 48, 4752);
+
+        is.read(); // read one byte...
+        is.close();
+
+        // Make sure the proper request header is set...
+        assertRequestHeader(conn, "Range", "bytes=48-4800");
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test(expectedExceptions = IOException.class)
+    public void testReadClosedStreamFail() throws Exception {
+        ConnectionHandler handler = new TestConnectionHandler(new CompleteContentConnection(m_content, true));
+
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+        is.close(); // simulate an early close...
+
+        is.read(); // should fail!
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test
+    public void testReadNonPartialContentByteForByteOk() throws Exception {
+        String content = m_content;
+
+        ConnectionHandler handler = new TestConnectionHandler(new CompleteContentConnection(content, true));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringByteForByte(is), content);
+
+        // try several additional reads, which should all return -1 (= EOF)...
+        int tries = 5;
+        while (--tries > 0) {
+            assertEquals(is.read(), -1);
+        }
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test
+    public void testReadNonPartialContentOk() throws Exception {
+        String content = m_content;
+
+        ConnectionHandler handler = new TestConnectionHandler(new CompleteContentConnection(content, true));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringWithBuffer(is), content);
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test
+    public void testReadNonPartialEmptyContentOk() throws Exception {
+        String content = "";
+
+        ConnectionHandler handler = new TestConnectionHandler(new CompleteContentConnection(content, true));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringWithBuffer(is), content);
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test
+    public void testReadNonPartialWithoutContentLengthOk() throws Exception {
+        String content = "";
+
+        ConnectionHandler handler = new TestConnectionHandler(new CompleteContentConnection(content, false));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringWithBuffer(is), content);
+    }
+
+    /**
+     * Tests that we can read partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialContentByteForByteOk() throws Exception {
+        String content = m_content;
+
+        ConnectionHandler handler = new TestConnectionHandler(new PartialContentConnection(content, false));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringByteForByte(is), content);
+    }
+
+    /**
+     * Tests that we cannot read partial content if the content is not available.
+     */
+    @Test(expectedExceptions = IOException.class)
+    public void testReadPartialContentNotFoundFail() throws Exception {
+        ConnectionHandler handler = new TestConnectionHandler(new FailingContentConnection(m_content, Failure.CONTENT_NOT_FOUND));
+        ContentRangeInputStream is = null;
+
+        try {
+            is = new ContentRangeInputStream(handler, m_testURL);
+            is.read(); // should fail!
+        }
+        finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+    }
+
+    /**
+     * Tests that we can read partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialContentOk() throws Exception {
+        String content = m_content;
+
+        ConnectionHandler handler = new TestConnectionHandler(new PartialContentConnection(content, false));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringWithBuffer(is), content);
+    }
+
+    /**
+     * Tests that we cannot read partial content if the server is not available.
+     */
+    @Test(expectedExceptions = RetryAfterException.class)
+    public void testReadPartialContentServerUnavailableFail() throws Exception {
+        ConnectionHandler handler = new TestConnectionHandler(new FailingContentConnection(m_content, Failure.SERVER_UNAVAILABLE));
+        ContentRangeInputStream is = null;
+
+        try {
+            is = new ContentRangeInputStream(handler, m_testURL);
+            is.read(); // should fail!
+        }
+        finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+    }
+
+    /**
+     * Tests that we cannot read partial content if the server returns a complete body.
+     */
+    @Test(expectedExceptions = IOException.class)
+    public void testReadPartialContentWithChangingContentLengthFail() throws Exception {
+        ConnectionHandler handler = new TestConnectionHandler(new FailingContentConnection(m_content, Failure.PARTIAL_CHANGING_CONTENT_LENGTH));
+        ContentRangeInputStream is = null;
+
+        try {
+            is = new ContentRangeInputStream(handler, m_testURL);
+            is.read(new byte[1024]); // should succeed.
+            is.read(new byte[1024]); // should fail!
+        }
+        finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialContentWithChunkSizeOk() throws Exception {
+        String content = m_content;
+
+        PartialContentConnection conn = new PartialContentConnection(content, false);
+        ConnectionHandler handler = new TestConnectionHandler(conn);
+        // should cause chunks of 1024 bytes to be used, which means 4 complete chunks and one chunk of 704 bytes...
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL, 0, 1024);
+
+        assertEquals(slurpAsStringWithBuffer(is), content);
+
+        assertResponseHeader(conn, "Content-Range", "bytes 4096-4800/4800");
+    }
+
+    /**
+     * Tests that we cannot read partial content if the server returns a complete body.
+     */
+    @Test(expectedExceptions = IOException.class)
+    public void testReadPartialContentWithCompleteBodyFail() throws Exception {
+        ConnectionHandler handler = new TestConnectionHandler(new FailingContentConnection(m_content, Failure.PARTIAL_COMPLETE_BODY));
+        ContentRangeInputStream is = null;
+
+        try {
+            is = new ContentRangeInputStream(handler, m_testURL);
+            is.read(new byte[1024]); // should succeed.
+            is.read(new byte[1024]); // should fail!
+        }
+        finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+    }
+
+    /**
+     * Tests that we can read partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialContentWithDeferredTotalLengthOk() throws Exception {
+        String content = m_content;
+
+        ConnectionHandler handler = new TestConnectionHandler(new PartialContentConnection(content, true));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringWithBuffer(is), content);
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialContentWithOffsetAndChunkSizeOk() throws Exception {
+        String content = m_content;
+
+        PartialContentConnection conn = new PartialContentConnection(content, false);
+        ConnectionHandler handler = new TestConnectionHandler(conn);
+        // 4752 + 48 = 4800, causing only one chunk to be returned...
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL, 48, 4752);
+
+        assertEquals(slurpAsStringWithBuffer(is), content.substring(48));
+    }
+
+    /**
+     * Tests that we can read non-partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialContentWithOffsetOk() throws Exception {
+        String content = m_content;
+
+        ConnectionHandler handler = new TestConnectionHandler(new PartialContentConnection(content, false));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL, 48);
+
+        assertEquals(slurpAsStringWithBuffer(is), content.substring(48));
+    }
+
+    /**
+     * Tests that we cannot read partial content if given a non-byte range value in the Content-Range header.
+     */
+    @Test(expectedExceptions = IOException.class)
+    public void testReadPartialContentWithoutByteRangeValueFail() throws Exception {
+        ConnectionHandler handler = new TestConnectionHandler(new FailingContentConnection(m_content, Failure.PARTIAL_NON_BYTE_RANGE));
+        ContentRangeInputStream is = null;
+
+        try {
+            is = new ContentRangeInputStream(handler, m_testURL);
+            is.read(); // should fail!
+        }
+        finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+    }
+
+    /**
+     * Tests that we cannot read partial content without a Content-Range header.
+     */
+    @Test(expectedExceptions = IOException.class)
+    public void testReadPartialContentWithoutContentRangeHeaderFail() throws Exception {
+        ConnectionHandler handler = new TestConnectionHandler(new FailingContentConnection(m_content, Failure.PARTIAL_NO_CONTENT_RANGE));
+        ContentRangeInputStream is = null;
+
+        try {
+            is = new ContentRangeInputStream(handler, m_testURL);
+            is.read(); // should fail!
+        }
+        finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+    }
+
+    /**
+     * Tests that we can read partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialEmptyContentByteForByteOk() throws Exception {
+        String content = "";
+
+        ConnectionHandler handler = new TestConnectionHandler(new PartialContentConnection(content, false));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringByteForByte(is), content);
+    }
+
+    /**
+     * Tests that we can read partial content and return the expected contents.
+     */
+    @Test
+    public void testReadPartialEmptyContentOk() throws Exception {
+        String content = "";
+
+        ConnectionHandler handler = new TestConnectionHandler(new PartialContentConnection(content, false));
+        ContentRangeInputStream is = new ContentRangeInputStream(handler, m_testURL);
+
+        assertEquals(slurpAsStringWithBuffer(is), content);
+    }
+
+    private void assertRequestHeader(HttpURLConnection conn, String property, String expected) {
+        String value = conn.getRequestProperty(property);
+        assertEquals(expected, value);
+    }
+
+    private void assertResponseHeader(HttpURLConnection conn, String property, String expected) {
+        String value = conn.getHeaderField(property);
+        assertEquals(expected, value);
+    }
+}

Propchange: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/ContentRangeInputStreamTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message