Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 011DC9FF for ; Wed, 27 Apr 2011 17:34:58 +0000 (UTC) Received: (qmail 48806 invoked by uid 500); 27 Apr 2011 17:34:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 48771 invoked by uid 500); 27 Apr 2011 17:34:57 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 48764 invoked by uid 99); 27 Apr 2011 17:34:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2011 17:34:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2011 17:34:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 82FD72388CB3; Wed, 27 Apr 2011 17:33:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1097189 [41/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME... 
Date: Wed, 27 Apr 2011 17:33:09 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110427173319.82FD72388CB3@eris.apache.org> Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html Wed Apr 27 17:32:51 2011 @@ -0,0 +1,25 @@ + + + + + + +Support for JMS Advisory messages as well as some helper listeners to listen to the clients, producers and consumers available. 
+ + + Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/package.html ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobTransferPolicy.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.openwire.support.blob; + +/** + * The policy for configuring how BLOBs (Binary Large OBjects) are transferred + * out of band between producers, brokers and consumers. + * + * @version $Revision: $ + */ +public class BlobTransferPolicy { + private String defaultUploadUrl = "http://localhost:8080/uploads/"; + private String brokerUploadUrl; + private String uploadUrl; + private int bufferSize = 128 * 1024; + private org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy uploadStrategy; + + /** + * Returns a copy of this policy object + */ + public BlobTransferPolicy copy() { + BlobTransferPolicy that = new BlobTransferPolicy(); + that.defaultUploadUrl = this.defaultUploadUrl; + that.brokerUploadUrl = this.brokerUploadUrl; + that.uploadUrl = this.uploadUrl; + that.uploadStrategy = this.uploadStrategy; + return that; + } + + public String getUploadUrl() { + if (uploadUrl == null) { + uploadUrl = getBrokerUploadUrl(); + if (uploadUrl == null) { + uploadUrl = getDefaultUploadUrl(); + } + } + return uploadUrl; + } + + /** + * Sets the upload URL to use explicitly on the client which will + * overload the default or the broker's URL. This allows the client to decide + * where to upload files to irrespective of the brokers configuration. 
+ */ + public void setUploadUrl(String uploadUrl) { + this.uploadUrl = uploadUrl; + } + + public String getBrokerUploadUrl() { + return brokerUploadUrl; + } + + /** + * Called by the JMS client when a broker advertises its upload URL + */ + public void setBrokerUploadUrl(String brokerUploadUrl) { + this.brokerUploadUrl = brokerUploadUrl; + } + + public String getDefaultUploadUrl() { + return defaultUploadUrl; + } + + /** + * Sets the default upload URL to use if the broker does not + * have a configured upload URL + */ + public void setDefaultUploadUrl(String defaultUploadUrl) { + this.defaultUploadUrl = defaultUploadUrl; + } + + public org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy getUploadStrategy() { + if (uploadStrategy == null) { + uploadStrategy = createUploadStrategy(); + } + return uploadStrategy; + } + + /** + * Sets the upload strategy to use for uploading BLOBs to some URL + */ + public void setUploadStrategy(org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy uploadStrategy) { + this.uploadStrategy = uploadStrategy; + } + + public int getBufferSize() { + return bufferSize; + } + + /** + * Sets the default buffer size used when uploading or downloading files + */ + public void setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + } + + protected org.apache.activemq.apollo.openwire.support.blob.BlobUploadStrategy createUploadStrategy() { + return new org.apache.activemq.apollo.openwire.support.blob.DefaultBlobUploadStrategy(this); + } +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java?rev=1097189&view=auto ============================================================================== --- 
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploadStrategy.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.openwire.support.blob; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; + +import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage; +import org.apache.activemq.apollo.openwire.support.OpenwireException; + +/** + * Represents a strategy of uploading a file/stream to some remote + * + * @version $Revision: $ + */ +public interface BlobUploadStrategy { + + URL uploadFile(ActiveMQBlobMessage message, File file) throws OpenwireException, IOException; + + URL uploadStream(ActiveMQBlobMessage message, InputStream in) throws OpenwireException, IOException; +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/BlobUploader.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.blob; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; + +import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage; +import org.apache.activemq.apollo.openwire.support.OpenwireException; + +/** + * A helper class to represent a required upload of a BLOB to some remote URL + * + * @version $Revision: $ + */ +public class BlobUploader { + + private BlobTransferPolicy blobTransferPolicy; + private File file; + private InputStream in; + + public BlobUploader(BlobTransferPolicy blobTransferPolicy, InputStream in) { + this.blobTransferPolicy = blobTransferPolicy; + this.in = in; + } + + public BlobUploader(BlobTransferPolicy blobTransferPolicy, File file) { + this.blobTransferPolicy = blobTransferPolicy; + this.file = file; + } + + public URL upload(ActiveMQBlobMessage message) throws OpenwireException, IOException { + if (file != null) { + return getStrategy().uploadFile(message, file); + } else { + return getStrategy().uploadStream(message, in); + } + } + + public BlobTransferPolicy getBlobTransferPolicy() { + return blobTransferPolicy; + } + + public BlobUploadStrategy getStrategy() { + return getBlobTransferPolicy().getUploadStrategy(); + } +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/DefaultBlobUploadStrategy.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.openwire.support.blob; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; + +import org.apache.activemq.apollo.openwire.command.ActiveMQBlobMessage; +import org.apache.activemq.apollo.openwire.support.OpenwireException; + +/** + * A default implementation of {@link BlobUploadStrategy} which uses the URL + * class to upload files or streams to a remote URL + */ +public class DefaultBlobUploadStrategy implements BlobUploadStrategy { + private BlobTransferPolicy transferPolicy; + + public DefaultBlobUploadStrategy(BlobTransferPolicy transferPolicy) { + this.transferPolicy = transferPolicy; + } + + public URL uploadFile(ActiveMQBlobMessage message, File file) throws OpenwireException, IOException { + return uploadStream(message, new FileInputStream(file)); + } + + public URL uploadStream(ActiveMQBlobMessage message, InputStream fis) throws OpenwireException, IOException { + URL url = createUploadURL(message); + + HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + connection.setRequestMethod("PUT"); + connection.setDoOutput(true); + + // use chunked mode or otherwise URLConnection loads everything into + // memory + // (chunked mode not supported before JRE 1.5) + connection.setChunkedStreamingMode(transferPolicy.getBufferSize()); + + OutputStream os = connection.getOutputStream(); + + byte[] buf = new byte[transferPolicy.getBufferSize()]; + for (int c = fis.read(buf); c != -1; c = fis.read(buf)) { + os.write(buf, 0, c); + os.flush(); + } + os.close(); + fis.close(); + + if (!isSuccessfulCode(connection.getResponseCode())) { + throw new IOException("PUT was not successful: " + connection.getResponseCode() + " " + + connection.getResponseMessage()); + } + + return url; + } + + public void deleteFile(ActiveMQBlobMessage 
message) throws IOException, OpenwireException { + URL url = createUploadURL(message); + + HttpURLConnection connection = (HttpURLConnection)url.openConnection(); + connection.setRequestMethod("DELETE"); + connection.connect(); + connection.disconnect(); + + if (!isSuccessfulCode(connection.getResponseCode())) { + throw new IOException("DELETE was not successful: " + connection.getResponseCode() + " " + + connection.getResponseMessage()); + } + } + + private boolean isSuccessfulCode(int responseCode) { + return responseCode >= 200 && responseCode < 300; // 2xx => successful + } + + protected URL createUploadURL(ActiveMQBlobMessage message) throws OpenwireException, MalformedURLException { + return new URL(transferPolicy.getUploadUrl() + message.getMessageId().toString()); + } +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/blob/package.html Wed Apr 27 17:32:51 2011 @@ -0,0 +1,25 @@ + + + + + + +Helper classes for dealing with out-of-band BLOB objects + + + Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java?rev=1097189&view=auto 
============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.openwire.support.broker.openwire; + +import java.io.IOException; + +import org.apache.activemq.apollo.dto.DestinationDTO; +import org.apache.activemq.apollo.openwire.command.Message; +import org.apache.activemq.apollo.openwire.codec.OpenWireFormat; +import org.fusesource.hawtbuf.AsciiBuffer; +import org.fusesource.hawtbuf.Buffer; + +public class OpenWireMessageDelivery { + + static final private AsciiBuffer ENCODING = new AsciiBuffer("openwire"); + + private final Message message; + private AsciiBuffer producerId; + private OpenWireFormat storeWireFormat; + private PersistListener persistListener = null; + private final int size; + + public interface PersistListener { + public void onMessagePersisted(OpenWireMessageDelivery delivery); + } + + public OpenWireMessageDelivery(Message message) { + this.message = message; + this.size = message.getSize(); + } + + public void setPersistListener(PersistListener listener) { + persistListener = listener; + } + + public DestinationDTO[] getDestination() { + return message.getDestination().toDestination(); + } + + public int getMemorySize() { + return size; + } + + public int getPriority() { + return message.getPriority(); + } + + public AsciiBuffer getMsgId() { + return new AsciiBuffer(message.getMessageId().toString()); + } + + public AsciiBuffer getProducerId() { + if (producerId == null) { + producerId = new AsciiBuffer(message.getProducerId().toString()); + } + return producerId; + } + + public Message getMessage() { + return message; + } + + public T asType(Class type) { + if (type == Message.class) { + return type.cast(message); + } + // TODO: is this right? 
+ if (message.getClass().isAssignableFrom(type)) { + return type.cast(message); + } + return null; + } + + public boolean isPersistent() { + return message.isPersistent(); + } + + public final void onMessagePersisted() { + if (persistListener != null) { + persistListener.onMessagePersisted(this); + persistListener = null; + } + } + + public final boolean isResponseRequired() { + return message.isResponseRequired(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.activemq.broker.MessageDelivery#getTTE() + */ + public long getExpiration() { + return message.getExpiration(); + } + +// public MessageEvaluationContext createMessageEvaluationContext() { +// return new OpenwireMessageEvaluationContext(message); +// } + + public String toString() { + return message.getMessageId().toString(); + } + + public AsciiBuffer getStoreEncoding() { + return ENCODING; + } + + public Buffer getStoreEncoded() { + Buffer bytes; + try { + bytes = storeWireFormat.marshal(message); + } catch (IOException e) { + return null; + } + return bytes; + } + + + public void setStoreWireFormat(OpenWireFormat wireFormat) { + this.storeWireFormat = wireFormat; + } + +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,87 @@ 
/**
 * Currently an empty class: it declares no fields or methods. The previous
 * implementation is kept below as commented-out code and is not compiled.
 */
public class OpenwireMessageEvaluationContext {
//
//    private Message message;
//
//    public OpenwireMessageEvaluationContext() {
//    }
//    public OpenwireMessageEvaluationContext(Message message) {
//        this.message = message;
//    }
//
//    public Message getMessage() {
//        return message;
//    }
//
//    public void setMessage(Message message) {
//        this.message = message;
//    }
//
//
//    public Expression getPropertyExpression(final String name) {
//        Expression expression = JMS_PROPERTY_EXPRESSIONS.get(name);
//        if( expression == null ) {
//            expression = new Expression() {
//                public Object evaluate(MessageEvaluationContext mc) throws FilterException {
//                    try {
//                        Message message = ((OpenwireMessageEvaluationContext) mc).message;
//                        return message.getProperty(name);
//                    } catch (IOException e) {
//                        throw new FilterException(e);
//                    }
//                }
//            };
//        }
//        return expression;
//    }
//
//    public T getBodyAs(Class type) throws FilterException {
//        try {
//            if( type == String.class ) {
//                if ( message instanceof ActiveMQTextMessage ) {
//                    return type.cast(((ActiveMQTextMessage)message).getText());
//                }
//            }
//            if( type == Buffer.class ) {
//                if ( message instanceof ActiveMQBytesMessage ) {
//                    ActiveMQBytesMessage bm = ((ActiveMQBytesMessage)message);
//                    byte data[] = new byte[(int) bm.getBodyLength()];
//                    bm.readBytes(data);
//                    return type.cast(new Buffer(data));
//                }
//            }
//            return null;
//        } catch (JMSException e) {
//            throw new FilterException(e);
//        }
//    }
//
//    public T getDestination() {
//        return (T) destination;
//    }
//    public Object getLocalConnectionId() {
//        throw new UnsupportedOperationException();
//    }
//    public void setDestination(Object destination) {
//        this.destination = destination;
//    }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.broker.openwire; + +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.HashMap; +//import java.util.LinkedList; +//import java.util.concurrent.ConcurrentHashMap; +// +//import javax.jms.JMSException; +//import javax.transaction.xa.XAException; +//import javax.transaction.xa.Xid; +// +//import org.apache.activemq.apollo.WindowLimiter; +//import org.apache.activemq.apollo.broker.Broker; +//import org.apache.activemq.apollo.broker.BrokerConnection; +//import org.apache.activemq.apollo.broker.BrokerMessageDelivery; +//import org.apache.activemq.apollo.broker.BrokerSubscription; +//import org.apache.activemq.apollo.broker.Destination; +//import org.apache.activemq.apollo.broker.MessageDelivery; +//import org.apache.activemq.apollo.broker.ProtocolHandler; +//import org.apache.activemq.apollo.broker.Router; +//import org.apache.activemq.apollo.broker.Transaction; +//import org.apache.activemq.apollo.broker.VirtualHost; +//import org.apache.activemq.apollo.broker.XidImpl; +//import org.apache.activemq.apollo.broker.Transaction.TransactionListener; +//import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener; +//import org.apache.activemq.broker.store.Store.MessageRecord; +//import org.apache.activemq.command.ActiveMQDestination; +//import org.apache.activemq.command.BrokerId; +//import org.apache.activemq.command.BrokerInfo; +//import org.apache.activemq.command.Command; +//import org.apache.activemq.command.ConnectionControl; +//import 
org.apache.activemq.command.ConnectionError; +//import org.apache.activemq.command.ConnectionId; +//import org.apache.activemq.command.ConnectionInfo; +//import org.apache.activemq.command.ConsumerControl; +//import org.apache.activemq.command.ConsumerId; +//import org.apache.activemq.command.ConsumerInfo; +//import org.apache.activemq.command.ControlCommand; +//import org.apache.activemq.command.DestinationInfo; +//import org.apache.activemq.command.ExceptionResponse; +//import org.apache.activemq.command.FlushCommand; +//import org.apache.activemq.command.KeepAliveInfo; +//import org.apache.activemq.command.Message; +//import org.apache.activemq.command.MessageAck; +//import org.apache.activemq.command.MessageDispatch; +//import org.apache.activemq.command.MessageDispatchNotification; +//import org.apache.activemq.command.MessageId; +//import org.apache.activemq.command.MessagePull; +//import org.apache.activemq.command.ProducerAck; +//import org.apache.activemq.command.ProducerId; +//import org.apache.activemq.command.ProducerInfo; +//import org.apache.activemq.command.RemoveInfo; +//import org.apache.activemq.command.RemoveSubscriptionInfo; +//import org.apache.activemq.command.Response; +//import org.apache.activemq.command.SessionId; +//import org.apache.activemq.command.SessionInfo; +//import org.apache.activemq.command.ShutdownInfo; +//import org.apache.activemq.command.TransactionId; +//import org.apache.activemq.command.TransactionInfo; +//import org.apache.activemq.command.WireFormatInfo; +//import org.apache.activemq.dispatch.DispatchPriority; +//import org.apache.activemq.filter.BooleanExpression; +//import org.apache.activemq.apollo.filter.FilterException; +//import org.apache.activemq.apollo.filter.LogicExpression; +//import org.apache.activemq.apollo.filter.NoLocalExpression; +//import org.apache.activemq.flow.Flow; +//import org.apache.activemq.flow.FlowController; +//import org.apache.activemq.flow.IFlowController; +//import 
org.apache.activemq.flow.IFlowLimiter; +//import org.apache.activemq.flow.IFlowResource; +//import org.apache.activemq.flow.ISourceController; +//import org.apache.activemq.flow.SizeLimiter; +//import org.apache.activemq.flow.ISinkController.FlowControllable; +//import org.apache.activemq.openwire.OpenWireFormat; +//import org.apache.activemq.selector.SelectorParser; +//import org.apache.activemq.state.CommandVisitor; +//import org.apache.activemq.transport.WireFormatNegotiator; +//import org.fusesource.hawtbuf.Buffer; +//import org.apache.activemq.wireformat.WireFormat; +//import org.fusesource.hawtdispatch.Dispatch; + +public class OpenwireProtocolHandler { // implements ProtocolHandler, PersistListener { + +// protected final HashMap connections = new HashMap(); +// protected final HashMap sessions = new HashMap(); +// protected final HashMap producers = new HashMap(); +// protected final HashMap consumers = new HashMap(); +// +// protected final ConcurrentHashMap transactions = new ConcurrentHashMap(); +// +// protected BrokerConnection connection; +// private OpenWireFormat wireFormat; +// private OpenWireFormat storeWireFormat; +// private Router router; +// private VirtualHost host; +// private final CommandVisitor visitor; +// +// ArrayList temporaryDestinations = new ArrayList(); +// +// public OpenwireProtocolHandler() { +// setStoreWireFormat(new OpenWireFormat()); +// +// visitor = new CommandVisitor() { +// +// // ///////////////////////////////////////////////////////////////// +// // Methods that keep track of the client state +// // ///////////////////////////////////////////////////////////////// +// public Response processAddConnection(final ConnectionInfo info) throws Exception { +// if (!connections.containsKey(info.getConnectionId())) { +// +// ClientContext connection = new AbstractClientContext(info.getConnectionId().toString(), null) { +// ConnectionInfo connectionInfo = info; +// +// public void close() { +// super.close(); +// 
connections.remove(connectionInfo.getConnectionId()); +// } +// }; +// connections.put(info.getConnectionId(), connection); +// +// // Connections have an implicitly created "default" session identified by session id = -1 +// SessionId sessionId = new SessionId(info.getConnectionId(), -1); +// addSession(sessionId, connection); +// } +// return ack(info); +// } +// +// public Response processAddSession(final SessionInfo info) throws Exception { +// final SessionId sessionId = info.getSessionId(); +// ClientContext connection = connections.get(sessionId.getParentId()); +// if (connection == null) { +// throw new IllegalStateException(host.getHostName() + " Cannot add a session to a connection that had not been registered: " + sessionId.getParentId()); +// } +// +// if (!sessions.containsKey(sessionId)) { +// addSession(sessionId, connection); +// } +// +// return ack(info); +// } +// +// private void addSession(final SessionId sessionId, ClientContext connection) { +// ClientContext session = new AbstractClientContext(sessionId.toString(), connection) { +// public void close() { +// super.close(); +// sessions.remove(sessionId); +// } +// }; +// sessions.put(sessionId, session); +// } +// +// public Response processAddProducer(ProducerInfo info) throws Exception { +// ClientContext session = sessions.get(info.getProducerId().getParentId()); +// if (session == null) { +// throw new IllegalStateException(host.getHostName() + " Cannot add a producer to a session that had not been registered: " + info.getProducerId().getParentId()); +// } +// if (!producers.containsKey(info.getProducerId())) { +// ProducerContext producer = new ProducerContext(info, session); +// } +// return ack(info); +// } +// +// public Response processAddConsumer(ConsumerInfo info) throws Exception { +// ClientContext session = sessions.get(info.getConsumerId().getParentId()); +// if (session == null) { +// throw new IllegalStateException(host.getHostName() + " Cannot add a consumer to a session 
that had not been registered: " + info.getConsumerId().getParentId()); +// } +// +// if (!consumers.containsKey(info.getConsumerId())) { +// ConsumerContext ctx = new ConsumerContext(info, session); +// ctx.start(); +// } +// +// return ack(info); +// } +// +// public Response processRemoveConnection(RemoveInfo remove, ConnectionId info, long arg1) throws Exception { +// ClientContext cc = connections.get(info); +// if (cc != null) { +// cc.close(); +// } +// ack(remove); +// return null; +// } +// +// public Response processRemoveSession(RemoveInfo remove, SessionId info, long arg1) throws Exception { +// ClientContext cc = sessions.get(info); +// if (cc != null) { +// cc.close(); +// } +// ack(remove); +// return null; +// } +// +// public Response processRemoveProducer(RemoveInfo remove, ProducerId info) throws Exception { +// ClientContext cc = producers.get(info); +// if (cc != null) { +// cc.close(); +// } +// ack(remove); +// return null; +// } +// +// public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long arg1) throws Exception { +// ClientContext cc = consumers.get(info); +// if (cc != null) { +// cc.close(); +// } +// ack(remove); +// return null; +// } +// +// // ///////////////////////////////////////////////////////////////// +// // Message Processing Methods. 
+// // ///////////////////////////////////////////////////////////////// +// public Response processMessage(Message info) throws Exception { +// if (info.getOriginalDestination() == null) { +// info.setOriginalDestination(info.getDestination()); +// } +// +// ProducerId producerId = info.getProducerId(); +// ProducerContext producerContext = producers.get(producerId); +// +// OpenWireMessageDelivery md = new OpenWireMessageDelivery(info); +// md.setStoreWireFormat(storeWireFormat); +// TransactionId tid = info.getTransactionId(); +// if (tid != null) { +// Transaction t = locateTransaction(tid, true); +// md.setTransactionId(t.getTid()); +// } else { +// md.setPersistListener(OpenwireProtocolHandler.this); +// } +// +// // Only producers that are not using a window will block, +// // and if it blocks. +// // yes we block the connection's read thread. yes other +// // sessions will not get +// // serviced while we block here. The producer is depending +// // on TCP flow +// // control to slow him down so we have to stop ready from +// // the socket at this +// // point. +// while (!producerContext.controller.offer(md, null)) { +// producerContext.controller.waitForFlowUnblock(); +// } +// +// if (tid != null) { +// return ack(info); +// } else { +// return null; +// } +// } +// +// public Response processMessageAck(MessageAck info) throws Exception { +// ConsumerContext ctx = consumers.get(info.getConsumerId()); +// ctx.ack(info); +// return ack(info); +// } +// +// // Only used when client prefetch is set to zero. +// public Response processMessagePull(MessagePull info) throws Exception { +// return ack(info); +// } +// +// // ///////////////////////////////////////////////////////////////// +// // Control Methods +// // ///////////////////////////////////////////////////////////////// +// public Response processWireFormat(WireFormatInfo info) throws Exception { +// +// // Negotiate the openwire encoding options. 
+// WireFormatNegotiator wfn = new WireFormatNegotiator(connection.getTransport(), wireFormat, 1); +// wfn.sendWireFormat(); +// wfn.negociate(info); +// +// // Now that the encoding is negotiated.. let the client know +// // the details about this +// // broker. +// BrokerInfo brokerInfo = new BrokerInfo(); +// Broker broker = connection.getBroker(); +// brokerInfo.setBrokerId(new BrokerId(broker.getName())); +// brokerInfo.setBrokerName(broker.getName()); +// if (!broker.getConnectUris().isEmpty()) { +// brokerInfo.setBrokerURL(broker.getConnectUris().get(0)); +// } +// connection.write(brokerInfo); +// return ack(info); +// } +// +// public Response processShutdown(ShutdownInfo info) throws Exception { +// connection.setStopping(); +// return ack(info); +// } +// +// public Response processKeepAlive(KeepAliveInfo info) throws Exception { +// if (info.isResponseRequired()) { +// info.setResponseRequired(false); +// connection.write(info); +// } +// return null; +// } +// +// public Response processFlush(FlushCommand info) throws Exception { +// return ack(info); +// } +// +// public Response processConnectionControl(ConnectionControl info) throws Exception { +// if (info != null) { +// if (info.isFaultTolerant()) { +// throw new UnsupportedOperationException("Fault Tolerance"); +// } +// } +// return ack(info); +// } +// +// public Response processConnectionError(ConnectionError info) throws Exception { +// return ack(info); +// } +// +// public Response processConsumerControl(ConsumerControl info) throws Exception { +// return ack(info); +// } +// +// // ///////////////////////////////////////////////////////////////// +// // Methods for server management +// // ///////////////////////////////////////////////////////////////// +// public Response processAddDestination(DestinationInfo info) throws Exception { +// ActiveMQDestination destination = info.getDestination(); +// if (destination.isTemporary()) { +// // Keep track of it so that we can remove them this 
connection +// // shuts down. +// temporaryDestinations.add(destination); +// } +// host.createQueue(destination); +// return ack(info); +// } +// +// public Response processRemoveDestination(DestinationInfo info) throws Exception { +// throw new UnsupportedOperationException(); +// } +// +// public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { +// throw new UnsupportedOperationException(); +// } +// +// public Response processControlCommand(ControlCommand info) throws Exception { +// throw new UnsupportedOperationException(); +// } +// +// // ///////////////////////////////////////////////////////////////// +// // Methods for transaction management +// // ///////////////////////////////////////////////////////////////// +// public Response processBeginTransaction(TransactionInfo info) throws Exception { +// TransactionId tid = info.getTransactionId(); +// +// Transaction t = locateTransaction(tid, false); +// if (t == null) { +// +// Buffer xid = null; +// if (tid.isXATransaction()) { +// xid = XidImpl.toBuffer((Xid) tid); +// } +// t = host.getTransactionManager().createTransaction(xid); +// transactions.put(tid, t); +// } +// +// return ack(info); +// } +// +// public Response processCommitTransactionOnePhase(final TransactionInfo info) throws Exception { +// final TransactionId tid = info.getTransactionId(); +// Transaction t = locateTransaction(tid, true); +// +// TransactionListener listener = null; +// if (info.isResponseRequired()) { +// listener = new TransactionListener() { +// +// @Override +// public void onCommit(Transaction t) { +// transactions.remove(tid); +// ack(info); +// } +// +// @Override +// public void onRollback(Transaction t) { +// transactions.remove(tid); +// ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack")); +// r.setCorrelationId(info.getCommandId()); +// connection.write(r); +// } +// +// }; +// } +// +// t.commit(true, listener); +// transactions.remove(tid); +// return 
null; +// } +// +// public Response processCommitTransactionTwoPhase(final TransactionInfo info) throws Exception { +// final TransactionId tid = info.getTransactionId(); +// Transaction t = locateTransaction(tid, true); +// +// TransactionListener listener = null; +// if (info.isResponseRequired()) { +// listener = new TransactionListener() { +// +// @Override +// public void onCommit(Transaction t) { +// transactions.remove(tid); +// ack(info); +// } +// +// @Override +// public void onRollback(Transaction t) { +// transactions.remove(tid); +// ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack")); +// r.setCorrelationId(info.getCommandId()); +// connection.write(r); +// } +// +// }; +// } +// +// t.commit(false, listener); +// return null; +// } +// +// public Response processEndTransaction(TransactionInfo info) throws Exception { +// //Shouldn't actually do anything, send by client to ensure that it is +// //in sync with broker transaction state. +// //TODO need to investigate whether this should wait for prior transaction +// //state to flush out? 
+// new UnsupportedOperationException().printStackTrace(); +// return ack(info); +// } +// +// public Response processForgetTransaction(TransactionInfo info) throws Exception { +// return processRollbackTransaction(info); +// } +// +// public Response processPrepareTransaction(final TransactionInfo info) throws Exception { +// final TransactionId tid = info.getTransactionId(); +// Transaction t = locateTransaction(tid, true); +// +// TransactionListener listener = null; +// if (info.isResponseRequired()) { +// listener = new TransactionListener() { +// +// @Override +// public void onPrepared(Transaction t) { +// ack(info); +// } +// }; +// } +// t.prepare(listener); +// return null; +// } +// +// public Response processRecoverTransactions(TransactionInfo info) throws Exception { +// //TODO +// throw new UnsupportedOperationException(); +// } +// +// public Response processRollbackTransaction(final TransactionInfo info) throws Exception { +// final TransactionId tid = info.getTransactionId(); +// Transaction t = locateTransaction(tid, true); +// +// TransactionListener listener = null; +// if (info.isResponseRequired()) { +// listener = new TransactionListener() { +// +// @Override +// public void onRollback(Transaction t) { +// ack(info); +// } +// }; +// } +// t.rollback(listener); +// transactions.remove(tid); +// return null; +// } +// +// // ///////////////////////////////////////////////////////////////// +// // Methods for cluster operations +// // These commands are sent to the broker when it's acting like a +// // client to another broker. 
+// // ///////////////////////////////////////////////////////////////// +// public Response processBrokerInfo(BrokerInfo info) throws Exception { +// throw new UnsupportedOperationException(); +// } +// +// public Response processMessageDispatch(MessageDispatch info) throws Exception { +// throw new UnsupportedOperationException(); +// } +// +// public Response processMessageDispatchNotification(MessageDispatchNotification info) throws Exception { +// throw new UnsupportedOperationException(); +// } +// +// public Response processProducerAck(ProducerAck info) throws Exception { +// return ack(info); +// } +// }; +// } +// +// private Transaction locateTransaction(TransactionId tid, boolean expected) throws XAException, JMSException { +// Transaction t; +// +// if (tid.isLocalTransaction()) { +// t = transactions.get(tid); +// } else { +// t = host.getTransactionManager().getXATransaction(XidImpl.toBuffer((Xid) tid)); +// } +// +// if (t == null && expected) { +// if (tid.isXATransaction()) { +// XAException e = new XAException("Transaction '" + tid + "' has not been started."); +// e.errorCode = XAException.XAER_NOTA; +// throw e; +// } else { +// throw new JMSException("Transaction '" + tid + "' has not been started."); +// } +// } +// return t; +// } +// +// public void start() throws Exception { +// +// } +// +// public void stop() throws Exception { +// } +// +// public void onCommand(Object o) { +// boolean responseRequired = false; +// int commandId = 0; +// try { +// Command command = (Command) o; +// commandId = command.getCommandId(); +// responseRequired = command.isResponseRequired(); +// //System.out.println(o); +// command.visit(visitor); +// } catch (Exception e) { +// if (responseRequired) { +// ExceptionResponse response = new ExceptionResponse(e); +// response.setCorrelationId(commandId); +// connection.write(response); +// } else { +// connection.onException(e); +// } +// } catch (Throwable t) { +// if (responseRequired) { +// ExceptionResponse 
response = new ExceptionResponse(t); +// response.setCorrelationId(commandId); +// connection.write(response); +// } else { +// connection.onException(new RuntimeException(t)); +// } +// } +// } +// +// public void onException(Exception error) { +// if (!connection.isStopping()) { +// error.printStackTrace(); +// new Thread() { +// @Override +// public void run() { +// try { +// connection.stop(); +// } catch (Exception ignore) { +// } +// } +// }.start(); +// } +// } +// +// public void onMessagePersisted(OpenWireMessageDelivery delivery) { +// // TODO This method should not block: +// // Either add to output queue, or spin off in a separate thread. +// ack(delivery.getMessage()); +// } +// +// Response ack(Command command) { +// if (command.isResponseRequired()) { +// Response rc = new Response(); +// rc.setCorrelationId(command.getCommandId()); +// connection.write(rc); +// } +// return null; +// } +// +// // ///////////////////////////////////////////////////////////////// +// // Internal Support Methods +// // ///////////////////////////////////////////////////////////////// +// +// class ProducerContext extends AbstractClientContext { +// +// protected final Object inboundMutex = new Object(); +// private IFlowController controller; +// private final ProducerInfo info; +// +// public ProducerContext(final ProducerInfo info, ClientContext parent) { +// super(info.getProducerId().toString(), parent); +// this.info = info; +// producers.put(info.getProducerId(), this); +// final Flow flow = new Flow("broker-" + super.getResourceName() + "-inbound", false); +// +// // Openwire only uses credit windows at the producer level for +// // producers that request the feature. 
+// IFlowLimiter limiter; +// if (info.getWindowSize() > 0) { +// limiter = new WindowLimiter(false, flow, info.getWindowSize(), info.getWindowSize() / 2) { +// @Override +// protected void sendCredit(int credit) { +// ProducerAck ack = new ProducerAck(info.getProducerId(), credit); +// connection.write(ack); +// } +// }; +// } else { +// +// limiter = new SizeLimiter(1024*64, 1024*32); +// } +// +// controller = new FlowController(new FlowControllable() { +// public void flowElemAccepted(ISourceController controller, OpenWireMessageDelivery msg) { +// router.route(msg, controller, true); +// controller.elementDispatched(msg); +// } +// +// public IFlowResource getFlowResource() { +// return ProducerContext.this; +// } +// }, flow, limiter, inboundMutex); +// +// super.onFlowOpened(controller); +// } +// +// public void close() { +// super.close(); +// producers.remove(info); +// } +// } +// +// class ConsumerContext extends AbstractClientContext implements ProtocolHandler.ConsumerContext { +// +// private final ConsumerInfo info; +// private String name; +// private BooleanExpression selector; +// private boolean isDurable; +// private boolean isQueueReceiver; +// +// private final FlowController controller; +// private final WindowLimiter limiter; +// +// private HashMap> pendingMessages = new HashMap>(); +// private LinkedList pendingMessageIds = new LinkedList(); +// private BrokerSubscription brokerSubscription; +// private int borrowedLimterCredits; +// +// public ConsumerContext(final ConsumerInfo info, ClientContext parent) throws Exception { +// super(info.getConsumerId().toString(), parent); +// this.info = info; +// this.name = info.getConsumerId().toString(); +// consumers.put(info.getConsumerId(), this); +// +// Flow flow = new Flow("broker-" + name + "-outbound", false); +// selector = parseSelector(info); +// limiter = new WindowLimiter(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) { +// @Override +// public int 
getElementSize(MessageDelivery m) { +// return 1; +// } +// }; +// +// isQueueReceiver = info.getDestination().isQueue(); +// if (info.getSubscriptionName() != null) { +// isDurable = true; +// } +// controller = new FlowController(null, flow, limiter, this); +// controller.useOverFlowQueue(false); +// controller.setExecutor(Dispatch.getGlobalQueue()); +// super.onFlowOpened(controller); +// } +// +// public void start() throws Exception { +// brokerSubscription = host.createSubscription(this); +// brokerSubscription.connect(this); +// } +// +// public boolean offer(final MessageDelivery message, ISourceController source, SubscriptionDelivery callback) { +// if (!controller.offer(message, source)) { +// return false; +// } else { +// sendInternal(message, controller, callback); +// return true; +// } +// } +// +// public void add(final MessageDelivery message, ISourceController source, SubscriptionDelivery callback) { +// controller.add(message, source); +// sendInternal(message, controller, callback); +// } +// +// private void sendInternal(final MessageDelivery message, ISourceController controller, SubscriptionDelivery callback) { +// Message msg = message.asType(Message.class); +// MessageDispatch md = new MessageDispatch(); +// md.setConsumerId(info.getConsumerId()); +// md.setMessage(msg); +// md.setDestination(msg.getDestination()); +// // Add to the pending list if persistent and we are durable: +// if (callback != null) { +// if (callback.isRedelivery()) { +// md.setRedeliveryCounter(1); +// } +// synchronized (this) { +// Object old = pendingMessages.put(msg.getMessageId(), callback); +// if (old != null) { +// new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace(); +// } +// pendingMessageIds.add(msg.getMessageId()); +// connection.write(md); +// } +// } else { +// connection.write(md); +// } +// } +// +// public void ack(MessageAck info) throws XAException, JMSException { +// // TODO: The pending message queue could probably be 
optimized to +// // avoid having to create a new list here. +// int flowCredit = info.getMessageCount(); +// if (info.isDeliveredAck()) { +// // This ack is just trying to expand the flow control window size without actually +// // acking the message. Keep track of how many limiter credits we borrow since they need +// // to get paid back with real acks later. +// borrowedLimterCredits += flowCredit; +// limiter.onProtocolCredit(flowCredit); +// } else if (info.isStandardAck()) { +// TransactionId tid = info.getTransactionId(); +// Transaction transaction = null; +// if (tid != null) { +// transaction = locateTransaction(tid, true); +// } +// +// LinkedList> acked = new LinkedList>(); +// synchronized (this) { +// MessageId id = info.getLastMessageId(); +// if (isDurable() || isQueueReceiver()) { +// while (!pendingMessageIds.isEmpty()) { +// MessageId pendingId = pendingMessageIds.getFirst(); +// SubscriptionDelivery callback = pendingMessages.remove(pendingId); +// acked.add(callback); +// pendingMessageIds.removeFirst(); +// if (pendingId.equals(id)) { +// break; +// } +// } +// } +// +// // Did we have DeliveredAcks previously sent? Then the +// // the flow window has already been credited. We need to +// // pay back the borrowed limiter credits before giving +// // credits directly to the limiter. +// if (borrowedLimterCredits > 0) { +// if (flowCredit > borrowedLimterCredits) { +// flowCredit -= borrowedLimterCredits; +// borrowedLimterCredits = 0; +// } else { +// borrowedLimterCredits -= flowCredit; +// flowCredit = 0; +// } +// } +// limiter.onProtocolCredit(flowCredit); +// } +// +// if (transaction == null) { +// // Delete outside of synchronization on queue to avoid contention +// // with enqueueing threads. +// for (SubscriptionDelivery callback : acked) { +// callback.acknowledge(); +// } +// } else { +// // Delete outside of synchronization on queue to avoid contention +// // with enqueueing threads. 
+// for (SubscriptionDelivery callback : acked) { +// transaction.addAck(callback); +// } +// } +// } +// } +// +// public boolean hasSelector() { +// return selector != null; +// } +// +// public boolean matches(MessageDelivery message) { +// Message msg = message.asType(Message.class); +// if (msg == null) { +// return false; +// } +// +// OpenwireMessageEvaluationContext selectorContext = new OpenwireMessageEvaluationContext(msg); +// selectorContext.setDestination(info.getDestination()); +// try { +// return (selector == null || selector.matches(selectorContext)); +// } catch (FilterException e) { +// e.printStackTrace(); +// return false; +// } +// } +// +// public boolean isDurable() { +// return info.isDurable(); +// } +// +// public boolean isQueueReceiver() { +// return isQueueReceiver; +// } +// +// public boolean isExclusive() { +// return info.isExclusive(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.queue.Subscription#isBrowser() +// */ +// public boolean isBrowser() { +// return info.isBrowser(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang +// * .Object) +// */ +// public boolean isRemoveOnDispatch(MessageDelivery elem) { +// if (isQueueReceiver()) { +// return false; +// } +// return !elem.isPersistent() || !isDurable; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext +// * #getDestination() +// */ +// public Destination getDestination() { +// return info.getDestination(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext +// * #getJMSSelector() +// */ +// public String getSelectorString() { +// return info.getSelector(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext +// * #getSubscriptionName() +// */ 
+// public String getSubscriptionName() { +// return info.getSubscriptionName(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext +// * #getFullSelector() +// */ +// public BooleanExpression getSelectorExpression() { +// return selector; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext +// * #getJMSSelector() +// */ +// public String getSelector() { +// return info.getSelector(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext +// * #getConnection() +// */ +// public BrokerConnection getConnection() { +// return connection; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext +// * #getConsumerId() +// */ +// public String getConsumerId() { +// return name; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object, +// * org.apache.activemq.flow.ISourceController) +// */ +// public void add(MessageDelivery message, ISourceController source) { +// add(message, source, null); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.flow.IFlowSink#offer(java.lang.Object, +// * org.apache.activemq.flow.ISourceController) +// */ +// public boolean offer(MessageDelivery message, ISourceController source) { +// return offer(message, source, null); +// } +// +// public boolean autoCreateDestination() { +// return true; +// } +// +// public String toString() { +// return info.getConsumerId().toString(); +// } +// +// public void close() { +// brokerSubscription.disconnect(this); +// +// if (isDurable() || isQueueReceiver()) { +// LinkedList> unacquired = null; +// +// synchronized (this) { +// +// unacquired = new LinkedList>(); +// while (!pendingMessageIds.isEmpty()) { +// MessageId pendingId = 
pendingMessageIds.getLast(); +// SubscriptionDelivery callback = pendingMessages.remove(pendingId); +// unacquired.add(callback); +// pendingMessageIds.removeLast(); +// } +// limiter.onProtocolCredit(unacquired.size()); +// } +// +// if (unacquired != null) { +// // Delete outside of synchronization on queue to avoid contention +// // with enqueueing threads. +// for (SubscriptionDelivery callback : unacquired) { +// callback.unacquire(controller); +// } +// } +// } +// +// super.close(); +// consumers.remove(info.getConsumerId()); +// } +// +// public boolean isPersistent() { +// return true; +// } +// } +// +// private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException { +// BooleanExpression rc = null; +// if (info.getSelector() != null) { +// rc = SelectorParser.parse(info.getSelector()); +// } +// if (info.isNoLocal()) { +// if (rc == null) { +// rc = new NoLocalExpression(info.getConsumerId().getConnectionId()); +// } else { +// rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc); +// } +// } +// if (info.getAdditionalPredicate() != null) { +// if (rc == null) { +// rc = info.getAdditionalPredicate(); +// } else { +// rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc); +// } +// } +// return rc; +// } +// +// public BrokerConnection getConnection() { +// return connection; +// } +// +// public void setConnection(BrokerConnection connection) { +// this.connection = connection; +// this.host = connection.getBroker().getDefaultVirtualHost(); +// this.router = host.getRouter(); +// } +// +// public void setWireFormat(WireFormat wireFormat) { +// this.wireFormat = (OpenWireFormat) wireFormat; +// setStoreWireFormat(this.wireFormat.copy()); +// } +// +// private void setStoreWireFormat(OpenWireFormat wireFormat) { +// this.storeWireFormat = wireFormat; +// storeWireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION); +// storeWireFormat.setCacheEnabled(false); +// 
storeWireFormat.setTightEncodingEnabled(false); +// storeWireFormat.setSizePrefixDisabled(false); +// } +// +// public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException { +// Buffer buf = record.getBuffer(); +// Message message = (Message) storeWireFormat.unmarshal(new Buffer(buf.data, buf.offset, buf.length)); +// OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message); +// delivery.setStoreWireFormat(storeWireFormat); +// return delivery; +// } +} Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.broker.region; + + +/** + * @version $Revision: 1.12 $ + */ +public interface Destination { + + int getMinimumMessageSize(); +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire.support.broker.region; + +import java.io.IOException; + +import org.apache.activemq.apollo.openwire.command.ConsumerId; +import org.apache.activemq.apollo.openwire.command.Message; +import org.apache.activemq.apollo.openwire.command.MessageId; + +/** + * Keeps track of a message that is flowing through the Broker. This + * object may hold a hard reference to the message or only hold the + * id of the message if the message has been persisted on in a MessageStore. + * + * @version $Revision: 1.15 $ + */ +public interface MessageReference { + + MessageId getMessageId(); + Message getMessageHardRef(); + Message getMessage() throws IOException; + boolean isPersistent(); + + int getRedeliveryCounter(); + void incrementRedeliveryCounter(); + + int getReferenceCount(); + + int incrementReferenceCount(); + int decrementReferenceCount(); + ConsumerId getTargetConsumerId(); + int getSize(); + long getExpiration(); + String getGroupID(); + int getGroupSequence(); + + /** + * Returns true if this message is expired + */ + boolean isExpired(); + + /** + * Returns true if this message is dropped. 
+ */ + boolean isDropped(); + + /** + * @return true if the message is an advisory + */ + boolean isAdvisory(); + +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java?rev=1097189&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java Wed Apr 27 17:32:51 2011 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.openwire.support.state; + +import org.apache.activemq.apollo.openwire.command.BrokerInfo; +import org.apache.activemq.apollo.openwire.command.ConnectionControl; +import org.apache.activemq.apollo.openwire.command.ConnectionError; +import org.apache.activemq.apollo.openwire.command.ConnectionId; +import org.apache.activemq.apollo.openwire.command.ConnectionInfo; +import org.apache.activemq.apollo.openwire.command.ConsumerControl; +import org.apache.activemq.apollo.openwire.command.ConsumerId; +import org.apache.activemq.apollo.openwire.command.ConsumerInfo; +import org.apache.activemq.apollo.openwire.command.ControlCommand; +import org.apache.activemq.apollo.openwire.command.DestinationInfo; +import org.apache.activemq.apollo.openwire.command.FlushCommand; +import org.apache.activemq.apollo.openwire.command.KeepAliveInfo; +import org.apache.activemq.apollo.openwire.command.Message; +import org.apache.activemq.apollo.openwire.command.MessageAck; +import org.apache.activemq.apollo.openwire.command.MessageDispatch; +import org.apache.activemq.apollo.openwire.command.MessageDispatchNotification; +import org.apache.activemq.apollo.openwire.command.MessagePull; +import org.apache.activemq.apollo.openwire.command.ProducerAck; +import org.apache.activemq.apollo.openwire.command.ProducerId; +import org.apache.activemq.apollo.openwire.command.ProducerInfo; +import org.apache.activemq.apollo.openwire.command.RemoveInfo; +import org.apache.activemq.apollo.openwire.command.RemoveSubscriptionInfo; +import org.apache.activemq.apollo.openwire.command.Response; +import org.apache.activemq.apollo.openwire.command.SessionId; +import org.apache.activemq.apollo.openwire.command.SessionInfo; +import org.apache.activemq.apollo.openwire.command.ShutdownInfo; +import org.apache.activemq.apollo.openwire.command.TransactionInfo; +import org.apache.activemq.apollo.openwire.command.WireFormatInfo; + +public interface CommandVisitor { + + Response 
processAddConnection(ConnectionInfo info) throws Exception; + + Response processAddSession(SessionInfo info) throws Exception; + + Response processAddProducer(ProducerInfo info) throws Exception; + + Response processAddConsumer(ConsumerInfo info) throws Exception; + + Response processRemoveConnection(RemoveInfo info, ConnectionId id, long lastDeliveredSequenceId) throws Exception; + + Response processRemoveSession(RemoveInfo info, SessionId id, long lastDeliveredSequenceId) throws Exception; + + Response processRemoveProducer(RemoveInfo info,ProducerId id) throws Exception; + + Response processRemoveConsumer(RemoveInfo info,ConsumerId id, long lastDeliveredSequenceId) throws Exception; + + Response processAddDestination(DestinationInfo info) throws Exception; + + Response processRemoveDestination(DestinationInfo info) throws Exception; + + Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception; + + Response processMessage(Message send) throws Exception; + + Response processMessageAck(MessageAck ack) throws Exception; + + Response processMessagePull(MessagePull pull) throws Exception; + + Response processBeginTransaction(TransactionInfo info) throws Exception; + + Response processPrepareTransaction(TransactionInfo info) throws Exception; + + Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception; + + Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception; + + Response processRollbackTransaction(TransactionInfo info) throws Exception; + + Response processWireFormat(WireFormatInfo info) throws Exception; + + Response processKeepAlive(KeepAliveInfo info) throws Exception; + + Response processShutdown(ShutdownInfo info) throws Exception; + + Response processFlush(FlushCommand command) throws Exception; + + Response processBrokerInfo(BrokerInfo info) throws Exception; + + Response processRecoverTransactions(TransactionInfo info) throws Exception; + + Response 
processForgetTransaction(TransactionInfo info) throws Exception; + + Response processEndTransaction(TransactionInfo info) throws Exception; + + Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception; + + Response processProducerAck(ProducerAck ack) throws Exception; + + Response processMessageDispatch(MessageDispatch dispatch) throws Exception; + + Response processControlCommand(ControlCommand command) throws Exception; + + Response processConnectionError(ConnectionError error) throws Exception; + + Response processConnectionControl(ConnectionControl control) throws Exception; + + Response processConsumerControl(ConsumerControl control) throws Exception; + +} Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java ------------------------------------------------------------------------------ svn:executable = *