activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject svn commit: r355591 - in /incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http: HttpClientTransport.java HttpTransport.java HttpTransportFactory.java HttpTransportServer.java HttpTunnelServlet.java
Date Fri, 09 Dec 2005 20:31:54 GMT
Author: gregw
Date: Fri Dec  9 12:31:51 2005
New Revision: 355591

URL: http://svn.apache.org/viewcvs?rev=355591&view=rev
Log:
Progress on HTTP transport: pass clientID via session or header & fixed lookup of clientID.

Modified:
    incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpClientTransport.java
    incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransport.java
    incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportFactory.java
    incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportServer.java
    incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTunnelServlet.java

Modified: incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpClientTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpClientTransport.java?rev=355591&r1=355590&r2=355591&view=diff
==============================================================================
--- incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpClientTransport.java (original)
+++ incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpClientTransport.java Fri Dec  9 12:31:51 2005
@@ -18,6 +18,7 @@
 package org.activemq.transport.http;
 
 import org.activemq.command.Command;
+import org.activemq.command.ConnectionInfo;
 import org.activemq.command.Response;
 import org.activemq.transport.FutureResponse;
 import org.activemq.transport.util.TextWireFormat;
@@ -33,6 +34,7 @@
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.net.URI;
 
 /**
@@ -47,6 +49,8 @@
 
     private HttpClient sendHttpClient;
     private HttpClient receiveHttpClient;
+    private String clientID;
+    private String sessionID;
 
 
     public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
@@ -58,14 +62,19 @@
     }
 
     public void oneway(Command command) throws IOException {
+        if (command.getDataStructureType()==ConnectionInfo.DATA_STRUCTURE_TYPE)
+            clientID=((ConnectionInfo)command).getClientId();
+        
         PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
         configureMethod(httpMethod);
         httpMethod.setRequestBody(getTextWireFormat().toString(command));
         try {
-            int answer = getSendHttpClient().executeMethod(httpMethod);
+            HttpClient client = getSendHttpClient();
+            int answer = client.executeMethod(httpMethod);
             if (answer != HttpStatus.SC_OK) {
                 throw new IOException("Failed to post command: " + command + " as response was: " + answer);
             }
+            checkSession(httpMethod);
         }
         catch (IOException e) {
             throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
@@ -81,8 +90,10 @@
         HttpClient httpClient = getReceiveHttpClient();
         URI remoteUrl = getRemoteUrl();
         while (!isClosed()) {
+            
             GetMethod httpMethod = new GetMethod(remoteUrl.toString());
             configureMethod(httpMethod);
+     
             try {
                 int answer = httpClient.executeMethod(httpMethod);
                 if (answer != HttpStatus.SC_OK) {
@@ -94,6 +105,7 @@
                     }
                 }
                 else {
+                    checkSession(httpMethod);
                     Command command = getTextWireFormat().readCommand(new DataInputStream(httpMethod.getResponseBodyAsStream()));
                     if (command == null) {
                         log.warn("Received null command from url: " + remoteUrl);
@@ -145,11 +157,21 @@
     }
 
     protected void configureMethod(HttpMethod method) {
-        /** TODO
-        String clientID = getClientID();
-        if (clientID != null) {
+        if (sessionID!=null) {
+            method.addRequestHeader("Cookie", "JSESSIONID="+sessionID);
+        }
+        else if (clientID != null) {
             method.setRequestHeader("clientID", clientID);
         }
-        */
     }
+
+    protected void checkSession(HttpMethod client) {
+        String set_cookie=client.getRequestHeader("Set-Cookie").getValue();
+        
+        if (set_cookie!=null && set_cookie.startsWith("JSESSIONID=")) {
+            String[] bits=set_cookie.split("[=;]");
+            sessionID=bits[1];
+        }
+    }
+    
 }

