ace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r1530190 - in /ace/trunk: org.apache.ace.agent/src/org/apache/ace/agent/impl/ org.apache.ace.repository.itest/src/org/apache/ace/it/repository/ org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/
Date Tue, 08 Oct 2013 09:04:32 GMT
Author: jawi
Date: Tue Oct  8 09:04:32 2013
New Revision: 1530190

URL: http://svn.apache.org/r1530190
Log:
Yet another attempt to make the itests more robust:

- it appears that resource starvation somehow occurs with respect to
  the number of available sockets; make sure that we properly handle
  exceptions during the use of HttpURLConnections.


Modified:
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
    ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
    ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ConnectionUtil.java Tue Oct  8 09:04:32 2013
@@ -98,6 +98,25 @@ class ConnectionUtil {
         return null;
     }
 
+    /**
+     * @see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+     */
+    public static int handleIOException(URLConnection conn) {
+        int respCode = -1;
+        if (!(conn instanceof HttpURLConnection)) {
+            return respCode;
+        }
+
+        try {
+            respCode = ((HttpURLConnection) conn).getResponseCode();
+            flushStream(((HttpURLConnection) conn).getErrorStream());
+        }
+        catch (IOException ex) {
+            // deal with the exception
+        }
+        return respCode;
+    }
+
     public static void copy(InputStream is, OutputStream os) throws IOException {
         copy(is, os, DEFAULT_BUFFER_SIZE);
     }
