Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 73366 invoked from network); 11 Jun 2009 02:18:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Jun 2009 02:18:20 -0000 Received: (qmail 51617 invoked by uid 500); 11 Jun 2009 02:18:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 51565 invoked by uid 500); 11 Jun 2009 02:18:31 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 51556 invoked by uid 99); 11 Jun 2009 02:18:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2009 02:18:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2009 02:18:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 4BC6D23888D1; Thu, 11 Jun 2009 02:18:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r783607 [2/2] - in /activemq/sandbox/activemq-flow: activemq-all/src/test/java/org/apache/activemq/broker/openwire/ activemq-bio/src/main/java/org/apache/activemq/transport/tcp/ activemq-broker/src/main/java/org/apache/activemq/apollo/ acti... Date: Thu, 11 Jun 2009 02:18:08 -0000 To: commits@activemq.apache.org From: cmacnaug@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090611021809.4BC6D23888D1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Thu Jun 11 02:18:07 2009 @@ -33,83 +33,68 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -public class MultiWireFormatFactory implements WireFormatFactory{ - +public class MultiWireFormatFactory implements WireFormatFactory { + private static final Log LOG = LogFactory.getLog(MultiWireFormatFactory.class); - - private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); - private String wireFormats="openwire,stomp"; - private ArrayList wireFormatFactories; - - static public class WireFormatConnected { - final private DiscriminatableWireFormatFactory wireFormatFactory; - final private WireFormat wireFormat; - - public WireFormatConnected(DiscriminatableWireFormatFactory wireFormatFactory, WireFormat wireFormat) { - this.wireFormatFactory = wireFormatFactory; - this.wireFormat = wireFormat; - } + private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); - public DiscriminatableWireFormatFactory getWireFormatFactory() { - return wireFormatFactory; - } + private String wireFormats; + private ArrayList wireFormatFactories; - public WireFormat getWireFormat() { - return wireFormat; - } - } - static class MultiWireFormat implements WireFormat { - ArrayList wireFormatFactories = new ArrayList(); + public static final String WIREFORMAT_NAME = "multi"; + + ArrayList wireFormatFactories = new ArrayList(); WireFormat wireFormat; int maxHeaderLength; - + public int getVersion() { return 0; } + public boolean inReceive() { return wireFormat.inReceive(); } + public void setVersion(int version) { wireFormat.setVersion(version); } private ByteArrayOutputStream baos = new ByteArrayOutputStream(); private ByteArrayInputStream peeked; - + public Object unmarshal(DataInput in) throws IOException { - while( wireFormat == null ) { - - int readByte = ((InputStream)in).read(); - if( readByte < 0 ) { + while (wireFormat == null) { + + int readByte = ((InputStream) in).read(); + if (readByte < 0) { throw new EOFException(); } baos.write(readByte); - + // Try to discriminate what we have read so far. - for (DiscriminatableWireFormatFactory wff : wireFormatFactories) { - if( wff.matchesWireformatHeader(baos.toByteSequence()) ) { + for (WireFormatFactory wff : wireFormatFactories) { + if (wff.matchesWireformatHeader(baos.toByteSequence())) { wireFormat = wff.createWireFormat(); - peeked = new ByteArrayInputStream(baos.toByteSequence()); - return new WireFormatConnected(wff, wireFormat); + break; } } - - if( baos.size() >= maxHeaderLength ) { + + if (baos.size() >= maxHeaderLength) { throw new IOException("Could not discriminate the protocol."); } } - + // If we have some peeked data we need to feed that back.. Only happens // for the first few bytes of the protocol header. - if( peeked!=null ) { - in = new DataInputStream( new ConcatInputStream(peeked, (InputStream)in) ); + if (peeked != null) { + in = new DataInputStream(new ConcatInputStream(peeked, (InputStream) in)); Object rc = wireFormat.unmarshal(in); - if( peeked.available() <= 0 ) { - peeked=null; + if (peeked.available() <= 0) { + peeked = null; } return rc; } @@ -117,7 +102,6 @@ return wireFormat.unmarshal(in); } - public void marshal(Object command, DataOutput out) throws IOException { wireFormat.marshal(command, out); } @@ -125,34 +109,51 @@ public ByteSequence marshal(Object command) throws IOException { throw new UnsupportedOperationException(); } + public Object unmarshal(ByteSequence packet) throws IOException { throw new UnsupportedOperationException(); } - public ArrayList getWireFormatFactories() { + + public ArrayList getWireFormatFactories() { return wireFormatFactories; } - public void setWireFormatFactories(ArrayList wireFormatFactories) { + + private void setWireFormatFactories(ArrayList wireFormatFactories) { this.wireFormatFactories = wireFormatFactories; - maxHeaderLength=0; - for (DiscriminatableWireFormatFactory wff : wireFormatFactories) { - maxHeaderLength = Math.max( maxHeaderLength, wff.maxWireformatHeaderLength()); + maxHeaderLength = 0; + for (WireFormatFactory wff : wireFormatFactories) { + maxHeaderLength = Math.max(maxHeaderLength, wff.maxWireformatHeaderLength()); } } + public Transport createTransportFilters(Transport transport, Map options) { return transport; } + + public String getName() { + if (wireFormat == null) { + return WIREFORMAT_NAME; + } else { + return wireFormat.getName(); + } + } } - + public WireFormat createWireFormat() { MultiWireFormat rc = new MultiWireFormat(); - if( wireFormatFactories == null ) { - wireFormatFactories = new ArrayList(); + if (wireFormatFactories == null) { + wireFormatFactories = new ArrayList(); String[] formats = getWireFormats().split("\\,"); for (int i = 0; i < formats.length; i++) { try { - wireFormatFactories.add((DiscriminatableWireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(formats[i])); + WireFormatFactory wff = (WireFormatFactory) WIREFORMAT_FACTORY_FINDER.newInstance(formats[i]); + if (wff.isDiscriminatable()) { + wireFormatFactories.add(wff); + } else { + throw new Exception("Not Discriminitable"); + } } catch (Exception e) { - LOG.warn("Invalid wireformat '"+formats[i]+"': "+e.getMessage()); + LOG.warn("Invalid wireformat '" + formats[i] + "': " + e.getMessage()); } } } @@ -168,5 +169,36 @@ this.wireFormats = formats; } + /* + * (non-Javadoc) + * + * @see org.apache.activemq.wireformat.WireFormatFactory#isDiscriminatable() + */ + public boolean isDiscriminatable() { + return false; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.wireformat.WireFormatFactory#matchesWireformatHeader + * (org.apache.activemq.util.ByteSequence) + */ + public boolean matchesWireformatHeader(ByteSequence byteSequence) { + throw new UnsupportedOperationException(); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.activemq.wireformat.WireFormatFactory#maxWireformatHeaderLength + * () + */ + public int maxWireformatHeaderLength() { + throw new UnsupportedOperationException(); + } + } Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=783607&r1=783606&r2=783607&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Thu Jun 11 02:18:07 2009 @@ -39,6 +39,7 @@ */ public class ObjectStreamWireFormat implements WireFormat { + public static final String WIREFORMAT_NAME = "object"; public ByteSequence marshal(Object command) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream ds = new DataOutputStream(baos); @@ -52,7 +53,7 @@ } public void marshal(Object command, DataOutput ds) throws IOException { - ObjectOutputStream out = new ObjectOutputStream((OutputStream)ds); + ObjectOutputStream out = new ObjectOutputStream((OutputStream) ds); out.writeObject(command); out.flush(); out.reset(); @@ -60,13 +61,13 @@ public Object unmarshal(DataInput ds) throws IOException { try { - ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream((InputStream)ds); + ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream((InputStream) ds); Object command; command = in.readObject(); in.close(); return command; } catch (ClassNotFoundException e) { - throw (IOException)new IOException("unmarshal failed: " + e).initCause(e); + throw (IOException) new IOException("unmarshal failed: " + e).initCause(e); } } @@ -77,10 +78,14 @@ return 0; } - public boolean inReceive() { - // TODO implement the inactivity monitor - return false; - } + public String getName() { + return WIREFORMAT_NAME; + } + + public boolean inReceive() { + // TODO implement the inactivity monitor + return false; + } public Transport createTransportFilters(Transport transport, Map options) { return transport; Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=783607&r1=783606&r2=783607&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormat.java Thu Jun 11 02:18:07 2009 @@ -65,6 +65,11 @@ int getVersion(); /** + * @return The name of the wireformat + */ + String getName(); + + /** * @return true if message is being received */ boolean inReceive(); Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java?rev=783607&r1=783606&r2=783607&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java (original) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/WireFormatFactory.java Thu Jun 11 02:18:07 2009 @@ -16,6 +16,36 @@ */ package org.apache.activemq.wireformat; +import org.apache.activemq.util.ByteSequence; + public interface WireFormatFactory { - WireFormat createWireFormat(); + + /** + * @return an instance of the wire format. + * + */ + WireFormat createWireFormat(); + + /** + * @return true if this wire format factory is isDiscriminatable. A discriminatable + * WireFormat's will first write a header to the stream + */ + boolean isDiscriminatable(); + + /** + * @return Returns the maximum length of the header used to discriminate the wire format if it + * {@link #isDiscriminatable()} + * @throws UnsupportedOperationException If {@link #isDiscriminatable()} is false + */ + int maxWireformatHeaderLength(); + + /** + * Called to test if this wireformat matches the provided header. + * + * @param byteSequence The byte sequence representing the herader data read so far. + * @return True if the ByteSequence matches the wire format header. + */ + boolean matchesWireformatHeader(ByteSequence byteSequence); + + }