(), factory);
- exec.allowCoreThreadTimeOut(true);
- return exec;
+ return configured;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Fri Aug 12 20:29:29 2011
@@ -23,14 +23,12 @@ import org.apache.activemq.Service;
/**
* Represents the client side of a transport allowing messages to be sent
* synchronously, asynchronously and consumed.
- *
- *
*/
public interface Transport extends Service {
/**
* A one way asynchronous send
- *
+ *
* @param command
* @throws IOException
*/
@@ -40,7 +38,7 @@ public interface Transport extends Servi
* An asynchronous request response where the Receipt will be returned in
* the future. If responseCallback is not null, then it will be called when
* the response has been completed.
- *
+ *
* @param command
* @param responseCallback TODO
* @return the FutureResponse
@@ -50,7 +48,7 @@ public interface Transport extends Servi
/**
* A synchronous request response
- *
+ *
* @param command
* @return the response
* @throws IOException
@@ -59,7 +57,7 @@ public interface Transport extends Servi
/**
* A synchronous request response
- *
+ *
* @param command
* @param timeout
* @return the repsonse or null if timeout
@@ -67,53 +65,16 @@ public interface Transport extends Servi
*/
Object request(Object command, int timeout) throws IOException;
- // /**
- // * A one way asynchronous send
- // * @param command
- // * @throws IOException
- // */
- // void oneway(Command command) throws IOException;
- //
- // /**
- // * An asynchronous request response where the Receipt will be returned
- // * in the future. If responseCallback is not null, then it will be called
- // * when the response has been completed.
- // *
- // * @param command
- // * @param responseCallback TODO
- // * @return the FutureResponse
- // * @throws IOException
- // */
- // FutureResponse asyncRequest(Command command, ResponseCallback
- // responseCallback) throws IOException;
- //
- // /**
- // * A synchronous request response
- // * @param command
- // * @return the response
- // * @throws IOException
- // */
- // Response request(Command command) throws IOException;
- //
- // /**
- // * A synchronous request response
- // * @param command
- // * @param timeout
- // * @return the repsonse or null if timeout
- // * @throws IOException
- // */
- // Response request(Command command, int timeout) throws IOException;
-
/**
* Returns the current transport listener
- *
+ *
* @return
*/
TransportListener getTransportListener();
/**
* Registers an inbound command listener
- *
+ *
* @param commandListener
*/
void setTransportListener(TransportListener commandListener);
@@ -131,26 +92,26 @@ public interface Transport extends Servi
/**
* Indicates if the transport can handle faults
- *
+ *
* @return true if fault tolerant
*/
boolean isFaultTolerant();
-
+
/**
* @return true if the transport is disposed
*/
boolean isDisposed();
-
+
/**
* @return true if the transport is connected
*/
boolean isConnected();
-
+
/**
* @return true if reconnect is supported
*/
boolean isReconnectSupported();
-
+
/**
* @return true if updating uris is supported
*/
@@ -161,10 +122,10 @@ public interface Transport extends Servi
* @throws IOException on failure of if not supported
*/
void reconnect(URI uri) throws IOException;
-
+
/**
* Provide a list of available alternative locations
- * @param rebalance
+ * @param rebalance
* @param uris
* @throws IOException
*/
@@ -172,10 +133,10 @@ public interface Transport extends Servi
/**
* Returns a counter which gets incremented as data is read from the transport.
- * It should only be used to determine if there is progress being made in reading the next command from the transport.
- * The value may wrap into the negative numbers.
- *
+ * It should only be used to determine if there is progress being made in reading the next command from the transport.
+ * The value may wrap into the negative numbers.
+ *
* @return a counter which gets incremented as data is read from the transport.
*/
- int getReceiveCounter();
+ int getReceiveCounter();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Fri Aug 12 20:29:29 2011
@@ -44,7 +44,7 @@ public abstract class TransportFactory {
private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
private static final String THREAD_NAME_FILTER = "threadName";
-
+
public abstract TransportServer doBind(URI location) throws IOException;
public Transport doConnect(URI location, Executor ex) throws Exception {
@@ -57,7 +57,7 @@ public abstract class TransportFactory {
/**
* Creates a normal transport.
- *
+ *
* @param location
* @return the transport
* @throws Exception
@@ -69,7 +69,7 @@ public abstract class TransportFactory {
/**
* Creates a normal transport.
- *
+ *
* @param location
* @param ex
* @return the transport
@@ -83,7 +83,7 @@ public abstract class TransportFactory {
/**
* Creates a slimmed down transport that is more efficient so that it can be
* used by composite transports like reliable and HA.
- *
+ *
* @param location
* @return the Transport
* @throws Exception
@@ -96,7 +96,7 @@ public abstract class TransportFactory {
/**
* Creates a slimmed down transport that is more efficient so that it can be
* used by composite transports like reliable and HA.
- *
+ *
* @param location
* @param ex
* @return the Transport
@@ -113,12 +113,12 @@ public abstract class TransportFactory {
}
/**
- * @deprecated
+ * @deprecated
*/
public static TransportServer bind(String brokerId, URI location) throws IOException {
return bind(location);
}
-
+
public static TransportServer bind(BrokerService brokerService, URI location) throws IOException {
TransportFactory tf = findTransportFactory(location);
if( brokerService!=null && tf instanceof BrokerServiceAware ) {
@@ -132,7 +132,7 @@ public abstract class TransportFactory {
} finally {
SslContext.setCurrentSslContext(null);
}
- }
+ }
public Transport doConnect(URI location) throws Exception {
try {
@@ -164,7 +164,7 @@ public abstract class TransportFactory {
throw IOExceptionSupport.create(e);
}
}
-
+
/**
* Allow registration of a transport factory without wiring via META-INF classes
* @param scheme
@@ -176,7 +176,7 @@ public abstract class TransportFactory {
/**
* Factory method to create a new transport
- *
+ *
* @throws IOException
* @throws UnknownHostException
*/
@@ -235,13 +235,14 @@ public abstract class TransportFactory {
/**
* Fully configures and adds all need transport filters so that the
* transport can be used by the JMS client.
- *
+ *
* @param transport
* @param wf
* @param options
* @return
* @throws Exception
*/
+ @SuppressWarnings("rawtypes")
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
transport = compositeConfigure(transport, wf, options);
@@ -256,14 +257,15 @@ public abstract class TransportFactory {
* transport can be used by the ActiveMQ message broker. The main difference
* between this and the configure() method is that the broker does not issue
* requests to the client so the ResponseCorrelator is not needed.
- *
+ *
* @param transport
* @param format
* @param options
* @return
* @throws Exception
*/
- public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
+ @SuppressWarnings("rawtypes")
+ public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
if (options.containsKey(THREAD_NAME_FILTER)) {
transport = new ThreadNameFilter(transport);
}
@@ -276,12 +278,13 @@ public abstract class TransportFactory {
* Similar to configure(...) but this avoid adding in the MutexTransport and
* ResponseCorrelator transport layers so that the resulting transport can
* more efficiently be used as part of a composite transport.
- *
+ *
* @param transport
* @param format
* @param options
* @return
*/
+ @SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
transport = new WriteTimeoutFilter(transport);
@@ -294,6 +297,7 @@ public abstract class TransportFactory {
return transport;
}
+ @SuppressWarnings("rawtypes")
protected String getOption(Map options, String key, String def) {
String rc = (String) options.remove(key);
if( rc == null ) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Fri Aug 12 20:29:29 2011
@@ -22,10 +22,7 @@ import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
import org.apache.activemq.transport.tcp.TimeStampStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,15 +36,15 @@ import org.slf4j.LoggerFactory;
* transport.soWriteTimeout=<value in millis>.
* For example (15 second timeout on write operations to the socket):
*
- * <transportConnector
- * name="tcp1"
+ * <transportConnector
+ * name="tcp1"
* uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
* />
*
* For example (enable default timeout on the socket):
*
- * <transportConnector
- * name="tcp1"
+ * <transportConnector
+ * name="tcp1"
* uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&transport.soWriteTimeout=15000"
* />
*
@@ -59,12 +56,12 @@ public class WriteTimeoutFilter extends
private static final Logger LOG = LoggerFactory.getLogger(WriteTimeoutFilter.class);
protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue<WriteTimeoutFilter>();
protected static AtomicInteger messageCounter = new AtomicInteger(0);
- protected static TimeoutThread timeoutThread = new TimeoutThread();
-
+ protected static TimeoutThread timeoutThread = new TimeoutThread();
+
protected static long sleep = 5000l;
protected long writeTimeout = -1;
-
+
public WriteTimeoutFilter(Transport next) {
super(next);
}
@@ -80,7 +77,7 @@ public class WriteTimeoutFilter extends
deRegisterWrite(this,false,null);
}
}
-
+
public long getWriteTimeout() {
return writeTimeout;
}
@@ -88,7 +85,7 @@ public class WriteTimeoutFilter extends
public void setWriteTimeout(long writeTimeout) {
this.writeTimeout = writeTimeout;
}
-
+
public static long getSleep() {
return sleep;
}
@@ -97,21 +94,21 @@ public class WriteTimeoutFilter extends
WriteTimeoutFilter.sleep = sleep;
}
-
+
protected TimeStampStream getWriter() {
return next.narrow(TimeStampStream.class);
}
-
+
protected Socket getSocket() {
return next.narrow(Socket.class);
}
-
+
protected static void registerWrite(WriteTimeoutFilter filter) {
writers.add(filter);
}
-
+
protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
- boolean result = writers.remove(filter);
+ boolean result = writers.remove(filter);
if (result) {
if (fail) {
String message = "Forced write timeout for:"+filter.getNext().getRemoteAddress();
@@ -129,17 +126,17 @@ public class WriteTimeoutFilter extends
}
return result;
}
-
+
@Override
public void start() throws Exception {
super.start();
}
-
+
@Override
public void stop() throws Exception {
super.stop();
}
-
+
protected static class TimeoutThread extends Thread {
static AtomicInteger instance = new AtomicInteger(0);
boolean run = true;
@@ -150,14 +147,14 @@ public class WriteTimeoutFilter extends
start();
}
-
+
public void run() {
while (run) {
- boolean error = false;
+ boolean error = false;
try {
- if (!interrupted()) {
- Iterator<WriteTimeoutFilter> filters = writers.iterator();
- while (run && filters.hasNext()) {
+ if (!interrupted()) {
+ Iterator<WriteTimeoutFilter> filters = writers.iterator();
+ while (run && filters.hasNext()) {
WriteTimeoutFilter filter = filters.next();
if (filter.getWriteTimeout()<=0) continue; //no timeout set
long writeStart = filter.getWriter().getWriteTimestamp();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Fri Aug 12 20:29:29 2011
@@ -22,12 +22,9 @@ import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.Message;
/**
* Implementations of this interface are used to map back and forth from Stomp
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java Fri Aug 12 20:29:29 2011
@@ -34,8 +34,6 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.util.JettisonMappedXmlDriver;
import org.codehaus.jettison.mapped.Configuration;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
@@ -50,206 +48,208 @@ import com.thoughtworks.xstream.io.xml.X
* @author Dejan Bosanac
*/
public class JmsFrameTranslator extends LegacyFrameTranslator implements
- BrokerContextAware {
+ BrokerContextAware {
- XStream xStream = null;
- BrokerContext brokerContext;
+ XStream xStream = null;
+ BrokerContext brokerContext;
- public ActiveMQMessage convertFrame(ProtocolConverter converter,
- StompFrame command) throws JMSException, ProtocolException {
- Map headers = command.getHeaders();
- ActiveMQMessage msg;
- String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
- if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
- msg = super.convertFrame(converter, command);
- } else {
- HierarchicalStreamReader in;
-
- try {
- String text = new String(command.getContent(), "UTF-8");
- switch (Stomp.Transformations.getValue(transformation)) {
- case JMS_OBJECT_XML:
- in = new XppReader(new StringReader(text));
- msg = createObjectMessage(in);
- break;
- case JMS_OBJECT_JSON:
- in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
- msg = createObjectMessage(in);
- break;
- case JMS_MAP_XML:
- in = new XppReader(new StringReader(text));
- msg = createMapMessage(in);
- break;
- case JMS_MAP_JSON:
- in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
- msg = createMapMessage(in);
- break;
- default:
- throw new Exception("Unkown transformation: " + transformation);
- }
- } catch (Throwable e) {
- command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
- msg = super.convertFrame(converter, command);
- }
- }
- FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
- return msg;
- }
-
- public StompFrame convertMessage(ProtocolConverter converter,
- ActiveMQMessage message) throws IOException, JMSException {
- if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
- StompFrame command = new StompFrame();
- command.setAction(Stomp.Responses.MESSAGE);
- Map headers = new HashMap(25);
- command.setHeaders(headers);
+ public ActiveMQMessage convertFrame(ProtocolConverter converter,
+ StompFrame command) throws JMSException, ProtocolException {
+ Map headers = command.getHeaders();
+ ActiveMQMessage msg;
+ String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
+ if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
+ msg = super.convertFrame(converter, command);
+ } else {
+ HierarchicalStreamReader in;
+
+ try {
+ String text = new String(command.getContent(), "UTF-8");
+ switch (Stomp.Transformations.getValue(transformation)) {
+ case JMS_OBJECT_XML:
+ in = new XppReader(new StringReader(text));
+ msg = createObjectMessage(in);
+ break;
+ case JMS_OBJECT_JSON:
+ in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+ msg = createObjectMessage(in);
+ break;
+ case JMS_MAP_XML:
+ in = new XppReader(new StringReader(text));
+ msg = createMapMessage(in);
+ break;
+ case JMS_MAP_JSON:
+ in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+ msg = createMapMessage(in);
+ break;
+ default:
+ throw new Exception("Unkown transformation: " + transformation);
+ }
+ } catch (Throwable e) {
+ command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
+ msg = super.convertFrame(converter, command);
+ }
+ }
+ FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+ return msg;
+ }
- FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
- converter, message, command, this);
+ public StompFrame convertMessage(ProtocolConverter converter,
+ ActiveMQMessage message) throws IOException, JMSException {
+ if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
+ StompFrame command = new StompFrame();
+ command.setAction(Stomp.Responses.MESSAGE);
+ Map headers = new HashMap(25);
+ command.setHeaders(headers);
+
+ FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+ converter, message, command, this);
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
- headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
+ headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
- headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
+ headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
}
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
- command.setContent(marshall(msg.getObject(),
- headers.get(Stomp.Headers.TRANSFORMATION))
- .getBytes("UTF-8"));
- return command;
-
- } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
- StompFrame command = new StompFrame();
- command.setAction(Stomp.Responses.MESSAGE);
- Map headers = new HashMap(25);
- command.setHeaders(headers);
+ command.setContent(marshall(msg.getObject(),
+ headers.get(Stomp.Headers.TRANSFORMATION))
+ .getBytes("UTF-8"));
+ return command;
+
+ } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
+ StompFrame command = new StompFrame();
+ command.setAction(Stomp.Responses.MESSAGE);
+ Map headers = new HashMap(25);
+ command.setHeaders(headers);
- FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
- converter, message, command, this);
+ FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+ converter, message, command, this);
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
- headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
+ headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
- headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
+ headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
}
- ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
- command.setContent(marshall((Serializable)msg.getContentMap(),
- headers.get(Stomp.Headers.TRANSFORMATION))
- .getBytes("UTF-8"));
- return command;
+ ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+ command.setContent(marshall((Serializable)msg.getContentMap(),
+ headers.get(Stomp.Headers.TRANSFORMATION))
+ .getBytes("UTF-8"));
+ return command;
} else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
- StompFrame command = new StompFrame();
- command.setAction(Stomp.Responses.MESSAGE);
- Map headers = new HashMap(25);
- command.setHeaders(headers);
+ StompFrame command = new StompFrame();
+ command.setAction(Stomp.Responses.MESSAGE);
+ Map headers = new HashMap(25);
+ command.setHeaders(headers);
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
- converter, message, command, this);
+ converter, message, command, this);
if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
- headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
+ headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
} else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
- headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
+ headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
}
String body = marshallAdvisory(message.getDataStructure(),
- headers.get(Stomp.Headers.TRANSFORMATION));
+ headers.get(Stomp.Headers.TRANSFORMATION));
command.setContent(body.getBytes("UTF-8"));
return command;
- } else {
- return super.convertMessage(converter, message);
- }
- }
-
- /**
- * Marshalls the Object to a string using XML or JSON encoding
- */
- protected String marshall(Serializable object, String transformation)
- throws JMSException {
- StringWriter buffer = new StringWriter();
- HierarchicalStreamWriter out;
- if (transformation.toLowerCase().endsWith("json")) {
- out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
- } else {
- out = new PrettyPrintWriter(buffer);
- }
- getXStream().marshal(object, out);
- return buffer.toString();
- }
-
- protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
- ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
- Object obj = getXStream().unmarshal(in);
- objMsg.setObject((Serializable) obj);
- return objMsg;
- }
-
- protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
- ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
- Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
- for (String key : map.keySet()) {
- mapMsg.setObject(key, map.get(key));
- }
- return mapMsg;
- }
+ } else {
+ return super.convertMessage(converter, message);
+ }
+ }
+
+ /**
+ * Marshalls the Object to a string using XML or JSON encoding
+ */
+ protected String marshall(Serializable object, String transformation)
+ throws JMSException {
+ StringWriter buffer = new StringWriter();
+ HierarchicalStreamWriter out;
+ if (transformation.toLowerCase().endsWith("json")) {
+ out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
+ } else {
+ out = new PrettyPrintWriter(buffer);
+ }
+ getXStream().marshal(object, out);
+ return buffer.toString();
+ }
+
+ protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
+ ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+ Object obj = getXStream().unmarshal(in);
+ objMsg.setObject((Serializable) obj);
+ return objMsg;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
+ ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
+ Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
+ for (String key : map.keySet()) {
+ mapMsg.setObject(key, map.get(key));
+ }
+ return mapMsg;
+ }
protected String marshallAdvisory(final DataStructure ds, String transformation) {
- StringWriter buffer = new StringWriter();
- HierarchicalStreamWriter out;
- if (transformation.toLowerCase().endsWith("json")) {
- out = new JettisonMappedXmlDriver().createWriter(buffer);
- } else {
- out = new PrettyPrintWriter(buffer);
- }
+ StringWriter buffer = new StringWriter();
+ HierarchicalStreamWriter out;
+ if (transformation.toLowerCase().endsWith("json")) {
+ out = new JettisonMappedXmlDriver().createWriter(buffer);
+ } else {
+ out = new PrettyPrintWriter(buffer);
+ }
- XStream xstream = getXStream();
+ XStream xstream = getXStream();
xstream.setMode(XStream.NO_REFERENCES);
xstream.aliasPackage("", "org.apache.activemq.command");
- xstream.marshal(ds, out);
- return buffer.toString();
+ xstream.marshal(ds, out);
+ return buffer.toString();
+ }
+
+ // Properties
+ // -------------------------------------------------------------------------
+ public XStream getXStream() {
+ if (xStream == null) {
+ xStream = createXStream();
+ }
+ return xStream;
+ }
+
+ public void setXStream(XStream xStream) {
+ this.xStream = xStream;
}
- // Properties
- // -------------------------------------------------------------------------
- public XStream getXStream() {
- if (xStream == null) {
- xStream = createXStream();
- }
- return xStream;
- }
-
- public void setXStream(XStream xStream) {
- this.xStream = xStream;
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
- protected XStream createXStream() {
- XStream xstream = null;
- if (brokerContext != null) {
- Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
- for (XStream bean : beans.values()) {
- if (bean != null) {
- xstream = bean;
- break;
- }
- }
- }
-
- if (xstream == null) {
- xstream = new XStream();
- }
- return xstream;
-
- }
-
- public void setBrokerContext(BrokerContext brokerContext) {
- this.brokerContext = brokerContext;
- }
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ @SuppressWarnings("unchecked")
+ protected XStream createXStream() {
+ XStream xstream = null;
+ if (brokerContext != null) {
+ Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
+ for (XStream bean : beans.values()) {
+ if (bean != null) {
+ xstream = bean;
+ break;
+ }
+ }
+ }
+
+ if (xstream == null) {
+ xstream = new XStream();
+ }
+ return xstream;
+
+ }
+
+ public void setBrokerContext(BrokerContext brokerContext) {
+ this.brokerContext = brokerContext;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Fri Aug 12 20:29:29 2011
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.stomp;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -23,10 +24,17 @@ import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.command.*;
/**
* Implements ActiveMQ 4.0 translations
@@ -35,7 +43,7 @@ public class LegacyFrameTranslator imple
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
- final Map headers = command.getHeaders();
+ final Map<?, ?> headers = command.getHeaders();
final ActiveMQMessage msg;
/*
* To reduce the complexity of this method perhaps a Chain of Responsibility
@@ -46,7 +54,12 @@ public class LegacyFrameTranslator imple
if(intendedType.equalsIgnoreCase("text")){
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
- text.setText(new String(command.getContent(), "UTF-8"));
+ //text.setText(new String(command.getContent(), "UTF-8"));
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
+ DataOutputStream data = new DataOutputStream(bytes);
+ data.writeInt(command.getContent().length);
+ data.write(command.getContent());
+ text.setContent(bytes.toByteSequence());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@@ -66,7 +79,12 @@ public class LegacyFrameTranslator imple
} else {
ActiveMQTextMessage text = new ActiveMQTextMessage();
try {
- text.setText(new String(command.getContent(), "UTF-8"));
+ //text.setText(new String(command.getContent(), "UTF-8"));
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
+ DataOutputStream data = new DataOutputStream(bytes);
+ data.writeInt(command.getContent().length);
+ data.write(command.getContent());
+ text.setContent(bytes.toByteSequence());
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@@ -86,8 +104,17 @@ public class LegacyFrameTranslator imple
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
- ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
- command.setContent(msg.getText().getBytes("UTF-8"));
+ if (!message.isCompressed() && message.getContent() != null) {
+ ByteSequence msgContent = message.getContent();
+ if (msgContent.getLength() > 4) {
+ byte[] content = new byte[msgContent.getLength() - 4];
+ System.arraycopy(msgContent.data, 4, content, 0, content.length);
+ command.setContent(content);
+ }
+ } else {
+ ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+ command.setContent(msg.getText().getBytes("UTF-8"));
+ }
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
@@ -96,13 +123,13 @@ public class LegacyFrameTranslator imple
byte[] data = new byte[(int)msg.getBodyLength()];
msg.readBytes(data);
- headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
+ headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length));
command.setContent(data);
} else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
- converter, message, command, this);
+ converter, message, command, this);
String body = marshallAdvisory(message.getDataStructure());
command.setContent(body.getBytes("UTF-8"));
@@ -119,10 +146,10 @@ public class LegacyFrameTranslator imple
String rc = converter.getCreatedTempDestinationName(activeMQDestination);
if( rc!=null ) {
- return rc;
+ return rc;
}
- StringBuffer buffer = new StringBuffer();
+ StringBuilder buffer = new StringBuilder();
if (activeMQDestination.isQueue()) {
if (activeMQDestination.isTemporary()) {
buffer.append("/remote-temp-queue/");