Modified: incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransport.java?rev=355591&r1=355590&r2=355591&view=diff
==============================================================================
--- incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransport.java (original)
+++ incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransport.java Fri Dec  9 12:31:51 2005
@@ -18,6 +18,7 @@
 package org.activemq.transport.http;
 
 import org.activemq.command.Command;
+import org.activemq.command.ConnectionInfo;
 import org.activemq.transport.util.TextWireFormat;
 import org.activemq.util.Callback;
 import org.activemq.util.IOExceptionSupport;
@@ -42,6 +43,8 @@
     private HttpURLConnection sendConnection;
     private HttpURLConnection receiveConnection;
     private URL url;
+    private String clientID;
+    private String sessionID;
 
     public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException {
         super(wireFormat, remoteUrl);
@@ -50,7 +53,9 @@
 
     public void oneway(Command command) throws IOException {
         try {
-
+            if (command.getDataStructureType()==ConnectionInfo.DATA_STRUCTURE_TYPE)
+                clientID=((ConnectionInfo)command).getClientId();
+            
             HttpURLConnection connection = getSendConnection();
             String text = getTextWireFormat().toString(command);
             Writer writer = new OutputStreamWriter(connection.getOutputStream());
@@ -60,6 +65,8 @@
             if (answer != HttpURLConnection.HTTP_OK) {
                 throw new IOException("Failed to post command: " + command + " as response was: " + answer);
             }
+            checkSession(connection);
+            
         }
         catch (IOException e) {
             throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
@@ -82,7 +89,9 @@
                     }
                 }
                 else {
+                    checkSession(connection);
                     Command command = getTextWireFormat().readCommand(new DataInputStream(connection.getInputStream()));
+                    
                     if (command == null) {
                         log.warn("Received null packet from url: " + remoteUrl);
                     }
@@ -101,6 +110,8 @@
             }
         }
     }
+    
+    
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -123,12 +134,23 @@
         return conn;
     }
 
+    protected void checkSession(HttpURLConnection connection)
+    {
+        String set_cookie=connection.getHeaderField("Set-Cookie");
+        if (set_cookie!=null && set_cookie.startsWith("JSESSIONID="))
+        {
+            String[] bits=set_cookie.split("[=;]");
+            sessionID=bits[1];
+        }
+    }
+    
     protected void configureConnection(HttpURLConnection connection) {
-        /**
-         * TODO String clientID = getClientID(); if (clientID != null) {
-         * connection.setRequestProperty("clientID", clientID); //
-         * connection.addRequestProperty("clientID", clientID); }
-         */
+        if (sessionID !=null) {
+            connection.addRequestProperty("Cookie", "JSESSIONID="+sessionID);
+        }
+        else if (clientID != null) {
+            connection.setRequestProperty("clientID", clientID);
+        }
     }
 
     protected URL getRemoteURL() {

Modified: incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportFactory.java?rev=355591&r1=355590&r2=355591&view=diff
==============================================================================
--- incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportFactory.java (original)
+++ incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportFactory.java Fri Dec  9 12:31:51 2005
@@ -18,6 +18,8 @@
 package org.activemq.transport.http;
 
 import org.activeio.command.WireFormat;
+import org.activemq.transport.MutexTransport;
+import org.activemq.transport.ResponseCorrelator;
 import org.activemq.transport.Transport;
 import org.activemq.transport.TransportFactory;
 import org.activemq.transport.TransportServer;
@@ -53,7 +55,10 @@
     }
 
     protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException {
-        return new HttpTransport(asTextWireFormat(wf), location);
+        Transport transport = new HttpTransport(asTextWireFormat(wf), location);
+        transport = new MutexTransport(transport);
+        transport = new ResponseCorrelator(transport);
+        return transport;
     }
 
 }

Modified: incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportServer.java?rev=355591&r1=355590&r2=355591&view=diff
==============================================================================
--- incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportServer.java (original)
+++ incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTransportServer.java Fri Dec  9 12:31:51 2005
@@ -25,9 +25,11 @@
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
 import org.mortbay.jetty.handler.ContextHandler;
