Return-Path: X-Original-To: apmail-manifoldcf-commits-archive@www.apache.org Delivered-To: apmail-manifoldcf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7D7DE08F for ; Fri, 23 Nov 2012 15:37:57 +0000 (UTC) Received: (qmail 1320 invoked by uid 500); 23 Nov 2012 15:37:57 -0000 Delivered-To: apmail-manifoldcf-commits-archive@manifoldcf.apache.org Received: (qmail 1182 invoked by uid 500); 23 Nov 2012 15:37:53 -0000 Mailing-List: contact commits-help@manifoldcf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@manifoldcf.apache.org Delivered-To: mailing list commits@manifoldcf.apache.org Received: (qmail 1142 invoked by uid 99); 23 Nov 2012 15:37:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Nov 2012 15:37:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Nov 2012 15:37:49 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DA03E238897F; Fri, 23 Nov 2012 15:37:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1412917 - in /manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink: LivelinkAuthority.java LivelinkConnector.java Date: Fri, 23 Nov 2012 15:37:28 -0000 To: commits@manifoldcf.apache.org From: kwright@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121123153728.DA03E238897F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kwright Date: Fri Nov 23 15:37:27 2012 New Revision: 1412917 URL: http://svn.apache.org/viewvc?rev=1412917&view=rev Log: Finished conversion of Livelink connector to httpcomponents. Modified: manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkAuthority.java manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkConnector.java Modified: manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkAuthority.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkAuthority.java?rev=1412917&r1=1412916&r2=1412917&view=diff ============================================================================== --- manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkAuthority.java (original) +++ manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkAuthority.java Fri Nov 23 15:37:27 2012 @@ -31,11 +31,6 @@ import java.util.regex.*; import com.opentext.api.*; -import org.apache.commons.httpclient.*; -import org.apache.commons.httpclient.methods.*; -import org.apache.commons.httpclient.params.*; - - /** This is the Livelink implementation of the IAuthorityConnector interface. * This is not based on Volant code, but has been developed by me at the behest of * James Maupin for use at Shell. Modified: manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkConnector.java URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkConnector.java?rev=1412917&r1=1412916&r2=1412917&view=diff ============================================================================== --- manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkConnector.java (original) +++ manifoldcf/branches/CONNECTORS-120/connectors/livelink/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/livelink/LivelinkConnector.java Fri Nov 23 15:37:27 2012 @@ -23,18 +23,53 @@ import org.apache.manifoldcf.agents.inte import org.apache.manifoldcf.crawler.interfaces.*; import org.apache.manifoldcf.crawler.system.Logging; import org.apache.manifoldcf.crawler.system.ManifoldCF; +import org.apache.manifoldcf.core.common.XThreadInputStream; import java.io.*; import java.util.*; import java.net.*; +import java.util.concurrent.TimeUnit; import com.opentext.api.*; -import org.apache.commons.httpclient.*; -import org.apache.commons.httpclient.methods.*; -import org.apache.commons.httpclient.params.*; -import org.apache.commons.httpclient.auth.*; -import org.apache.commons.httpclient.protocol.*; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.NameValuePair; +import org.apache.http.impl.conn.PoolingClientConnectionManager; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.NTCredentials; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.client.AbstractHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.util.EntityUtils; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpParams; +import org.apache.http.params.CoreConnectionPNames; +import org.apache.http.HttpStatus; +import org.apache.http.HttpHost; +import org.apache.http.Header; +import org.apache.http.conn.params.ConnRoutePNames; +import org.apache.http.message.BasicHeader; +import org.apache.http.client.params.ClientPNames; +import org.apache.http.client.params.HttpClientParams; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.protocol.HTTP; +import org.apache.http.client.entity.UrlEncodedFormEntity; + +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.client.RedirectException; +import org.apache.http.client.CircularRedirectException; +import org.apache.http.NoHttpResponseException; +import org.apache.http.HttpException; /** This is the Livelink implementation of the IRepositoryConnectr interface. @@ -105,12 +140,11 @@ public class LivelinkConnector extends o // SSL support private String keystoreData = null; private IKeystoreManager keystoreManager = null; - private LivelinkSecureSocketFactory secureSocketFactory = null; - private ProtocolFactory myFactory = null; // Connection management - private MultiThreadedHttpConnectionManager connectionManager = null; - + private ClientConnectionManager connectionManager = null; + private AbstractHttpClient httpClient = null; + // Base path for viewing private String viewBasePath = null; @@ -348,26 +382,39 @@ public class LivelinkConnector extends o else serverPort = new Integer(serverPortString).intValue(); + // Set up connection manager + PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager(); + localConnectionManager.setMaxTotal(1); // Set up ssl if indicated keystoreData = params.getParameter(LiveLinkParameters.livelinkKeystore); - myFactory = new ProtocolFactory(); - if (keystoreData != null) { keystoreManager = KeystoreManagerFactory.make("",keystoreData); - secureSocketFactory = new LivelinkSecureSocketFactory(keystoreManager.getSecureSocketFactory()); - Protocol myHttpsProtocol = new Protocol("https", (ProtocolSocketFactory)secureSocketFactory, 443); - myFactory.registerProtocol("https",myHttpsProtocol); - if (Logging.connectors.isDebugEnabled()) - { - Logging.connectors.debug("Livelink: Created new secure protocol class instance; factory type is '"+myHttpsProtocol.getSocketFactory().getClass().getName()+"'"); - } + SSLSocketFactory myFactory = new SSLSocketFactory(keystoreManager.getSecureSocketFactory(), + new BrowserCompatHostnameVerifier()); + Scheme myHttpsProtocol = new Scheme("https", 443, myFactory); + localConnectionManager.getSchemeRegistry().register(myHttpsProtocol); + } + connectionManager = localConnectionManager; + + // Create the httpclient + BasicHttpParams params = new BasicHttpParams(); + params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,true); + params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,false); + params.setBooleanParameter(ClientPNames.ALLOW_CIRCULAR_REDIRECTS,true); + params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT,60000); + params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,300000); + params.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS,true); + DefaultHttpClient localHttpClient = new DefaultHttpClient(connectionManager,params); + localHttpClient.setRedirectStrategy(new DefaultRedirectStrategy()); + // Set up authentication to use + if (ntlmDomain != null) + { + localHttpClient.getCredentialsProvider().setCredentials(AuthScope.ANY, + new NTCredentials(ntlmUsername,ntlmPassword,currentHost,ntlmDomain)); } - - // Set up connection manager - connectionManager = new MultiThreadedHttpConnectionManager(); - connectionManager.getParams().setMaxTotalConnections(1); - + httpClient = localHttpClient; + // System.out.println("Connection server object = "+llServer.toString()); // Establish the actual connection @@ -412,73 +459,26 @@ public class LivelinkConnector extends o // All methods below this line will ONLY be called if a connect() call succeeded // on this instance! - protected static class ExecuteMethodThread extends Thread - { - protected HttpClient client; - protected HttpMethodBase executeMethod; - protected HostConfiguration hostConfiguration; - protected Throwable exception = null; - protected int rval = 0; - - public ExecuteMethodThread(HttpClient client, HostConfiguration hostConfiguration, HttpMethodBase executeMethod) - { - super(); - setDaemon(true); - this.client = client; - this.hostConfiguration = hostConfiguration; - this.executeMethod = executeMethod; - } - public void run() - { - try - { - // Call the execute method appropriately - rval = client.executeMethod(hostConfiguration,executeMethod,null); - } - catch (Throwable e) - { - this.exception = e; - } - } - - public Throwable getException() - { - return exception; - } - - public int getResponse() - { - return rval; - } - } - - protected static int executeMethodViaThread(HttpClient client, HostConfiguration hostConfiguration, HttpMethodBase executeMethod) - throws InterruptedException, IOException + protected static int executeMethodViaThread(HttpClient client, HttpRequestBase executeMethod) + throws InterruptedException, HttpException, IOException { - ExecuteMethodThread t = new ExecuteMethodThread(client,hostConfiguration,executeMethod); + ExecuteMethodThread t = new ExecuteMethodThread(client,executeMethod); + t.start(); try { - t.start(); - t.join(); - Throwable thr = t.getException(); - if (thr != null) - { - if (thr instanceof IOException) - throw (IOException)thr; - else if (thr instanceof RuntimeException) - throw (RuntimeException)thr; - else - throw (Error)thr; - } - return t.getResponse(); + return t.getResponseCode(); } catch (InterruptedException e) { t.interrupt(); - // We need the caller to abandon any connections left around, so rethrow in a way that forces them to process the event properly. throw e; } + finally + { + t.abort(); + t.finishUp(); + } } /** Check status of connection. @@ -498,71 +498,56 @@ public class LivelinkConnector extends o String ingestHttpAddress = ingestCgiPath; HttpClient client = getInitializedClient(contextMsg); - + HttpGet method = new HttpGet(ingestHttpAddress); try { - // Set up fetch using our special stuff if it's https - GetMethod method = new GetMethod(ingestHttpAddress); - try + int statusCode = executeMethodViaThread(client,method); + switch (statusCode) { - method.getParams().setParameter("http.socket.timeout", new Integer(300000)); - method.setFollowRedirects(true); - - int statusCode = executeMethodViaThread(client,getHostConfiguration(contextMsg),method); - switch (statusCode) - { - case 502: - return "Fetch test had transient 502 error response"; + case 502: + return "Fetch test had transient 502 error response"; - case HttpStatus.SC_UNAUTHORIZED: - return "Fetch test returned UNAUTHORIZED (401) response; check the security credentials and configuration"; + case HttpStatus.SC_UNAUTHORIZED: + return "Fetch test returned UNAUTHORIZED (401) response; check the security credentials and configuration"; - case HttpStatus.SC_OK: - return super.check(); + case HttpStatus.SC_OK: + return super.check(); - default: - return "Fetch test returned an unexpected response code of "+Integer.toString(statusCode); - } - } - catch (InterruptedException e) - { - // Drop the connection on the floor - method = null; - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (java.net.SocketTimeoutException e) - { - return "Fetch test timed out reading from the Livelink HTTP Server: "+e.getMessage(); - } - catch (java.net.SocketException e) - { - return "Fetch test received a socket error reading from Livelink HTTP Server: "+e.getMessage(); - } - catch (javax.net.ssl.SSLHandshakeException e) - { - return "Fetch test was unable to set up a SSL connection to Livelink HTTP Server: "+e.getMessage(); - } - catch (org.apache.commons.httpclient.ConnectTimeoutException e) - { - return "Fetch test connection timed out reading from Livelink HTTP Server: "+e.getMessage(); - } - catch (InterruptedIOException e) - { - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (IOException e) - { - return "Fetch test had an IO failure: "+e.getMessage(); - } - finally - { - if (method != null) - method.releaseConnection(); + default: + return "Fetch test returned an unexpected response code of "+Integer.toString(statusCode); } } - catch (IllegalStateException e) + catch (InterruptedException e) + { + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (java.net.SocketTimeoutException e) + { + return "Fetch test timed out reading from the Livelink HTTP Server: "+e.getMessage(); + } + catch (java.net.SocketException e) + { + return "Fetch test received a socket error reading from Livelink HTTP Server: "+e.getMessage(); + } + catch (javax.net.ssl.SSLHandshakeException e) + { + return "Fetch test was unable to set up a SSL connection to Livelink HTTP Server: "+e.getMessage(); + } + catch (ConnectTimeoutException e) + { + return "Fetch test connection timed out reading from Livelink HTTP Server: "+e.getMessage(); + } + catch (InterruptedIOException e) + { + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (HttpException e) { - return "Fetch test had a state exception talking to Livelink HTTP Server: "+e.getMessage(); + return "Fetch test had an HTTP exception: "+e.getMessage(); + } + catch (IOException e) + { + return "Fetch test had an IO failure: "+e.getMessage(); } } catch (ServiceInterruption e) @@ -585,7 +570,7 @@ public class LivelinkConnector extends o throws ManifoldCFException { if (connectionManager != null) - connectionManager.closeIdleConnections(60000L); + connectionManager.closeIdleConnections(60000L,TimeUnit.MILLISECONDS); } /** Close the connection. Call this before discarding the repository connector. @@ -601,8 +586,6 @@ public class LivelinkConnector extends o LLAttributes = null; keystoreData = null; keystoreManager = null; - secureSocketFactory = null; - myFactory = null; ingestPortNumber = -1; serverName = null; @@ -3665,64 +3648,6 @@ public class LivelinkConnector extends o } } - protected static class CloseThread extends Thread - { - protected InputStream is; - protected Throwable exception = null; - - public CloseThread(InputStream is) - { - super(); - setDaemon(true); - this.is = is; - } - - public void run() - { - try - { - // Call the close method appropriately - is.close(); - } - catch (Throwable e) - { - this.exception = e; - } - } - - public Throwable getException() - { - return exception; - } - } - - protected static void closeViaThread(InputStream is) - throws InterruptedException, IOException - { - CloseThread t = new CloseThread(is); - try - { - t.start(); - t.join(); - Throwable thr = t.getException(); - if (thr != null) - { - if (thr instanceof IOException) - throw (IOException)thr; - else if (thr instanceof RuntimeException) - throw (RuntimeException)thr; - else - throw (Error)thr; - } - } - catch (InterruptedException e) - { - t.interrupt(); - // We need the caller to abandon any connections left around, so rethrow in a way that forces them to process the event properly. - throw e; - } - } - /** * Connects to the specified Livelink document using HTTP protocol * @param documentIdentifier is the document identifier (as far as the crawler knows). @@ -3843,328 +3768,299 @@ public class LivelinkConnector extends o if (Logging.connectors.isInfoEnabled()) Logging.connectors.info("Livelink: " + ingestHttpAddress); + long startTime = System.currentTimeMillis(); + String resultCode = "OK"; + String resultDescription = null; + Long readSize = null; + + HttpGet method = new HttpGet(ingestHttpAddress); + ExecuteMethodThread methodThread = new ExecuteMethodThread(client,method); + methodThread.start(); try { - long startTime = System.currentTimeMillis(); - String resultCode = "OK"; - String resultDescription = null; - Long readSize = null; - // Set up fetch using our special stuff if it's https - GetMethod method = new GetMethod(ingestHttpAddress); - try + int statusCode = methodThread.getResponseCode(); + switch (statusCode) { - method.getParams().setParameter("http.socket.timeout", new Integer(300000)); - method.setFollowRedirects(true); - + case 500: + case 502: + Logging.connectors.warn("Livelink: Service interruption during fetch "+contextMsg+" with Livelink HTTP Server, retrying..."); + throw new ServiceInterruption("Service interruption during fetch",new ManifoldCFException(Integer.toString(statusCode)+" error while fetching"),System.currentTimeMillis()+60000L, + System.currentTimeMillis()+600000L,-1,true); + + case HttpStatus.SC_UNAUTHORIZED: + Logging.connectors.warn("Livelink: Document fetch unauthorized for "+ingestHttpAddress+" ("+contextMsg+")"); + // Since we logged in, we should fail here if the ingestion user doesn't have access to the + // the document, but if we do, don't fail hard. + resultCode = "UNAUTHORIZED"; + activities.deleteDocument(documentIdentifier,version); + return; - int statusCode = executeMethodViaThread(client,getHostConfiguration(contextMsg),method); - switch (statusCode) + case HttpStatus.SC_OK: + if (Logging.connectors.isDebugEnabled()) + Logging.connectors.debug("Livelink: Created http document connection to Livelink "+contextMsg); + long dataSize = methodThread.getResponseContentLength(); + // The above replaces this, which required another access: + // long dataSize = (long)value.toInteger("DataSize"); + // A non-existent content length will cause a value of -1 to be returned. This seems to indicate that the session login did not work right. + if (dataSize >= 0) { - case 500: - case 502: - Logging.connectors.warn("Livelink: Service interruption during fetch "+contextMsg+" with Livelink HTTP Server, retrying..."); - throw new ServiceInterruption("Service interruption during fetch",new ManifoldCFException(Integer.toString(statusCode)+" error while fetching"),System.currentTimeMillis()+60000L, - System.currentTimeMillis()+600000L,-1,true); - - case HttpStatus.SC_UNAUTHORIZED: - Logging.connectors.warn("Livelink: Document fetch unauthorized for "+ingestHttpAddress+" ("+contextMsg+")"); - // Since we logged in, we should fail here if the ingestion user doesn't have access to the - // the document, but if we do, don't fail hard. - resultCode = "UNAUTHORIZED"; - activities.deleteDocument(documentIdentifier,version); - return; - - case HttpStatus.SC_OK: if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Created http document connection to Livelink "+contextMsg); - long dataSize = method.getResponseContentLength(); - // The above replaces this, which required another access: - // long dataSize = (long)value.toInteger("DataSize"); - // A non-existent content length will cause a value of -1 to be returned. This seems to indicate that the session login did not work right. - if (dataSize >= 0) + Logging.connectors.debug("Livelink: Content length from livelink server "+contextMsg+"' = "+new Long(dataSize).toString()); + if (activities.checkLengthIndexable(dataSize)) { - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Content length from livelink server "+contextMsg+"' = "+new Long(dataSize).toString()); - if (activities.checkLengthIndexable(dataSize)) + try { + InputStream is = methodThread.getSafeInputStream(); try { - InputStream is = method.getResponseBodyAsStream(); - try - { - rd.setBinary(is,dataSize); - - activities.ingestDocument(documentIdentifier,version,viewHttpAddress,rd); + rd.setBinary(is,dataSize); + + activities.ingestDocument(documentIdentifier,version,viewHttpAddress,rd); - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Ingesting done "+contextMsg); + if (Logging.connectors.isDebugEnabled()) + Logging.connectors.debug("Livelink: Ingesting done "+contextMsg); - } - finally - { - // Close stream via thread, since otherwise this can hang - closeViaThread(is); - } - } - catch (java.net.SocketTimeoutException e) - { - resultCode = "DATATIMEOUT"; - resultDescription = e.getMessage(); - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: Livelink socket timed out ingesting from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - throw new ServiceInterruption("Socket timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); - } - catch (java.net.SocketException e) - { - resultCode = "DATASOCKETERROR"; - resultDescription = e.getMessage(); - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: Livelink socket error ingesting from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - throw new ServiceInterruption("Socket error: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); } - catch (javax.net.ssl.SSLHandshakeException e) + finally { - resultCode = "DATASSLHANDSHAKEERROR"; - resultDescription = e.getMessage(); - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: SSL handshake failed authenticating "+contextMsg+": "+e.getMessage(),e); - throw new ServiceInterruption("SSL handshake error: "+e.getMessage(),e,currentTime+60000L,currentTime+300000L,-1,true); + // Close stream via thread, since otherwise this can hang + is.close(); } - catch (org.apache.commons.httpclient.ConnectTimeoutException e) - { - resultCode = "CONNECTTIMEOUT"; - resultDescription = e.getMessage(); - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: Livelink socket timed out connecting to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - throw new ServiceInterruption("Connect timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); - } - catch (InterruptedException e) - { - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (InterruptedIOException e) - { - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (IOException e) - { - resultCode = "DATAEXCEPTION"; - resultDescription = e.getMessage(); - // Treat unknown error ingesting data as a transient condition - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: IO exception ingesting "+contextMsg+": "+e.getMessage(),e); - throw new ServiceInterruption("IO exception ingesting "+contextMsg+": "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); - } - readSize = new Long(dataSize); } - else + catch (java.net.SocketTimeoutException e) + { + resultCode = "DATATIMEOUT"; + resultDescription = e.getMessage(); + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: Livelink socket timed out ingesting from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ServiceInterruption("Socket timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); + } + catch (java.net.SocketException e) + { + resultCode = "DATASOCKETERROR"; + resultDescription = e.getMessage(); + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: Livelink socket error ingesting from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ServiceInterruption("Socket error: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); + } + catch (javax.net.ssl.SSLHandshakeException e) { - resultCode = "DOCUMENTTOOLONG"; - activities.deleteDocument(documentIdentifier,version); + resultCode = "DATASSLHANDSHAKEERROR"; + resultDescription = e.getMessage(); + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: SSL handshake failed authenticating "+contextMsg+": "+e.getMessage(),e); + throw new ServiceInterruption("SSL handshake error: "+e.getMessage(),e,currentTime+60000L,currentTime+300000L,-1,true); } + catch (ConnectTimeoutException e) + { + resultCode = "CONNECTTIMEOUT"; + resultDescription = e.getMessage(); + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: Livelink socket timed out connecting to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ServiceInterruption("Connect timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); + } + catch (InterruptedException e) + { + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (InterruptedIOException e) + { + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (HttpException e) + { + resultCode = "HTTPEXCEPTION"; + resultDescription = e.getMessage(); + // Treat unknown error ingesting data as a transient condition + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: HTTP exception ingesting "+contextMsg+": "+e.getMessage(),e); + throw new ServiceInterruption("HTTP exception ingesting "+contextMsg+": "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); + } + catch (IOException e) + { + resultCode = "DATAEXCEPTION"; + resultDescription = e.getMessage(); + // Treat unknown error ingesting data as a transient condition + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: IO exception ingesting "+contextMsg+": "+e.getMessage(),e); + throw new ServiceInterruption("IO exception ingesting "+contextMsg+": "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,false); + } + readSize = new Long(dataSize); } else { - resultCode = "SESSIONLOGINFAILED"; + resultCode = "DOCUMENTTOOLONG"; activities.deleteDocument(documentIdentifier,version); } - break; - case HttpStatus.SC_BAD_REQUEST: - case HttpStatus.SC_USE_PROXY: - case HttpStatus.SC_GONE: - resultCode = "ERROR "+Integer.toString(statusCode); - throw new ManifoldCFException("Unrecoverable request failure; error = "+Integer.toString(statusCode)); - default: - resultCode = "UNKNOWN"; - Logging.connectors.warn("Livelink: Attempt to retrieve document from '"+ingestHttpAddress+"' received a response of "+Integer.toString(statusCode)+"; retrying in one minute"); - currentTime = System.currentTimeMillis(); - throw new ServiceInterruption("Fetch failed; retrying in 1 minute",new ManifoldCFException("Fetch failed with unknown code "+Integer.toString(statusCode)), - currentTime+60000L,currentTime+600000L,-1,true); } - } - catch (InterruptedException e) - { - // Drop the connection on the floor - method = null; - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (java.net.SocketTimeoutException e) - { - Logging.connectors.warn("Livelink: Socket timed out reading from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - resultCode = "TIMEOUT"; - resultDescription = e.getMessage(); - currentTime = System.currentTimeMillis(); - throw new ServiceInterruption("Socket timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); - } - catch (java.net.SocketException e) - { - Logging.connectors.warn("Livelink: Socket error reading from Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - resultCode = "SOCKETERROR"; - resultDescription = e.getMessage(); - currentTime = System.currentTimeMillis(); - throw new ServiceInterruption("Socket error: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); - } - catch (javax.net.ssl.SSLHandshakeException e) - { - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: SSL handshake failed "+contextMsg+": "+e.getMessage(),e); - resultCode = "SSLHANDSHAKEERROR"; - resultDescription = e.getMessage(); - throw new ServiceInterruption("SSL handshake error: "+e.getMessage(),e,currentTime+60000L,currentTime+300000L,-1,true); - } - catch (org.apache.commons.httpclient.ConnectTimeoutException e) - { - Logging.connectors.warn("Livelink: Connect timed out reading from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - resultCode = "CONNECTTIMEOUT"; - resultDescription = e.getMessage(); + else + { + resultCode = "SESSIONLOGINFAILED"; + activities.deleteDocument(documentIdentifier,version); + } + break; + case HttpStatus.SC_BAD_REQUEST: + case HttpStatus.SC_USE_PROXY: + case HttpStatus.SC_GONE: + resultCode = "ERROR "+Integer.toString(statusCode); + throw new ManifoldCFException("Unrecoverable request failure; error = "+Integer.toString(statusCode)); + default: + resultCode = "UNKNOWN"; + Logging.connectors.warn("Livelink: Attempt to retrieve document from '"+ingestHttpAddress+"' received a response of "+Integer.toString(statusCode)+"; retrying in one minute"); currentTime = System.currentTimeMillis(); - throw new ServiceInterruption("Connect timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + throw new ServiceInterruption("Fetch failed; retrying in 1 minute",new ManifoldCFException("Fetch failed with unknown code "+Integer.toString(statusCode)), + currentTime+60000L,currentTime+600000L,-1,true); } - catch (InterruptedIOException e) - { - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (IOException e) - { - resultCode = "EXCEPTION"; - resultDescription = e.getMessage(); - throw new ManifoldCFException("Exception getting response "+contextMsg+": "+e.getMessage(), e); - } - finally + } + catch (InterruptedException e) + { + // Drop the connection on the floor + methodThread.interrupt(); + methodThread = null; + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (java.net.SocketTimeoutException e) + { + Logging.connectors.warn("Livelink: Socket timed out reading from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + resultCode = "TIMEOUT"; + resultDescription = e.getMessage(); + currentTime = System.currentTimeMillis(); + throw new ServiceInterruption("Socket timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + } + catch (java.net.SocketException e) + { + Logging.connectors.warn("Livelink: Socket error reading from Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + resultCode = "SOCKETERROR"; + resultDescription = e.getMessage(); + currentTime = System.currentTimeMillis(); + throw new ServiceInterruption("Socket error: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + } + catch (javax.net.ssl.SSLHandshakeException e) + { + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: SSL handshake failed "+contextMsg+": "+e.getMessage(),e); + resultCode = "SSLHANDSHAKEERROR"; + resultDescription = e.getMessage(); + throw new ServiceInterruption("SSL handshake error: "+e.getMessage(),e,currentTime+60000L,currentTime+300000L,-1,true); + } + catch (ConnectTimeoutException e) + { + Logging.connectors.warn("Livelink: Connect timed out reading from the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + resultCode = "CONNECTTIMEOUT"; + resultDescription = e.getMessage(); + currentTime = System.currentTimeMillis(); + throw new ServiceInterruption("Connect timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + } + catch (InterruptedIOException e) + { + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (HttpException e) + { + resultCode = "EXCEPTION"; + resultDescription = e.getMessage(); + throw new ManifoldCFException("Exception getting response "+contextMsg+": "+e.getMessage(), e); + } + catch (IOException e) + { + resultCode = "EXCEPTION"; + resultDescription = e.getMessage(); + throw new ManifoldCFException("Exception getting response "+contextMsg+": "+e.getMessage(), e); + } + finally + { + if (methodThread != null) { - if (method != null) + methodThread.abort(); + activities.recordActivity(new Long(startTime),ACTIVITY_FETCH,readSize,Integer.toString(objID),resultCode,resultDescription,null); + try + { + methodThread.finishUp(); + } + catch (InterruptedException e) { - method.releaseConnection(); - activities.recordActivity(new Long(startTime),ACTIVITY_FETCH,readSize,Integer.toString(objID),resultCode,resultDescription,null); + throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED); } } } - catch (IllegalStateException e) - { - Logging.connectors.error("Livelink: State exception dealing with '"+ingestHttpAddress+"': "+e.getMessage(),e); - throw new ManifoldCFException("State exception dealing with '"+ingestHttpAddress+"': "+e.getMessage(),e); - } - } - - /** Initialize the host configuration */ - protected HostConfiguration getHostConfiguration(String contextMsg) - { - HostConfiguration clientConf = new HostConfiguration(); - // clientConf.setLocalAddress(currentAddr); - clientConf.setParams(new HostParams()); - clientConf.setHost(llServer.getHost(),ingestPortNumber,myFactory.getProtocol(ingestProtocol)); - return clientConf; } /** Initialize a livelink client connection */ protected HttpClient getInitializedClient(String contextMsg) throws ServiceInterruption, ManifoldCFException { - HttpClient client = new HttpClient(connectionManager); - client.getParams().setParameter(org.apache.commons.httpclient.params.HttpClientParams.PROTOCOL_FACTORY,myFactory); - client.getParams().setParameter(org.apache.commons.httpclient.params.HttpClientParams.ALLOW_CIRCULAR_REDIRECTS,new Boolean(true)); - long currentTime; - - if (ntlmDomain != null) - { - // Set the NTLM credentials - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Setting up NTLM credentials "+contextMsg); - client.getState().setCredentials(AuthScope.ANY, - new NTCredentials(ntlmUsername,ntlmPassword,currentHost,ntlmDomain)); - } - if (Logging.connectors.isDebugEnabled()) Logging.connectors.debug("Livelink: Session authenticating via http "+contextMsg+"..."); + HttpGet authget = new HttpGet(createLivelinkLoginURI()); try { - GetMethod authget = new GetMethod(createLivelinkLoginURI()); - try - { - authget.getParams().setParameter("http.socket.timeout", new Integer(60000)); - authget.setFollowRedirects(true); - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Created new GetMethod "+contextMsg+"; executing authentication method"); - int statusCode = executeMethodViaThread(client,getHostConfiguration(contextMsg),authget); - - if (statusCode == 502 || statusCode == 500) - { - Logging.connectors.warn("Livelink: Service interruption during authentication "+contextMsg+" with Livelink HTTP Server, retrying..."); - currentTime = System.currentTimeMillis(); - throw new ServiceInterruption("502 error during authentication",new ManifoldCFException("502 error while authenticating"), - currentTime+60000L,currentTime+600000L,-1,true); - } - if (statusCode != HttpStatus.SC_OK) - { - Logging.connectors.error("Livelink: Failed to authenticate "+contextMsg+" against Livelink HTTP Server; Status code: " + statusCode); - // Ok, so we didn't get in - simply do not ingest - if (statusCode == HttpStatus.SC_UNAUTHORIZED) - throw new ManifoldCFException("Session authorization failed with a 401 code; are credentials correct?"); - else - throw new ManifoldCFException("Session authorization failed with code "+Integer.toString(statusCode)); - } - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Retrieving authentication response "+contextMsg+""); - authget.getResponseBodyAsStream(); - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Authentication response retrieved "+contextMsg+""); + if (Logging.connectors.isDebugEnabled()) + Logging.connectors.debug("Livelink: Created new HttpGet "+contextMsg+"; executing authentication method"); + int statusCode = executeMethodViaThread(httpClient,authget); - } - catch (InterruptedException e) - { - // Drop the connection on the floor - authget = null; - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (java.net.SocketTimeoutException e) + if (statusCode == 502 || statusCode == 500) { + Logging.connectors.warn("Livelink: Service interruption during authentication "+contextMsg+" with Livelink HTTP Server, retrying..."); currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: Socket timed out authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - throw new ServiceInterruption("Socket timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + throw new ServiceInterruption("502 error during authentication",new ManifoldCFException("502 error while authenticating"), + currentTime+60000L,currentTime+600000L,-1,true); } - catch (java.net.SocketException e) + if (statusCode != HttpStatus.SC_OK) { - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: Socket error authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - throw new ServiceInterruption("Socket error: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); - } - catch (javax.net.ssl.SSLHandshakeException e) - { - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: SSL handshake failed authenticating "+contextMsg+": "+e.getMessage(),e); - throw new ServiceInterruption("SSL handshake error: "+e.getMessage(),e,currentTime+60000L,currentTime+300000L,-1,true); - } - catch (org.apache.commons.httpclient.ConnectTimeoutException e) - { - currentTime = System.currentTimeMillis(); - Logging.connectors.warn("Livelink: Connect timed out authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - throw new ServiceInterruption("Connect timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); - } - catch (InterruptedIOException e) - { - throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); - } - catch (IOException e) - { - Logging.connectors.error("Livelink: IO exception when authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); - throw new ManifoldCFException("Unable to communicate with the Livelink HTTP Server: "+e.getMessage(), e); - - } - finally - { - if (authget != null) - authget.releaseConnection(); + Logging.connectors.error("Livelink: Failed to authenticate "+contextMsg+" against Livelink HTTP Server; Status code: " + statusCode); + // Ok, so we didn't get in - simply do not ingest + if (statusCode == HttpStatus.SC_UNAUTHORIZED) + throw new ManifoldCFException("Session authorization failed with a 401 code; are credentials correct?"); + else + throw new ManifoldCFException("Session authorization failed with code "+Integer.toString(statusCode)); } } - catch (IllegalStateException e) + catch (InterruptedException e) + { + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (java.net.SocketTimeoutException e) + { + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: Socket timed out authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ServiceInterruption("Socket timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + } + catch (java.net.SocketException e) + { + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: Socket error authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ServiceInterruption("Socket error: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + } + catch (javax.net.ssl.SSLHandshakeException e) + { + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: SSL handshake failed authenticating "+contextMsg+": "+e.getMessage(),e); + throw new ServiceInterruption("SSL handshake error: "+e.getMessage(),e,currentTime+60000L,currentTime+300000L,-1,true); + } + catch (ConnectTimeoutException e) + { + currentTime = System.currentTimeMillis(); + Logging.connectors.warn("Livelink: Connect timed out authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ServiceInterruption("Connect timed out: "+e.getMessage(),e,currentTime+300000L,currentTime+6*3600000L,-1,true); + } + catch (InterruptedIOException e) { - Logging.connectors.error("Livelink: State exception dealing with '"+createLivelinkLoginURI()+"'",e); - throw new ManifoldCFException("State exception dealing with login URI: "+e.getMessage(),e); + throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); + } + catch (HttpException e) + { + Logging.connectors.error("Livelink: HTTP exception when authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ManifoldCFException("Unable to communicate with the Livelink HTTP Server: "+e.getMessage(), e); + } + catch (IOException e) + { + Logging.connectors.error("Livelink: IO exception when authenticating to the Livelink HTTP Server "+contextMsg+": "+e.getMessage(), e); + throw new ManifoldCFException("Unable to communicate with the Livelink HTTP Server: "+e.getMessage(), e); } - return client; + return httpClient; } /** Pack category and attribute */ @@ -6052,104 +5948,6 @@ public class LivelinkConnector extends o } } - /** HTTPClient secure socket factory, which implements SecureProtocolSocketFactory - */ - protected static class LivelinkSecureSocketFactory implements SecureProtocolSocketFactory - { - /** This is the javax.net socket factory. - */ - protected javax.net.ssl.SSLSocketFactory socketFactory; - - /** Constructor */ - public LivelinkSecureSocketFactory(javax.net.ssl.SSLSocketFactory socketFactory) - { - this.socketFactory = socketFactory; - } - - public Socket createSocket( - String host, - int port, - InetAddress clientHost, - int clientPort) - throws IOException, UnknownHostException - { - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Creating secure livelink connection to '"+host+"' on port "+Integer.toString(port)); - return socketFactory.createSocket( - host, - port, - clientHost, - clientPort - ); - } - - public Socket createSocket( - final String host, - final int port, - final InetAddress localAddress, - final int localPort, - final HttpConnectionParams params - ) throws IOException, UnknownHostException, ConnectTimeoutException - { - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Creating secure livelink connection to '"+host+"' on port "+Integer.toString(port)); - - if (params == null) - { - throw new IllegalArgumentException("Parameters may not be null"); - } - int timeout = params.getConnectionTimeout(); - if (timeout == 0) - { - return createSocket(host, port, localAddress, localPort); - } - else - throw new IllegalArgumentException("This implementation does not handle non-zero connection timeouts"); - } - - public Socket createSocket(String host, int port) - throws IOException, UnknownHostException - { - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Creating secure livelink connection to '"+host+"' on port "+Integer.toString(port)); - return socketFactory.createSocket( - host, - port - ); - } - - public Socket createSocket( - Socket socket, - String host, - int port, - boolean autoClose) - throws IOException, UnknownHostException - { - if (Logging.connectors.isDebugEnabled()) - Logging.connectors.debug("Livelink: Creating secure livelink connection to '"+host+"' on port "+Integer.toString(port)); - return socketFactory.createSocket( - socket, - host, - port, - autoClose - ); - } - - public boolean equals(Object obj) - { - if (obj == null || !(obj instanceof LivelinkSecureSocketFactory)) - return false; - // Each object is unique - return super.equals(obj); - } - - public int hashCode() - { - return super.hashCode(); - } - - } - // Here's an interesting note. All of the LAPI exceptions are subclassed off of RuntimeException. This makes life // hell because there is no superclass exception to capture, and even tweaky server communication issues wind up throwing // uncaught RuntimeException's up the stack. @@ -6251,6 +6049,262 @@ public class LivelinkConnector extends o } + /** This thread does the actual socket communication with the server. + * It's set up so that it can be abandoned at shutdown time. + * + * The way it works is as follows: + * - it starts the transaction + * - it receives the response, and saves that for the calling class to inspect + * - it transfers the data part to an input stream provided to the calling class + * - it shuts the connection down + * + * If there is an error, the sequence is aborted, and an exception is recorded + * for the calling class to examine. + * + * The calling class basically accepts the sequence above. It starts the + * thread, and tries to get a response code. If instead an exception is seen, + * the exception is thrown up the stack. + */ + protected static class ExecuteMethodThread extends Thread + { + /** Client and method, all preconfigured */ + protected final HttpClient httpClient; + protected final HttpRequestBase executeMethod; + + protected HttpResponse response = null; + protected Throwable responseException = null; + protected XThreadInputStream threadStream = null; + protected boolean streamCreated = false; + protected Throwable streamException = null; + protected boolean abortThread = false; + + protected Throwable shutdownException = null; + + protected Throwable generalException = null; + + public ExecuteMethodThread(HttpClient httpClient, HttpRequestBase executeMethod) + { + super(); + setDaemon(true); + this.httpClient = httpClient; + this.executeMethod = executeMethod; + } + + public void run() + { + try + { + try + { + // Call the execute method appropriately + synchronized (this) + { + if (!abortThread) + { + try + { + response = httpClient.execute(executeMethod); + } + catch (java.net.SocketTimeoutException e) + { + responseException = e; + } + catch (ConnectTimeoutException e) + { + responseException = e; + } + catch (InterruptedIOException e) + { + throw e; + } + catch (Throwable e) + { + responseException = e; + } + this.notifyAll(); + } + } + + // Start the transfer of the content + if (responseException == null) + { + synchronized (this) + { + if (!abortThread) + { + try + { + InputStream bodyStream = response.getEntity().getContent(); + if (bodyStream != null) + { + threadStream = new XThreadInputStream(bodyStream); + } + streamCreated = true; + } + catch (java.net.SocketTimeoutException e) + { + streamException = e; + } + catch (ConnectTimeoutException e) + { + streamException = e; + } + catch (InterruptedIOException e) + { + throw e; + } + catch (Throwable e) + { + streamException = e; + } + this.notifyAll(); + } + } + } + + if (responseException == null && streamException == null) + { + if (threadStream != null) + { + // Stuff the content until we are done + threadStream.stuffQueue(); + } + } + + } + finally + { + synchronized (this) + { + try + { + executeMethod.abort(); + } + catch (Throwable e) + { + shutdownException = e; + } + this.notifyAll(); + } + } + } + catch (Throwable e) + { + // We catch exceptions here that should ONLY be InterruptedExceptions, as a result of the thread being aborted. + this.generalException = e; + } + } + + public int getResponseCode() + throws InterruptedException, IOException, HttpException + { + // Must wait until the response object is there + while (true) + { + synchronized (this) + { + checkException(responseException); + if (response != null) + return response.getStatusLine().getStatusCode(); + wait(); + } + } + } + + public long getResponseContentLength() + throws InterruptedException, IOException, HttpException + { + String contentLength = getFirstHeader("Content-Length"); + if (contentLength == null || contentLength.length() == 0) + return -1L; + return new Long(contentLength.trim()).longValue(); + } + + public String getFirstHeader(String headerName) + throws InterruptedException, IOException, HttpException + { + // Must wait for the response object to appear + while (true) + { + synchronized (this) + { + checkException(responseException); + if (response != null) + { + Header h = response.getFirstHeader(headerName); + if (h == null) + return null; + return h.getValue(); + } + wait(); + } + } + } + + public InputStream getSafeInputStream() + throws InterruptedException, IOException, HttpException + { + // Must wait until stream is created, or until we note an exception was thrown. + while (true) + { + synchronized (this) + { + if (responseException != null) + throw new IllegalStateException("Check for response before getting stream"); + checkException(streamException); + if (streamCreated) + return threadStream; + wait(); + } + } + } + + public void abort() + { + // This will be called during the finally + // block in the case where all is well (and + // the stream completed) and in the case where + // there were exceptions. + synchronized (this) + { + if (streamCreated) + { + if (threadStream != null) + threadStream.abort(); + } + abortThread = true; + } + } + + public void finishUp() + throws InterruptedException + { + join(); + } + + protected synchronized void checkException(Throwable exception) + throws IOException, HttpException + { + if (exception != null) + { + // Throw the current exception, but clear it, so no further throwing is possible on the same problem. + Throwable e = exception; + if (e instanceof IOException) + throw (IOException)e; + else if (e instanceof HttpException) + throw (HttpException)e; + else if (e instanceof RuntimeException) + throw (RuntimeException)e; + else if (e instanceof Error) + throw (Error)e; + else + throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e); + } + } + + } + + }