Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 45941 invoked from network); 21 Nov 2008 20:46:01 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 21 Nov 2008 20:46:01 -0000 Received: (qmail 75789 invoked by uid 500); 21 Nov 2008 20:46:11 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 75729 invoked by uid 500); 21 Nov 2008 20:46:10 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 75720 invoked by uid 99); 21 Nov 2008 20:46:10 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Nov 2008 12:46:10 -0800 X-ASF-Spam-Status: No, hits=-1998.5 required=10.0 tests=ALL_TRUSTED,WEIRD_PORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Nov 2008 20:44:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id ECB992388A1B; Fri, 21 Nov 2008 12:44:46 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r719706 [6/6] - in /activemq/activemq-blaze: ./ branches/ tags/ trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/ trunk/src/main/java/org/ trunk/src/main/java/org/apache/ trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/... Date: Fri, 21 Nov 2008 20:44:43 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081121204446.ECB992388A1B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.coordinated; + +import java.util.ArrayList; +import java.util.List; +import junit.framework.TestCase; + +/** + * Test for coordinated channel + * + */ +public class BlazeCoordinatedGroupChannelTest extends TestCase { + public void testGroup() throws Exception { + final int number = 3; + List channels = new ArrayList(); + BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory(); + for (int i = 0; i < number; i++) { + BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i); + channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number); + channel.start(); + channels.add(channel); + } + channels.get(number - 1).waitForElection(5000); + int coordinatorNumber = 0; + BlazeCoordinatedGroupChannel coordinator = null; + for (BlazeCoordinatedGroupChannel channel : channels) { + if (channel.isCoordinator()) { + coordinatorNumber++; + coordinator = channel; + } + } + assertNotNull(coordinator); + assertEquals(1, coordinatorNumber); + // kill the coordinator + coordinator.shutDown(); + Thread.sleep(factory.getConfiguration().getHeartBeatInterval() * 2); + coordinatorNumber = 0; + coordinator = null; + for (BlazeCoordinatedGroupChannel channel : channels) { + if (channel.isCoordinator()) { + coordinatorNumber++; + coordinator = channel; + } + } + assertNotNull(coordinator); + assertEquals(1, coordinatorNumber); + for (BlazeCoordinatedGroupChannel channel : channels) { + channel.shutDown(); + } + } + + public void testWeightedGroup() throws Exception { + final int number = 4; + List channels = new ArrayList(); + BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory(); + BlazeCoordinatedGroupChannel weightedCoordinator = null; + for (int i = 0; i < number; i++) { + BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i); + channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number); + if (i == number / 2) { + channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(10); + weightedCoordinator=channel; + }else { + channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(0); + } + channel.start(); + channels.add(channel); + + } + channels.get(number - 1).waitForElection(5000); + int coordinatorNumber = 0; + BlazeCoordinatedGroupChannel coordinator = null; + for (BlazeCoordinatedGroupChannel channel : channels) { + if (channel.isCoordinator()) { + coordinatorNumber++; + coordinator = channel; + } + } + assertNotNull(coordinator); + assertTrue(coordinator==weightedCoordinator); + assertEquals(1, coordinatorNumber); + for (BlazeCoordinatedGroupChannel channel : channels) { + channel.shutDown(); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.group; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.JMSException; +import junit.framework.TestCase; +import org.apache.activeblaze.BlazeChannel; +import org.apache.activeblaze.BlazeMessage; + +/** + * @author rajdavies + * + */ +public class BlazeGroupChannelTest extends TestCase { + /** + * Test method for + * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}. + * + * @throws Exception + */ + public void testSendMemberBlazeMessage() throws Exception { + final int number = 5; + final AtomicInteger count = new AtomicInteger(); + List channels = new ArrayList(); + BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory(); + for (int i = 0; i < number; i++) { + BlazeGroupChannel channel = factory.createGroupChannel("test" + i); + channel.start(); + channels.add(channel); + channel.setInboxListener(new BlazeQueueListener() { + public void onMessage(BlazeMessage message) { + synchronized (count) { + synchronized (count) { + count.incrementAndGet(); + count.notifyAll(); + } + } + } + }); + } + channels.get(0).getAndWaitForMemberByName(channels.get(number-1).getName(), 2000); + BlazeMessage msg = new BlazeMessage(); + msg.setString("test", "hello"); + channels.get(0).send(channels.get(1).getLocalMember(), msg); + synchronized (count) { + if (count.get() == 0) { + count.wait(5000); + } + } + // wait a while to check that only one got it + Thread.sleep(2000); + assertEquals(1, count.get()); + for (BlazeGroupChannel channel : channels) { + channel.shutDown(); + } + } + + /** + * Test method for + * {@link org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member, org.apache.activeblaze.BlazeMessage)}. + * + * @throws Exception + */ + public void testSendRequestMemberBlazeMessage() throws Exception { + + final int number = 100; + final List requests = new ArrayList(); + final List replies = new ArrayList(); + for (int i = 0; i < number; i++) { + requests.add(new BlazeMessage("request" + i)); + replies.add(new BlazeMessage("reply" + i)); + } + BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory(); + final BlazeGroupChannel request = factory.createGroupChannel("request"); + final BlazeGroupChannel reply = factory.createGroupChannel("reply"); + request.start(); + reply.start(); + reply.setInboxListener(new BlazeQueueListener() { + public void onMessage(BlazeMessage message) { + if (!replies.isEmpty()) { + BlazeMessage replyMsg = replies.remove(0); + try { + Member to = reply.getMemberById(message.getFromId()); + reply.sendReply(to, replyMsg, message.getMessageId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }); + Thread.sleep(100); + Member target = request.getMemberByName("reply"); + for (int i = 0; i < requests.size(); i++) { + BlazeMessage requestMsg = requests.get(i); + BlazeMessage replyMsg = request.sendRequest(target, requestMsg); + assertNotNull(replyMsg); + } + assertTrue(replies.isEmpty()); + request.shutDown(); + reply.shutDown(); + } + + + public void testSendRequestString() throws Exception{ + String destination = "/test/foo"; + final int number = 100; + final List requests = new ArrayList(); + final List replies = new ArrayList(); + for (int i = 0; i < number; i++) { + requests.add(new BlazeMessage("request" + i)); + replies.add(new BlazeMessage("reply" + i)); + } + BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory(); + final BlazeGroupChannel request = factory.createGroupChannel("request"); + final BlazeGroupChannel reply = factory.createGroupChannel("reply"); + reply.addBlazeQueueMessageListener(destination,new BlazeQueueListener() { + public void onMessage(BlazeMessage message) { + if (!replies.isEmpty()) { + BlazeMessage replyMsg = replies.remove(0); + try { + Member to = reply.getMemberById(message.getFromId()); + reply.sendReply(to, replyMsg, message.getMessageId()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }); + request.start(); + reply.start(); + Member result = request.getAndWaitForMemberByName("reply",1000); + assertNotNull(result); + for (int i = 0; i < requests.size(); i++) { + BlazeMessage requestMsg = requests.get(i); + BlazeMessage replyMsg = request.sendRequest(destination, requestMsg,1000); + assertNotNull(replyMsg); + } + assertTrue(replies.isEmpty()); + request.shutDown(); + reply.shutDown(); + } + + + /** + * Test method for + * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)}. + * + * @throws Exception + */ + public void testSendStringBlazeMessage() throws Exception { + final int number = 2; + String destination = "test.foo"; + final AtomicInteger count = new AtomicInteger(); + List channels = new ArrayList(); + BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory(); + for (int i = 0; i < number; i++) { + BlazeGroupChannel channel = factory.createGroupChannel("test" + i); + channel.start(); + channels.add(channel); + channel.addBlazeQueueMessageListener(destination, new BlazeQueueListener() { + public void onMessage(BlazeMessage message) { + synchronized (count) { + synchronized (count) { + count.incrementAndGet(); + count.notifyAll(); + } + } + } + }); + } + Thread.sleep(2000); + BlazeMessage msg = new BlazeMessage(); + msg.setString("test", "hello"); + channels.get(0).send(destination, msg); + synchronized (count) { + if (count.get() == 0) { + count.wait(5000); + } + } + // wait a while to check that only one got it + Thread.sleep(2000); + assertEquals(1, count.get()); + for (BlazeChannel channel : channels) { + channel.shutDown(); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.destination; + +import org.apache.activeblaze.impl.destination.DestinationMatch; +import org.apache.activemq.protobuf.Buffer; +import junit.framework.TestCase; + +/** + * @author rajdavies + * + */ +public class DestinationMatchTest extends TestCase { + + /** + * Test method for {@link org.apache.activeblaze.impl.destination.DestinationMatch#isMatch(byte[], byte[])}. + */ + public void testIsMatch() { + String base = "foo.fred.blah"; + byte[] data = base.getBytes(); + byte[] test = new byte[data.length+4]; + System.arraycopy(data, 0, test, 4, data.length); + doMatchTest(new Buffer(test,4,data.length)); + } + private void doMatchTest(Buffer base) { + String match1 = "foo.fred.blah"; + String match2 = "foo.*.blah"; + String match3 = ">"; + String match4 = ">.*.blah"; + String match5 = ">.*.*"; + String match6 = "*.*.*"; + String match7 = "foo.fred.blah.>"; + + String fail1 = "blah.fred.foo"; + String fail2 = "*.fred.foo"; + String fail3 = "foo.*.foo.>"; + + assertTrue(DestinationMatch.isMatch(base, match1)); + assertTrue(DestinationMatch.isMatch(base, match2)); + assertTrue(DestinationMatch.isMatch(base, match3)); + assertTrue(DestinationMatch.isMatch(base, match4)); + assertTrue(DestinationMatch.isMatch(base, match5)); + assertTrue(DestinationMatch.isMatch(base, match6)); + assertTrue(DestinationMatch.isMatch(base, match7)); + + assertTrue(DestinationMatch.isMatch(match1,base)); + assertTrue(DestinationMatch.isMatch(match2,base)); + assertTrue(DestinationMatch.isMatch(match3,base)); + assertTrue(DestinationMatch.isMatch(match4,base)); + assertTrue(DestinationMatch.isMatch(match5,base)); + assertTrue(DestinationMatch.isMatch(match6,base)); + assertTrue(DestinationMatch.isMatch(match7,base)); + + assertFalse(DestinationMatch.isMatch(base, fail1)); + assertFalse(DestinationMatch.isMatch(base, fail2)); + assertFalse(DestinationMatch.isMatch(base, fail3)); + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/destination/DestinationMatchTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import java.util.concurrent.atomic.AtomicBoolean; +import junit.framework.TestCase; +import org.apache.activeblaze.wire.PacketData; + +/** + * Test some basics in ChainedProcessor + * + */ +public class ChainedProcessorTest extends TestCase { + public void testStart() throws Exception { + final AtomicBoolean test = new AtomicBoolean(); + ChainedProcessor target = new ChainedProcessor() { + public boolean start() { + return test.getAndSet(true); + } + }; + ChainedProcessor A = new ChainedProcessor(); + ChainedProcessor B = new ChainedProcessor(); + A.setNext(B); + A.setEnd(target); + A.start(); + assertTrue(test.get()); + } + + public void testStop() throws Exception { + final AtomicBoolean test = new AtomicBoolean(); + ChainedProcessor target = new ChainedProcessor() { + public boolean stop() { + return test.getAndSet(true); + } + }; + ChainedProcessor A = new ChainedProcessor(); + ChainedProcessor B = new ChainedProcessor(); + A.setNext(B); + A.setEnd(target); + A.start(); + A.stop(); + assertTrue(test.get()); + } + + public void testDownStream() throws Exception { + final AtomicBoolean test = new AtomicBoolean(); + ChainedProcessor target = new ChainedProcessor() { + public void downStream(Packet p) { + test.set(true); + } + }; + ChainedProcessor A = new ChainedProcessor(); + ChainedProcessor B = new ChainedProcessor(); + ChainedProcessor C = new ChainedProcessor(); + ChainedProcessor D = new ChainedProcessor(); + A.setEnd(B); + A.setEnd(C); + A.setEnd(D); + A.setEnd(target); + A.start(); + Packet p = new Packet(new PacketData()); + D.downStream(p); + assertTrue(test.get()); + } + + public void testUpStream() throws Exception { + final AtomicBoolean test = new AtomicBoolean(); + ChainedProcessor target = new ChainedProcessor() { + public void upStream(Packet p) { + test.set(true); + } + }; + ChainedProcessor A = new ChainedProcessor(); + ChainedProcessor B = new ChainedProcessor(); + ChainedProcessor C = new ChainedProcessor(); + ChainedProcessor D = new ChainedProcessor(); + target.setEnd(A); + A.setEnd(B); + A.setEnd(C); + A.setEnd(D); + A.start(); + Packet p = new Packet(new PacketData()); + D.upStream(p); + assertTrue(test.get()); + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,42 @@ +/** + * + */ +package org.apache.activeblaze.impl.processor; + +import junit.framework.TestCase; +import org.apache.activeblaze.wire.PacketData; +import org.apache.activemq.protobuf.Buffer; + +/** + * @author rajdavies + * + */ +public class CompressionProcessorTest extends TestCase { + public void testProcessor() throws Exception { + Packet packet = new Packet(new PacketData()); + byte[] d1 = new byte[1024]; + Buffer payload = new Buffer(d1); + packet.getPacketData().setPayload(payload); + TerminatedChainedProcessor test = new TerminatedChainedProcessor(); + CompressionProcessor proc = new CompressionProcessor(); + proc.setPrev(test); + proc.setNext(test); + assertFalse(CompressionProcessor.isCompressed(packet.getPacketData().getPayload())); + proc.downStream(packet); + assertFalse(CompressionProcessor.isCompressed(packet.getPacketData().getPayload())); + // bigger payload + byte[] d2 = new byte[proc.getCompressionLimit() * 2]; + for (int i = 0; i < d2.length; i++) { + d2[i] = (byte) i; + } + payload = new Buffer(d2); + packet.getPacketData().setPayload(payload); + proc.downStream(packet.clone()); + Packet result = test.getResult(); + assertTrue(CompressionProcessor.isCompressed(result.getPacketData().getPayload())); + proc.upStream(result.clone()); + result = test.getResult(); + assertFalse(CompressionProcessor.isCompressed(result.getPacketData().getPayload())); + assertEquals(result.getPacketData().getPayload().length, packet.getPacketData().getPayload().length); + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import java.util.ArrayList; +import java.util.List; +import junit.framework.TestCase; +import org.apache.activeblaze.wire.PacketData; +import org.apache.activemq.protobuf.Buffer; + +/** + * @author rajdavies + * + */ +public class FragmentationProcessorTest extends TestCase { + public void testProcessor() throws Exception { + Packet packet = new Packet(new PacketData()); + byte[] testData = new byte[1024 * 32]; + for (int i = 0; i < testData.length; i++) { + testData[i] = (byte) i; + } + Buffer payload = new Buffer(testData); + packet.getPacketData().setPayload(payload); + TerminatedChainedProcessor test = new TerminatedChainedProcessor(); + FragmentationProcessor proc = new FragmentationProcessor(); + proc.setPrev(test); + proc.setNext(test); + proc.setMaxPacketSize(1024); + proc.downStream(packet); + List list = new ArrayList(test.getPacketList()); + assertTrue(list.size() > 1); + test.reset(); + assertNull(test.getResult()); + for (Packet p : list) { + proc.upStream(p); + } + Packet resultPacket = test.getResult(); + byte[] result = resultPacket.getPacketData().getPayload().toByteArray(); + assertEquals(result.length, testData.length); + for (int i = 0; i < result.length; i++) { + assertEquals("Testing byte at: " + i, testData[i], result[i]); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import junit.framework.TestCase; +import org.apache.activeblaze.wire.PacketData; +import org.apache.activemq.protobuf.Buffer; + + +/** + * + * + */ +public class PacketAuditTest extends TestCase { + + /** + * Test method for duplicates + * @throws Exception + */ + public void testAudit() throws Exception { + PacketAudit audit = new PacketAudit(); + audit.start(); + for (long i =0; i< audit.getMaxAuditDepth();i++) { + PacketData data = new PacketData(); + data.setProducerId(new Buffer("fred")); + data.setMessageSequence(i); + Packet packet = new Packet(data); + assertFalse(audit.isDuplicate(packet)); + } + + for (long i =0; i< audit.getMaxAuditDepth();i++) { + PacketData data = new PacketData(); + data.setProducerId(new Buffer("fred")); + data.setMessageSequence(i); + Packet packet = new Packet(data); + assertTrue("Testing " + i,audit.isDuplicate(packet)); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.processor; + +import java.util.ArrayList; +import java.util.List; +import org.apache.activeblaze.impl.processor.ChainedProcessor; +import org.apache.activeblaze.impl.processor.Packet; + + +/** + * Test Processor + * + */ +public class TerminatedChainedProcessor extends ChainedProcessor { + + private Packet result = null; + private List list = new ArrayList(); + public TerminatedChainedProcessor() { + } + + public void downStream(Packet packet){ + this.result= packet; + this.list.add(packet); + } + + public void upStream(Packet packet){ + this.result=packet; + } + + public Packet getResult() { + return this.result; + } + + public ListgetPacketList(){ + return this.list; + } + + public void reset(){ + this.result=null; + this.list.clear(); + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/TerminatedChainedProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.transport; + +import java.net.InetSocketAddress; +import java.net.URI; +import org.apache.activeblaze.impl.processor.Packet; +import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor; +import org.apache.activeblaze.wire.PacketData; +import org.apache.activemq.protobuf.Buffer; +import junit.framework.TestCase; + +/** + * Test Multicast Transport + * + */ +public class MulticastTransportTest extends TestCase { + public void testTransport() throws Exception { + URI broadcastURI = new URI("mcast://224.2.2.2:9999"); + InetSocketAddress to = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort()); + MulticastTransport sender = new MulticastTransport(); + sender.setLocalURI(broadcastURI); + sender.init(); + sender.start(); + TerminatedChainedProcessor test = new TerminatedChainedProcessor(); + MulticastTransport receiver = new MulticastTransport(); + receiver.setPrev(test); + receiver.setLocalURI(broadcastURI); + receiver.init(); + receiver.start(); + String payload = "test String"; + Buffer duff = new Buffer("duff"); + PacketData packetData = new PacketData(); + packetData.setType(1); + packetData.setMessageId(new Buffer("foo")); + packetData.setProducerId(duff); + packetData.setFromAddress(duff); + packetData.setSessionId(1); + packetData.setMessageSequence(0); + packetData.setPayload(new Buffer(payload)); + Packet packet = new Packet(packetData); + packet.setTo(to); + sender.downStream(packet); + Thread.sleep(500); + assertEquals(payload, test.getResult().getPacketData().getPayload().toStringUtf8()); + receiver.shutDown(); + sender.shutDown(); + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.impl.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import junit.framework.TestCase; + +import org.apache.activeblaze.impl.processor.Packet; +import org.apache.activeblaze.impl.processor.TerminatedChainedProcessor; +import org.apache.activeblaze.wire.PacketData; +import org.apache.activemq.protobuf.Buffer; + + + +/** + * @author rajdavies + * + */ +public class UdpTransportTest extends TestCase { + + public void testTransport() throws Exception { + URI receiverURI = new URI("udp://localhost:6966"); + URI senderURI = new URI("udp://localhost:6766"); + + + UdpTransport sender = new UdpTransport(); + sender.setLocalURI(senderURI); + sender.start(); + + TerminatedChainedProcessor test = new TerminatedChainedProcessor(); + UdpTransport receiver = new UdpTransport(); + receiver.setPrev(test); + receiver.setLocalURI(receiverURI); + receiver.start(); + + + String payload = "test String"; + Buffer duff = new Buffer("duff"); + PacketData packetData = new PacketData(); + packetData.setType(1); + packetData.setMessageId(new Buffer("foo")); + packetData.setProducerId(duff); + packetData.setFromAddress(duff); + packetData.setSessionId(1); + packetData.setMessageSequence(0); + packetData.setPayload(new Buffer(payload)); + Packet packet = new Packet(receiverURI.getHost(),receiverURI.getPort(),packetData); + + + sender.downStream(packet); + + Thread.sleep(500); + + assertEquals(payload,test.getResult().getPacketData().getPayload().toStringUtf8()); + receiver.shutDown(); + sender.shutDown(); + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java?rev=719706&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java (added) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java Fri Nov 21 12:44:40 2008 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.util; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import junit.framework.TestCase; + +/** + * Test cases for PropertyUtil + * + */ +public class PropertyUtilTest extends TestCase { + /** + * Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.net.URI)}. + * + * @throws Exception + */ + public void testParseParametersURI() throws Exception { + String query = "name=foo&type=blah"; + URI uri = new URI("http://blah:60606?" + query); + Map result = PropertyUtil.parseParameters(uri); + assertEquals(2, result.size()); + assertEquals("foo", result.get("name")); + assertEquals("blah", result.get("type")); + } + + /** + * Test method for {@link org.apache.activeblaze.util.PropertyUtil#parseParameters(java.lang.String)}. + * + * @throws Exception + */ + public void testParseParametersString() throws Exception { + String query = "name=foo&type=blah"; + String uri = "somestring?" + query; + Map result = PropertyUtil.parseParameters(uri); + assertEquals(2, result.size()); + assertEquals("foo", result.get("name")); + assertEquals("blah", result.get("type")); + } + + /** + * Test method for + * {@link org.apache.activeblaze.util.PropertyUtil#addPropertiesToURI(java.lang.String, java.util.Map)}. + * @throws Exception + */ + public void testAddPropertiesToURI() throws Exception { + String query = "type=blah&name=foo"; + Map map = new HashMap(); + map.put("name", "foo"); + map.put("type", "blah"); + String test = "somestring?"+query; + String result = PropertyUtil.addPropertiesToURI(test, map); + URI uri = new URI(result); + assertEquals(uri.getQuery(),query); + } +} Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/util/PropertyUtilTest.java ------------------------------------------------------------------------------ svn:eol-style = native