+import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.servlet.ServletHandler;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.jetty.servlet.ServletMapping;
+import org.mortbay.jetty.servlet.SessionHandler;
 
 import java.net.URI;
 
@@ -37,8 +39,8 @@
 public class HttpTransportServer extends TransportServerSupport {
     private URI bindAddress;
     private TextWireFormat wireFormat;
-    private Server server = new Server();
-    private Connector connector = new SocketConnector();
+    private Server server;
+    private Connector connector;
 
     public HttpTransportServer(URI uri) {
         super(uri);
@@ -46,31 +48,37 @@
     }
 
     public void start() throws Exception {
+        server = new Server();
+        if (connector==null)
+            connector = new SocketConnector();
         connector.setHost(bindAddress.getHost());
         connector.setPort(bindAddress.getPort());
         connector.setServer(server);
         server.setConnectors(new Connector[] { connector });
 
-        ContextHandler context = new ContextHandler();
-        context.setContextPath("/");
-        context.setServer(server);
-        server.setHandler(context);
-
-        ServletHandler handler = new ServletHandler();
-        context.setHandler(handler);
+        ContextHandler context_handler = new ContextHandler();
+        context_handler.setContextPath("/");
+        context_handler.setServer(server);
+        server.setHandler(context_handler);
+
+        SessionHandler session_handler = new SessionHandler();
+        context_handler.setHandler(session_handler);
+        
+        ServletHandler servlet_handler = new ServletHandler();
+        session_handler.setHandler(servlet_handler);
 
         ServletHolder holder = new ServletHolder();
         holder.setName("httpTunnel");
         holder.setClassName(HttpTunnelServlet.class.getName());
-        handler.setServlets(new ServletHolder[] { holder });
+        servlet_handler.setServlets(new ServletHolder[] { holder });
 
         ServletMapping mapping = new ServletMapping();
         mapping.setServletName("httpTunnel");
         mapping.setPathSpec("/*");
-        handler.setServletMappings(new ServletMapping[] { mapping });
+        servlet_handler.setServletMappings(new ServletMapping[] { mapping });
 
-        context.setAttribute("acceptListener", getAcceptListener());
-        context.setAttribute("wireFormat", getWireFormat());
+        context_handler.setAttribute("acceptListener", getAcceptListener());
+        context_handler.setAttribute("wireFormat", getWireFormat());
         server.start();
     }
 

Modified: incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTunnelServlet.java?rev=355591&r1=355590&r2=355591&view=diff
==============================================================================
--- incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTunnelServlet.java (original)
+++ incubator/activemq/activemq-optional/src/main/java/org/activemq/transport/http/HttpTunnelServlet.java Fri Dec  9 12:31:51 2005
@@ -76,17 +76,21 @@
         // lets return the next response
         Command packet = null;
         try {
+            System.err.println("\nrequest="+request);
             BlockingQueueTransport transportChannel = getTransportChannel(request);
             if (transportChannel == null) {
-                log("No transport available!");
+                log("No transport available! ");
                 return;
             }
             packet = (Command) transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
+            System.err.println("packet="+packet);
         }
         catch (InterruptedException e) {
             // ignore
         }
         if (packet == null) {
+            // TODO temporary hack to prevent busy loop.  Replace with continuations
+            try{ Thread.sleep(250);}catch (InterruptedException e) { e.printStackTrace(); }
             response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
         }
         else {
@@ -98,6 +102,7 @@
 
         // String body = readRequestBody(request);
         // Command command = wireFormat.readCommand(body);
+        
         Command command = wireFormat.readCommand(request.getReader());
 
         if (command instanceof ConnectionInfo) {
@@ -150,8 +155,9 @@
             clientID = (String) session.getAttribute("clientID");
         }
         if (clientID == null) {
-            request.getHeader("clientID");
+            clientID = request.getHeader("clientID");
         }
+        System.out.println("clientID="+clientID);
         /**
          * if (clientID == null) { clientID = request.getParameter("clientID"); }
          */



Mime
View raw message