@@ -129,14 +148,32 @@ class ConnectionUtil {
      * @param connection
      *            the URL connection to get the response code for, can be <code>null</code>.
      * @return the response code for the given connection, or <code>-1</code> if it could not be determined.
-     * @throws IOException
-     *             if retrieving the response code failed.
      */
-    private static int getResponseCode(URLConnection connection) throws IOException {
-        if (connection instanceof HttpURLConnection) {
-            return ((HttpURLConnection) connection).getResponseCode();
+    private static int getResponseCode(URLConnection connection) {
+        try {
+            if (connection instanceof HttpURLConnection) {
+                return ((HttpURLConnection) connection).getResponseCode();
+            }
+            return -1;
+        }
+        catch (IOException exception) {
+            return handleIOException(connection);
+        }
+    }
+
+    static void flushStream(InputStream is) {
+        byte[] buf = new byte[4096];
+        try {
+            while (is.read(buf) > 0) {
+                // Ignore...
+            }
+        }
+        catch (IOException ex) {
+            // deal with the exception
+        }
+        finally {
+            closeSilently(is);
         }
-        return -1;
     }
 
     private ConnectionUtil() {

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/ContentRangeInputStream.java Tue Oct  8 09:04:32 2013
@@ -19,7 +19,8 @@
 package org.apache.ace.agent.impl;
 
 import static org.apache.ace.agent.impl.ConnectionUtil.DEFAULT_RETRY_TIME;
-import static org.apache.ace.agent.impl.ConnectionUtil.*;
+import static org.apache.ace.agent.impl.ConnectionUtil.HTTP_RETRY_AFTER;
+import static org.apache.ace.agent.impl.ConnectionUtil.handleIOException;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -165,7 +166,7 @@ class ContentRangeInputStream extends In
             return -1;
         }
 
-        InputStream is = m_conn.getInputStream();
+        InputStream is = getInputStream();
 
         int result = is.read();
         if (result > 0) {
@@ -194,7 +195,7 @@ class ContentRangeInputStream extends In
             return -1;
         }
 
-        InputStream is = m_conn.getInputStream();
+        InputStream is = getInputStream();
 
         int read = is.read(b, off, len);
         if (read >= 0) {
@@ -225,7 +226,8 @@ class ContentRangeInputStream extends In
                     rangeHeader = String.format("%s=%d-", BYTES, m_readTotal);
                 }
                 conn.setRequestProperty(HDR_RANGE, rangeHeader);
-            } else {
+            }
+            else {
                 // Non-HTTP connection, skip the first few bytes when calling this method for the first time...
                 if (m_contentInfo == null) {
                     long skip = m_readTotal;
@@ -297,7 +299,14 @@ class ContentRangeInputStream extends In
     }
 
     private long[] getHttpContentRangeInfo(HttpURLConnection conn) throws IOException {
-        int rc = conn.getResponseCode();
+        int rc;
+        try {
+            rc = conn.getResponseCode();
+        }
+        catch (IOException exception) {
+            rc = handleIOException(conn);
+        }
+
         if (rc == SC_OK) {
             // Non-chunked response...
             if (m_readTotal > 0) {
@@ -347,6 +356,22 @@ class ContentRangeInputStream extends In
     }
 
     /**
+     * @return the current input stream, never <code>null</code>.
+     * @throws IOException
+     *             in case of I/O problems accessing the input stream.
+     */
+    private InputStream getInputStream() throws IOException {
+        try {
+            return m_conn.getInputStream();
+        }
+        catch (IOException exception) {
+            handleIOException(m_conn);
+            closeChunk();
+            throw exception;
+        }
+    }
+
+    /**
      * Prepares the connection for the next chunk, if needed.
      * 
      * @return <code>true</code> if the prepare was successful (there was a next
chunk to be read), <code>false</code>

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java Tue Oct  8 09:04:32 2013
@@ -19,7 +19,7 @@
 package org.apache.ace.agent.impl;
 
 import static org.apache.ace.agent.impl.ConnectionUtil.close;
-import static org.apache.ace.agent.impl.ConnectionUtil.closeSilently;
+import static org.apache.ace.agent.impl.ConnectionUtil.*;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -104,14 +104,16 @@ public class FeedbackChannelImpl impleme
                 try {
                     synchronizeStore(storeID, queryConnection.getInputStream(), writer);
                 }
+                catch (IOException e) {
+                    handleIOException(queryConnection);
+                }
                 finally {
                     close(queryConnection);
                 }
             }
             writer.flush();
 
-            ConnectionUtil.checkConnectionResponse(sendConnection);
-            sendConnection.getContent();
+            checkConnectionResponse(sendConnection);
         }
         finally {
             closeSilently(writer);

Modified: ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java (original)
+++ ace/trunk/org.apache.ace.repository.itest/src/org/apache/ace/it/repository/Utils.java Tue Oct  8 09:04:32 2013
@@ -43,6 +43,12 @@ final class Utils {
         }
     }
 
+    static void closeSilently(HttpURLConnection resource) {
+        if (resource != null) {
+            resource.disconnect();
+        }
+    }
+
     /* copy in to out */
     static void copy(InputStream in, OutputStream out) throws IOException {
         byte[] buffer = new byte[COPY_BUFFER_SIZE];
@@ -53,59 +59,93 @@ final class Utils {
         }
     }
 
+    static void flushStream(InputStream is) {
+        byte[] buf = new byte[COPY_BUFFER_SIZE];
+        try {
+            while (is.read(buf) > 0) {
+                // Ignore...
+            }
+        }
+        catch (IOException ex) {
+            // deal with the exception
+        }
+        finally {
+            closeSilently(is);
+        }
+    }
+
     static int get(URL host, String endpoint, String customer, String name, String version,
OutputStream out) throws IOException {
+        int responseCode;
+
         URL url = new URL(host, endpoint + "?customer=" + customer + "&name=" + name
+ "&version=" + version);
+
+        InputStream input = null;
         HttpURLConnection connection = (HttpURLConnection) url.openConnection();
         try {
-            int responseCode = connection.getResponseCode();
-            if (responseCode == HttpURLConnection.HTTP_OK) {
-                InputStream input = connection.getInputStream();
-                try {
-                    copy(input, out);
-                    out.flush();
-                }
-                finally {
-                    closeSilently(input);
-                }
-            }
-            return responseCode;
+            responseCode = connection.getResponseCode();
+            input = connection.getInputStream();
+
+            copy(input, out);
+            out.flush();
+        }
+        catch (IOException e) {
+            responseCode = handleIOException(connection);
         }
         finally {
-            connection.disconnect();
+            closeSilently(input);
+            closeSilently(connection);
+        }
+
+        return responseCode;
+    }
+
+    /**
+     * @see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+     */
+    static int handleIOException(HttpURLConnection conn) {
+        int respCode = -1;
+        try {
+            respCode = conn.getResponseCode();
+            flushStream(conn.getErrorStream());
+        }
+        catch (IOException ex) {
+            // deal with the exception
         }
+        return respCode;
     }
 
     static int put(URL host, String endpoint, String customer, String name, String version,
InputStream in) throws IOException {
         URL url = new URL(host, endpoint + "?customer=" + customer + "&name=" + name
+ "&version=" + version);
-        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-        connection.setDoOutput(true);
-        // ACE-294: enable streaming mode causing only small amounts of memory to be
-        // used for this commit. Otherwise, the entire input stream is cached into
-        // memory prior to sending it to the server...
-        connection.setChunkedStreamingMode(8192);
-        connection.setRequestProperty("Content-Type", MIME_APPLICATION_OCTET_STREAM);
-        OutputStream out = connection.getOutputStream();
+
+        int responseCode;
+        HttpURLConnection connection = null;
+        OutputStream out = null;
+
         try {
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setDoOutput(true);
+            // ACE-294: enable streaming mode causing only small amounts of memory to be
+            // used for this commit. Otherwise, the entire input stream is cached into
+            // memory prior to sending it to the server...
+            connection.setChunkedStreamingMode(8192);
+            connection.setRequestProperty("Content-Type", MIME_APPLICATION_OCTET_STREAM);
+            out = connection.getOutputStream();
+
             copy(in, out);
             out.flush();
+
+            responseCode = connection.getResponseCode();
+            flushStream(connection.getInputStream());
+        }
+        catch (IOException e) {
+            responseCode = handleIOException(connection);
         }
         finally {
             closeSilently(in);
             closeSilently(out);
+            closeSilently(connection);
         }
 
-        int responseCode = connection.getResponseCode();
-        if (responseCode == HttpURLConnection.HTTP_OK) {
-            InputStream is = (InputStream) connection.getContent();
-            try {
-                while (is.read() > 0) {
-                    // ignore...
-                }
-            }
-            finally {
-                closeSilently(is);
-            }
-        }
         return responseCode;
     }
 
@@ -114,30 +154,40 @@ final class Utils {
         String f2 = (name == null) ? null : "name=" + name;
         String filter = ((f1 == null) ? "?" : "?" + f1 + "&") + ((f2 == null) ? "" :
f2);
         URL url = new URL(host, endpoint + filter);
-        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-        int responseCode = connection.getResponseCode();
-        if (responseCode == HttpURLConnection.HTTP_OK) {
-            InputStream input = connection.getInputStream();
-            try {
-                copy(input, out);
-                out.flush();
-            }
-            finally {
-                closeSilently(input);
-            }
+
+        int responseCode;
+        HttpURLConnection connection = null;
+        InputStream input = null;
+
+        try {
+            connection = (HttpURLConnection) url.openConnection();
+            responseCode = connection.getResponseCode();
+            input = connection.getInputStream();
+
+            copy(input, out);
+            out.flush();
         }
+        catch (IOException e) {
+            responseCode = handleIOException(connection);
+        }
+        finally {
+            closeSilently(input);
+            closeSilently(out);
+            closeSilently(connection);
+        }
+
         return responseCode;
     }
 
     static void waitForWebserver(URL host) throws IOException {
-        int retries = 1;
+        int retries = 1, rc = -1;
         IOException ioe = null;
         HttpURLConnection conn = null;
         while (retries++ < 10) {
             try {
                 conn = (HttpURLConnection) host.openConnection();
 
-                int rc = conn.getResponseCode();
+                rc = conn.getResponseCode();
                 if (rc >= 0) {
                     return;
                 }
@@ -152,6 +202,9 @@ final class Utils {
                     return;
                 }
             }
+            catch (IOException e) {
+                rc = handleIOException(conn);
+            }
             finally {
                 if (conn != null) {
                     conn.disconnect();

Modified: ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java?rev=1530190&r1=1530189&r2=1530190&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java (original)
+++ ace/trunk/org.apache.ace.repository/src/org/apache/ace/repository/ext/impl/RemoteRepository.java Tue Oct  8 09:04:32 2013
@@ -34,16 +34,15 @@ import org.apache.ace.range.SortedRangeS
 import org.apache.ace.repository.Repository;
 
 /**
- * This class works as a local interface for a remote repository by handling the network
- * communication.
+ * This class works as a local interface for a remote repository by handling the network
communication.
  */
 public class RemoteRepository implements Repository {
     private static final String COMMAND_QUERY = "/query";
     private static final String COMMAND_CHECKOUT = "/checkout";
     private static final String COMMAND_COMMIT = "/commit";
-    
+
     private static final String MIME_APPLICATION_OCTET_STREAM = "application/octet-stream";
-    
+
     private static final int COPY_BUFFER_SIZE = 64 * 1024;
 
     private final URL m_url;
@@ -55,9 +54,12 @@ public class RemoteRepository implements
     /**
      * Creates a remote repository that connects to a given location with a given customer-
and repository name.
      * 
-     * @param url The location of the repository.
-     * @param customer The customer name to use.
-     * @param name The repository name to use.
+     * @param url
+     *            The location of the repository.
+     * @param customer
+     *            The customer name to use.
+     * @param name
+     *            The repository name to use.
      */
     public RemoteRepository(URL url, String customer, String name) {
         if (url == null || customer == null || name == null) {
@@ -75,27 +77,29 @@ public class RemoteRepository implements
         }
 
         URL url = buildCommand(m_url, COMMAND_CHECKOUT, version);
+
         HttpURLConnection connection = (HttpURLConnection) m_connectionFactory.createConnection(url);
 
-        if (connection.getResponseCode() == HttpServletResponse.SC_NOT_FOUND) {
-        	connection.disconnect();
-    		throw new IllegalArgumentException("Requested version not found in remote repository.
(" + connection.getResponseMessage() + ")");
+        int rc = connection.getResponseCode();
+        if (rc == HttpServletResponse.SC_NOT_FOUND) {
+            connection.disconnect();
+            throw new IllegalArgumentException("Requested version not found in remote repository.
(" + connection.getResponseMessage() + ")");
         }
-        if (connection.getResponseCode() != HttpServletResponse.SC_OK) {
-        	connection.disconnect();
+        else if (rc != HttpServletResponse.SC_OK) {
+            connection.disconnect();
             throw new IOException("Connection error: " + connection.getResponseMessage());
         }
 
         return connection.getInputStream();
-        
+
     }
 
     public boolean commit(InputStream data, long fromVersion) throws IOException, IllegalArgumentException
{
         URL url = buildCommand(m_url, COMMAND_COMMIT, fromVersion);
         HttpURLConnection connection = (HttpURLConnection) m_connectionFactory.createConnection(url);
-        
+
         // ACE-294: enable streaming mode causing only small amounts of memory to be
-        // used for this commit. Otherwise, the entire input stream is cached into 
+        // used for this commit. Otherwise, the entire input stream is cached into
         // memory prior to sending it to the server...
         connection.setChunkedStreamingMode(8192);
         connection.setRequestProperty("Content-Type", MIME_APPLICATION_OCTET_STREAM);
@@ -103,51 +107,56 @@ public class RemoteRepository implements
 
         OutputStream out = connection.getOutputStream();
         try {
-        	copy(data, out);
-        } finally {
-        	out.flush();
-        	out.close();
+            copy(data, out);
+
+            // causes the stream the be flushed and the server response to be obtained...
+            return connection.getResponseCode() == HttpServletResponse.SC_OK;
+        }
+        finally {
+            out.flush();
+            out.close();
+            connection.disconnect();
         }
-        
-        try {
-			return connection.getResponseCode() == HttpServletResponse.SC_OK;
-		} finally {
-			connection.disconnect();
-		}
     }
 
     public SortedRangeSet getRange() throws IOException {
         URL url = buildCommand(m_url, COMMAND_QUERY, 0);
-        
+
         HttpURLConnection connection = (HttpURLConnection) m_connectionFactory.createConnection(url);
-        
+
         try {
-	        if (connection.getResponseCode() == HttpServletResponse.SC_OK) {
-	            BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
-	            try {
-		            String line = reader.readLine();
-		            if (line == null) {
-		                throw new IOException("Repository not found: customer=" + m_customer +
", name=" + m_name);
-		            }
-
-		            String representation = line.substring(line.lastIndexOf(','));
-		            return new SortedRangeSet(representation);
-	            } finally {
-	            	reader.close();
-	            }
-	        }
-	
-	        throw new IOException("Connection error: " + connection.getResponseMessage());
-        } finally {
-        	connection.disconnect();
+            if (connection.getResponseCode() == HttpServletResponse.SC_OK) {
+                BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+                try {
+                    String line = reader.readLine();
+                    if (line == null) {
+                        throw new IOException("Repository not found: customer=" + m_customer
+ ", name=" + m_name);
+                    }
+
+                    String representation = line.substring(line.lastIndexOf(','));
+                    return new SortedRangeSet(representation);
+                }
+                finally {
+                    reader.close();
+                }
+            }
+
+            throw new IOException("Connection error: " + connection.getResponseMessage());
+        }
+        finally {
+            connection.disconnect();
         }
     }
 
     /**
      * Helper method which copies the contents of an input stream to an output stream.
-     * @param in The input stream.
-     * @param out The output stream.
-     * @throws java.io.IOException Thrown when one of the streams is closed unexpectedly.
+     * 
+     * @param in
+     *            The input stream.
+     * @param out
+     *            The output stream.
+     * @throws java.io.IOException
+     *             Thrown when one of the streams is closed unexpectedly.
      */
     private static void copy(InputStream in, OutputStream out) throws IOException {
         byte[] buffer = new byte[COPY_BUFFER_SIZE];
@@ -159,12 +168,13 @@ public class RemoteRepository implements
     }
 
     /**
-     * Builds a command string to use in the request to the server, based on the parameters
-     * this object was created with. The version is only mandatory for <code>CHECKOUT</code>
-     * and <code>COMMIT</code>.
+     * Builds a command string to use in the request to the server, based on the parameters
this object was created
+     * with. The version is only mandatory for <code>CHECKOUT</code> and <code>COMMIT</code>.
      * 
-     * @param command A command string, use the <code>COMMAND_</code> constants
in this file.
-     * @param version A version statement.
+     * @param command
+     *            A command string, use the <code>COMMAND_</code> constants in
this file.
+     * @param version
+     *            A version statement.
      * @return The command string.
      */
     private URL buildCommand(URL url, String command, long version) {
@@ -188,12 +198,12 @@ public class RemoteRepository implements
             }
             params.append("version=").append(version);
         }
-        
+
         StringBuilder newURL = new StringBuilder();
         newURL.append(url.toExternalForm());
         newURL.append(command);
         if (params.length() > 0) {
-        	newURL.append("?").append(params);
+            newURL.append("?").append(params);
         }
 
         try {
@@ -208,4 +218,4 @@ public class RemoteRepository implements
     public String toString() {
         return "RemoteRepository[" + m_url + "," + m_customer + "," + m_name + "]";
     }
-}
\ No newline at end of file
+}



Mime
View raw message