Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 75432 invoked from network); 25 Nov 2008 14:28:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 25 Nov 2008 14:28:17 -0000 Received: (qmail 33241 invoked by uid 500); 25 Nov 2008 14:28:27 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 33221 invoked by uid 500); 25 Nov 2008 14:28:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 33212 invoked by uid 99); 25 Nov 2008 14:28:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Nov 2008 06:28:27 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Nov 2008 14:27:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9452B23888A4; Tue, 25 Nov 2008 06:27:25 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r720505 [2/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/coordinated/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/processor/ main/java/org/ap... 
Date: Tue, 25 Nov 2008 14:27:24 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081125142725.9452B23888A4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=720505&r1=720504&r2=720505&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Nov 25 06:27:23 2008 @@ -24,10 +24,17 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import java.util.Map; import org.apache.activeblaze.BlazeException; +import org.apache.activeblaze.BlazeNoRouteException; import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.util.IOUtils; +import org.apache.activeblaze.util.LRUCache; +import org.apache.activeblaze.util.SendRequest; +import org.apache.activeblaze.wire.AckData; +import org.apache.activeblaze.wire.MessageType; import org.apache.activeblaze.wire.PacketData; +import org.apache.activemq.protobuf.Buffer; /** * UdpTransport @@ -37,6 +44,7 @@ private DatagramChannel channel; private ByteBuffer inBuffer; private ByteBuffer outBuffer; + private Map messageRequests = new LRUCache(1000); public boolean init() throws Exception { boolean result = super.init(); @@ -90,9 +98,35 @@ InputStream stream = IOUtils.getByteBufferInputStream(buffer); PacketData data = PacketData.parseFramed(stream); stream.close(); - Packet packet = new Packet(address, data); - if (!isEnableAudit() || !this.audit.isDuplicate(packet)) { - upStream(packet); + if 
(MessageType.ACK_DATA.getNumber() == data.getType()) { + synchronized (this.messageRequests) { + SendRequest request = this.messageRequests.remove(data.getCorrelationId()); + if (request != null) { + MessageType type = MessageType.ACK_DATA; + AckData ack = (AckData) type.createMessage(); + ack.mergeFramed(data.getPayload()); + request.put(data.getMessageId(), ack); + } + } + } else { + if (data.getResponseRequired()) { + MessageType type = MessageType.ACK_DATA; + AckData ack = (AckData) type.createMessage(); + ack.setMessageId(data.getMessageId()); + PacketData pd = new PacketData(); + pd.setResponseRequired(false); + pd.setCorrelationId(data.getMessageId()); + pd.setType(type.getNumber()); + pd.setFromAddress(getBufferOfLocalURI()); + pd.setPayload(ack.toFramedBuffer()); + Packet packet = new Packet(pd); + packet.setTo(address); + downStream(packet); + } + Packet packet = new Packet(address, data); + if (!isEnableAudit() || !this.audit.isDuplicate(packet)) { + upStream(packet); + } } } buffer.clear(); @@ -102,6 +136,13 @@ public void downStream(Packet packet) throws Exception { ByteBuffer buffer = this.outBuffer; if (isStarted()) { + SendRequest request = null; + if (packet.getPacketData().getResponseRequired()) { + synchronized (this.messageRequests) { + request = new SendRequest(); + this.messageRequests.put(packet.getPacketData().getMessageId(), request); + } + } buffer.clear(); OutputStream stream = IOUtils.getByteBufferOutputStream(buffer); if (isEnableAudit()) { @@ -112,8 +153,13 @@ stream.close(); buffer.flip(); this.channel.send(buffer, packet.getTo()); + if (request != null) { + if (request.get(0) == null) { + throw new BlazeNoRouteException("No response in " + getSoTimeout() + " ms from " + packet.getTo()); + } + } } else { - throw new BlazeException("Not started"); + throw new BlazeException("Not started - trying to downStream " + packet); } } } Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java (from 
r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java) URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java&r1=719718&r2=720505&rev=720505&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java Tue Nov 25 06:27:23 2008 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activeblaze.group; +package org.apache.activeblaze.util; import java.util.HashSet; import java.util.Set; Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java?rev=720505&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java Tue Nov 25 06:27:23 2008 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; +import java.lang.reflect.Proxy; +import java.util.HashMap; + +public class ClassLoadingAwareObjectInputStream extends ObjectInputStream { + + private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader(); + /**

<p>Maps primitive type names to corresponding class objects.</p>

*/ + private static final HashMap primClasses = new HashMap(8, 1.0F); + public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException { + super(in); + } + + protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + return load(classDesc.getName(), cl); + } + + protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + Class[] cinterfaces = new Class[interfaces.length]; + for (int i = 0; i < interfaces.length; i++) { + cinterfaces[i] = load(interfaces[i], cl); + } + + try { + return Proxy.getProxyClass(cinterfaces[0].getClassLoader(), cinterfaces); + } catch (IllegalArgumentException e) { + throw new ClassNotFoundException(null, e); + } + } + + private Class load(String className, ClassLoader cl) + throws ClassNotFoundException { + try { + return Class.forName(className, false, cl); + } catch (ClassNotFoundException e) { + final Class clazz = (Class) primClasses.get(className); + if (clazz != null) { + return clazz; + } else { + return Class.forName(className, false, FALLBACK_CLASS_LOADER); + } + } + } + + + + static { + primClasses.put("boolean", boolean.class); + primClasses.put("byte", byte.class); + primClasses.put("char", char.class); + primClasses.put("short", short.class); + primClasses.put("int", int.class); + primClasses.put("long", long.class); + primClasses.put("float", float.class); + primClasses.put("double", double.class); + primClasses.put("void", void.class); + } + +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ClassLoadingAwareObjectInputStream.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java URL: 
http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java?rev=720505&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java Tue Nov 25 06:27:23 2008 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activeblaze.util; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * A Simple LRU Cache + * + * @version $Revision$ + * @param <K> + * @param <V> + */ + +public class LRUCache<K, V> extends LinkedHashMap<K, V> { + private static final long serialVersionUID = -342098639681884413L; + protected int maxCacheSize = 10000; + + /** + * Default constructor for an LRU Cache The default capacity is 10000 + */ + public LRUCache() { + this(0,10000, 0.75f, true); + } + + /** + * Constructs a LRUCache with a maximum capacity + * + * @param maximumCacheSize + */ + public LRUCache(int maximumCacheSize) { + this(0, maximumCacheSize, 0.75f, true); + } + + /** + * Constructs an empty LRUCache instance with the specified + * initial capacity, maximumCacheSize,load factor and ordering mode. + * + * @param initialCapacity the initial capacity. + * @param maximumCacheSize + * @param loadFactor the load factor. + * @param accessOrder the ordering mode - true for access-order, + * false for insertion-order. + * @throws IllegalArgumentException if the initial capacity is negative or + * the load factor is non-positive. + */ + + public LRUCache(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder) { + super(initialCapacity, loadFactor, accessOrder); + this.maxCacheSize = maximumCacheSize; + } + + /** + * @return Returns the maxCacheSize. + */ + public int getMaxCacheSize() { + return maxCacheSize; + } + + /** + * @param maxCacheSize The maxCacheSize to set. 
+ */ + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxCacheSize; + } +} + Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java) URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java&r1=719718&r2=720505&rev=720505&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/RequestCallback.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/RequestCallback.java Tue Nov 25 06:27:23 2008 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.activeblaze.group; +package org.apache.activeblaze.util; import org.apache.activemq.protobuf.Buffer; Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java (from r719718, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java) URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java&r1=719718&r2=720505&rev=720505&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/SendRequest.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java Tue Nov 25 06:27:23 2008 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.activeblaze.group; +package org.apache.activeblaze.util; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.protobuf.Buffer; @@ -23,16 +23,16 @@ import org.apache.commons.logging.LogFactory; /** - * @author state on a request + * state on a request * */ -class SendRequest { +public class SendRequest { private static final Log LOG = LogFactory.getLog(SendRequest.class); private final AtomicBoolean done = new AtomicBoolean(); private Message response; private RequestCallback callback; - Object get(long timeout) { + public Object get(long timeout) { synchronized (this.done) { if (this.done.get() == false && this.response == null) { try { @@ -45,7 +45,7 @@ return this.response; } - void put(Buffer id,Message response) { + public void put(Buffer id,Message response) { this.response = response; cancel(); RequestCallback callback = this.callback; Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=720505&r1=720504&r2=720505&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original) +++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Nov 25 06:27:23 2008 @@ -30,21 +30,33 @@ BLAZE_DATA = 0; MEMBER_DATA = 1; ELECTION_MESSAGE = 2; + ACK_DATA = 3; } message PacketData { - optional int32 type =1; - optional bytes producerId = 2; - optional bytes fromAddress =3; - optional int64 sessionId = 4; - optional int64 messageSequence = 5; - optional bool reliable = 6; - optional int32 numberOfParts= 7; - optional int32 partNumber= 8; - optional bytes payload= 9; - optional bytes messageId =10; - optional bytes correlationId = 11; + optional bool responseRequired = 1; + optional int32 type =2; + optional bytes producerId = 3; + optional bytes fromAddress =4; + optional int64 sessionId = 5; + optional int64 
messageSequence = 6; + optional bool reliable = 7; + optional int32 numberOfParts= 8; + optional int32 partNumber= 9; + optional bytes payload= 10; + optional bytes messageId =11; + optional bytes correlationId = 12; } + message AckData { + //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType"; + //| option java_type_method = "MessageType"; + optional bytes messageId = 1; + optional int64 messageSequence =2; + optional bytes fromAddress =3; + optional int64 sessionId = 4; + optional int64 messageSequence = 5; + } + message DestinationData { required bool topic = 1; required bytes destination = 2; @@ -127,6 +139,11 @@ optional bytes value = 2; } + message BufferType { + optional string name = 1; + optional bytes value = 2; + } + message MapData { @@ -142,6 +159,7 @@ repeated CharType charType = 10; repeated BytesType bytesType = 11; repeated MapData mapType = 12; + repeated BufferType bufferType = 13; } Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=720505&r1=720504&r2=720505&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Tue Nov 25 06:27:23 2008 @@ -29,13 +29,12 @@ */ public class BlazeChannelTest extends TestCase { public void testChannel() throws Exception { - int count = 100; + int count = 10000; final AtomicInteger received = new AtomicInteger(); String destination = "test.foo"; BlazeChannelFactory factory = new BlazeChannelFactory(); BlazeChannel sender = factory.createChannel(); BlazeChannel receiver = factory.createChannel(); - receiver.getConfiguration().setUseDispatchThread(true); sender.start(); receiver.start(); 
final CountDownLatch latch = new CountDownLatch(count); Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=720505&r1=720504&r2=720505&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Tue Nov 25 06:27:23 2008 @@ -19,13 +19,12 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.JMSException; import junit.framework.TestCase; import org.apache.activeblaze.BlazeChannel; import org.apache.activeblaze.BlazeMessage; /** - * @author rajdavies + * Test BlazeGroupChannel * */ public class BlazeGroupChannelTest extends TestCase { Added: activemq/activemq-blaze/trunk/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/resources/log4j.properties?rev=720505&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/resources/log4j.properties (added) +++ activemq/activemq-blaze/trunk/src/test/resources/log4j.properties Tue Nov 25 06:27:23 2008 @@ -0,0 +1,36 @@ +## ------------------------------------------------------------------------ +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. 
You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## ------------------------------------------------------------------------ + +# +# The logging properties used for Eclipse testing. We want to see debug output on the console. +# +log4j.rootLogger=INFO, out + + + +# Console appender (attached to the root logger above) +log4j.appender.out=org.apache.log4j.ConsoleAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n +#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n + +# File appender (defined here but not attached to any logger by default) +log4j.appender.fout=org.apache.log4j.FileAppender +log4j.appender.fout.layout=org.apache.log4j.PatternLayout +log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.fout.file=target/amq-testlog.log +log4j.appender.fout.append=true \ No newline at end of file