From commits-return-12579-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Fri Dec 11 19:40:40 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 63297 invoked from network); 11 Dec 2009 19:40:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Dec 2009 19:40:40 -0000 Received: (qmail 8637 invoked by uid 500); 11 Dec 2009 19:40:40 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 8578 invoked by uid 500); 11 Dec 2009 19:40:40 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 8568 invoked by uid 99); 11 Dec 2009 19:40:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2009 19:40:40 +0000 X-ASF-Spam-Status: No, hits=-1998.8 required=10.0 tests=ALL_TRUSTED,FS_REPLICA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2009 19:40:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9979623889D7; Fri, 11 Dec 2009 19:40:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r889781 [3/3] - in /activemq/sandbox/activemq-apollo/activemq-kahadb-replication: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/replication/ sr... 
Date: Fri, 11 Dec 2009 19:40:02 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091211194008.9979623889D7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java?rev=889781&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java (added) +++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/transport/KDBRTransportTest.java Fri Dec 11 19:39:58 2009 @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kahadb.replication.transport;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import junit.framework.TestCase;

import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
import org.apache.kahadb.replication.ReplicationFrame;
import org.apache.kahadb.replication.pb.PBHeader;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBSlaveInit;
import org.apache.kahadb.replication.pb.PBType;

/**
 * Sends {@link ReplicationFrame}s from a client transport to a server bound on
 * {@link #KDBR_URI} and checks that the frame received by the server matches
 * the frame that was sent.
 *
 * Everything received by either side (commands and I/O errors alike) is
 * appended to a queue; a one-shot latch lets a test block until the next
 * command or error has been recorded.
 */
public class KDBRTransportTest extends TestCase {

    private static final String KDBR_URI = "kdbr://localhost:61618";

    /** Commands and IOExceptions seen by the server side, in arrival order. */
    private List<Object> serverQueue;
    /** Commands and IOExceptions seen by the client side, in arrival order. */
    private List<Object> clientQueue;
    /** Transports accepted by the server that have not reported an error. */
    private List<Transport> serverTransports;
    private TransportServer server;
    private Transport client;

    private final Object commandLatchMutex = new Object();
    private CountDownLatch commandLatch;

    /**
     * Releases the currently armed latch, if any, and disarms it so that the
     * next call to {@link #getCommandLatch()} creates a fresh one.
     */
    protected void releaseCommandLatch() {
        synchronized (commandLatchMutex) {
            if (commandLatch == null) {
                return;
            }
            commandLatch.countDown();
            commandLatch = null;
        }
    }

    /**
     * Returns the armed latch, creating it first if none is armed.
     *
     * @return a latch that is counted down by the next
     *         {@link #releaseCommandLatch()} call
     */
    protected CountDownLatch getCommandLatch() {
        synchronized (commandLatchMutex) {
            if (commandLatch == null) {
                commandLatch = new CountDownLatch(1);
            }
            return commandLatch;
        }
    }

    /**
     * Binds a server on {@link #KDBR_URI}, then connects and starts a client
     * against it. Both sides record what they receive and release the
     * command latch afterwards.
     */
    @Override
    protected void setUp() throws Exception {
        serverQueue = Collections.synchronizedList(new ArrayList<Object>());
        clientQueue = Collections.synchronizedList(new ArrayList<Object>());
        serverTransports = Collections.synchronizedList(new ArrayList<Transport>());

        // Setup a server
        server = TransportFactory.bind(new URI(KDBR_URI));
        server.setAcceptListener(new TransportAcceptListener() {
            // 'final' so the nested listener can refer to the accepted transport.
            public void onAccept(final Transport transport) {
                try {
                    transport.setTransportListener(new TransportListener() {
                        public void onCommand(Object command) {
                            try {
                                serverQueue.add(command);
                                process(command);
                                releaseCommandLatch();
                            } catch (IOException e) {
                                onException(e);
                            }
                        }

                        public void onException(IOException error) {
                            serverQueue.add(error);
                            // Remove the failed transport. 'this' would be the
                            // listener, which is never stored in the list.
                            serverTransports.remove(transport);
                            releaseCommandLatch();
                        }

                        public void transportInterupted() {
                        }

                        public void transportResumed() {
                        }
                    });
                    transport.start();
                    serverTransports.add(transport);
                } catch (Exception e) {
                    onAcceptError(e);
                }
            }

            public void onAcceptError(Exception error) {
                error.printStackTrace();
            }
        });
        server.start();

        // Connect a client.
        client = TransportFactory.connect(new URI(KDBR_URI));
        client.setTransportListener(new TransportListener() {
            public void onCommand(Object command) {
                clientQueue.add(command);
                releaseCommandLatch();
            }

            public void onException(IOException error) {
                clientQueue.add(error);
                releaseCommandLatch();
            }

            public void transportInterupted() {
            }

            public void transportResumed() {
            }
        });
        client.start();
    }

    /**
     * Stops the client and then the server. The server is stopped even when
     * stopping the client fails or when setUp never got as far as creating
     * the client.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (client != null) {
                client.stop();
            }
        } finally {
            if (server != null) {
                server.stop();
            }
        }
    }

    /**
     * Server-side handling of a received command.
     *
     * @param command the received object; it is cast to {@link ReplicationFrame}
     * @throws IOException if the streamed payload ends before the number of
     *         bytes announced in the header has been read
     */
    private void process(Object command) throws IOException {
        ReplicationFrame frame = (ReplicationFrame) command;
        // Since we are processing the commands async in this test case we need to fully read the
        // stream before returning, since it will be used to read the next command once we return.
        if (frame.getHeader().getType() == PBType.FILE_TRANSFER_RESPONSE) {
            InputStream ais = (InputStream) frame.getPayload();
            byte[] actualPayload = new byte[(int) frame.getHeader().getPayloadSize()];
            readFully(ais, actualPayload);
            frame.setPayload(actualPayload);
        }
    }

    /**
     * Test a frame that has a streaming payload.
     *
     * @throws Exception
     */
    public void testFileTransferResponse() throws Exception {

        byte[] expectedPayload = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

        ReplicationFrame expected = new ReplicationFrame();
        expected.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(expectedPayload.length));
        ByteArrayInputStream is = new ByteArrayInputStream(expectedPayload);
        expected.setPayload(is);

        CountDownLatch latch = getCommandLatch();
        client.oneway(expected);
        is.close();
        // Fail with a clear message on timeout instead of a confusing queue-size mismatch.
        assertTrue("Timed out waiting for the server to receive the frame",
                latch.await(2, TimeUnit.SECONDS));

        assertEquals(1, serverQueue.size());
        ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);

        assertEquals(expected.getHeader(), actual.getHeader());
        assertTrue(Arrays.equals(expectedPayload, (byte[]) actual.getPayload()));
    }

    /**
     * Test out sending a frame that has a PB payload.
     *
     * @throws Exception
     */
    public void testPBSlaveInitFrame() throws Exception {

        ReplicationFrame expected = new ReplicationFrame();
        expected.setHeader(new PBHeader().setType(PBType.SLAVE_INIT));
        expected.setPayload(new PBSlaveInit().setNodeId("foo"));

        CountDownLatch latch = getCommandLatch();
        client.oneway(expected);
        assertTrue("Timed out waiting for the server to receive the frame",
                latch.await(2, TimeUnit.SECONDS));

        assertEquals(1, serverQueue.size());
        ReplicationFrame actual = (ReplicationFrame) serverQueue.remove(0);

        assertEquals(expected.getHeader(), actual.getHeader());
        assertEquals(expected.getPayload(), actual.getPayload());
    }

    /**
     * Fills {@code actualPayload} completely from {@code ais}.
     *
     * @param ais the stream to read from
     * @param actualPayload the buffer to fill; its length is the number of bytes required
     * @throws EOFException if the stream ends before the buffer is full
     * @throws IOException if reading from the stream fails
     */
    private void readFully(InputStream ais, byte[] actualPayload) throws IOException {
        int pos = 0;
        int c;
        while (pos < actualPayload.length && (c = ais.read(actualPayload, pos, actualPayload.length - pos)) >= 0) {
            pos += c;
        }
        if (pos < actualPayload.length) {
            throw new EOFException();
        }
    }
}
 Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java?rev=889781&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java (added) +++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java Fri Dec 11 19:39:58 2009 @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.kahadb.replication.zk;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import junit.framework.TestCase;

import org.apache.activemq.util.Callback;
import org.apache.kahadb.replication.ClusterListener;
import org.apache.kahadb.replication.ClusterState;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
import org.apache.kahadb.replication.pb.PBJournalLocation;
import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.apache.zookeeper.server.persistence.FileTxnLog;

/**
 * Exercises {@link ZooKeeperClusterStateManager} against a ZooKeeper server
 * that is started inside the test JVM on port {@link #PORT}.
 *
 * Each test registers a {@link ClusterListener} that appends every reported
 * {@link ClusterState} to a blocking queue, then asserts on the states polled
 * from that queue.
 */
public class ZooKeeperClusterStateManagerTest extends TestCase {

    private static final int PORT = 2181;
    /** Upper bound on how long setUp waits for the client to reach CONNECTED. */
    private static final long CONNECT_TIMEOUT_MS = 30000;

    private ZooKeeperClusterStateManager zkcsm1;
    private ZooKeeper zk;
    private Factory serverFactory;

    /**
     * Starts the embedded ZooKeeper server, opens a client connection through
     * the state manager, waits for it to connect, and deletes whatever a
     * previous run left under the state manager's path.
     */
    @Override
    protected void setUp() throws Exception {

        ServerStats.registerAsConcrete();
        File tmpDir = new File("target/test-data/zookeeper");
        tmpDir.mkdirs();

        // Reduces startup time..
        System.setProperty("zookeeper.preAllocSize", "100");
        FileTxnLog.setPreallocSize(100);

        ZooKeeperServer zs = new ZooKeeperServer(tmpDir, tmpDir, 3000);

        serverFactory = new NIOServerCnxn.Factory(PORT);
        serverFactory.startup(zs);

        zkcsm1 = new ZooKeeperClusterStateManager();
        zk = zkcsm1.createZooKeeperConnection();

        // Wait till the ZK client gets connected, but give up after a bounded
        // time so a server that never comes up cannot hang the build.
        long deadline = System.currentTimeMillis() + CONNECT_TIMEOUT_MS;
        while (zk.getState() != States.CONNECTED) {
            if (System.currentTimeMillis() > deadline) {
                // JUnit 3 does not run tearDown() when setUp() fails, so
                // release the server and client explicitly before failing.
                tearDown();
                fail("Timed out waiting for the ZooKeeper client to connect");
            }
            Thread.sleep(100);
        }

        // Cleanup after previous run...
        zkRecursiveDelete(zkcsm1.getPath());
    }

    /**
     * Deletes the node at {@code path} and, depth first, all of its children.
     * Does nothing when the node does not exist.
     *
     * @param path absolute ZooKeeper path of the node to remove
     */
    private void zkRecursiveDelete(String path) throws KeeperException, InterruptedException {
        Stat stat = zk.exists(path, false);
        if (stat == null) {
            return;
        }
        if (stat.getNumChildren() > 0) {
            List<String> children = zk.getChildren(path, false);
            for (String node : children) {
                zkRecursiveDelete(path + "/" + node);
            }
        }
        zk.delete(path, stat.getVersion());
    }

    /**
     * Closes the client and shuts the server down. Each step runs even when
     * an earlier one throws, and steps whose resource was never created are
     * skipped.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (zk != null) {
                zk.close();
            }
        } finally {
            try {
                if (serverFactory != null) {
                    serverFactory.shutdown();
                }
            } finally {
                ServerStats.unregister();
            }
        }
    }

    /**
     * Two members report their status; both listeners are expected to see
     * "kdbr://localhost:60002" as master with one slave remaining.
     */
    public void testTwoNodesGoingOnline() throws Exception {
        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
        final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();

        zkcsm1.addListener(new ClusterListener() {
            public void onClusterChange(ClusterState config) {
                stateEvents1.add(config);
            }
        });
        zkcsm1.start();
        try {
            zkcsm1.addMember("kdbr://localhost:60001");

            final ZooKeeperClusterStateManager zkcsm2 = new ZooKeeperClusterStateManager();
            zkcsm2.addListener(new ClusterListener() {
                public void onClusterChange(ClusterState config) {
                    stateEvents2.add(config);
                }
            });
            zkcsm2.start();
            try {
                zkcsm2.addMember("kdbr://localhost:60002");

                // Drain the events..
                drain(stateEvents1);
                drain(stateEvents2);

                // Bring node 1 online
                final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
                status1.setConnectUri("kdbr://localhost:60001");
                status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
                status1.setState(State.SLAVE_UNCONNECTED);

                executeAsync(new Callback() {
                    public void execute() throws Exception {
                        zkcsm1.setMemberStatus(status1);
                    }
                });

                // Bring node 2 online
                final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
                status2.setConnectUri("kdbr://localhost:60002");
                status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
                status2.setState(State.SLAVE_UNCONNECTED);

                executeAsync(new Callback() {
                    public void execute() throws Exception {
                        Thread.sleep(1000);
                        zkcsm2.setMemberStatus(status2);
                    }
                });

                ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
                assertNotNull(state);
                assertNotNull(state.getMaster());
                assertEquals("kdbr://localhost:60002", state.getMaster());
                assertEquals(1, state.getSlaves().size());

                state = stateEvents2.poll(1, TimeUnit.SECONDS);
                assertNotNull(state);
                assertNotNull(state.getMaster());
                assertEquals("kdbr://localhost:60002", state.getMaster());
                assertEquals(1, state.getSlaves().size());
            } finally {
                // Stop in the same order as before (2 then 1), but also when an assertion fails.
                zkcsm2.stop();
            }
        } finally {
            zkcsm1.stop();
        }
    }

    /**
     * Discards queued events until none arrives for 100 ms.
     *
     * @param events the queue to empty
     */
    private void drain(LinkedBlockingQueue<ClusterState> events) throws InterruptedException {
        while (events.poll(100, TimeUnit.MILLISECONDS) != null) {
            // discard
        }
    }

    /**
     * Runs {@code callback} on a new thread named "Async Test Task". An
     * exception thrown by the callback is printed, not propagated to the
     * test thread.
     *
     * @param callback the work to run in the background
     */
    private void executeAsync(final Callback callback) {
        new Thread("Async Test Task") {
            @Override
            public void run() {
                try {
                    callback.execute();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }

    /**
     * A single member joins (no master, one slave) and then reports its
     * status, after which it is expected to be master with no slaves.
     */
    public void testOneNodeGoingOnline() throws Exception {
        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
        zkcsm1.addListener(new ClusterListener() {
            public void onClusterChange(ClusterState config) {
                stateEvents1.add(config);
            }
        });
        zkcsm1.start();
        try {
            // Drain the events..
            drain(stateEvents1);

            // Let node1 join the cluster.
            zkcsm1.addMember("kdbr://localhost:60001");

            ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
            assertNotNull(state);
            assertNull(state.getMaster());
            assertEquals(1, state.getSlaves().size());

            // Let the cluster know that node1 is online..
            PBClusterNodeStatus status = new PBClusterNodeStatus();
            status.setConnectUri("kdbr://localhost:60001");
            status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
            status.setState(State.SLAVE_UNCONNECTED);
            zkcsm1.setMemberStatus(status);

            state = stateEvents1.poll(10, TimeUnit.SECONDS);
            assertNotNull(state);
            assertNotNull(state.getMaster());
            assertEquals("kdbr://localhost:60001", state.getMaster());
            assertTrue(state.getSlaves().isEmpty());
        } finally {
            zkcsm1.stop();
        }
    }
}
 Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha-broker.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha-broker.xml?rev=889781&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha-broker.xml (added) +++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha-broker.xml Fri Dec 11 19:39:58 2009 @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha.xml?rev=889781&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha.xml (added) +++ 
activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker1/ha.xml Fri Dec 11 19:39:58 2009 @@ -0,0 +1,47 @@ + + + + + + + + + + + + + + + + + + + + Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha-broker.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha-broker.xml?rev=889781&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha-broker.xml (added) +++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha-broker.xml Fri Dec 11 19:39:58 2009 @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha.xml URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha.xml?rev=889781&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha.xml (added) +++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/broker2/ha.xml Fri Dec 11 19:39:58 2009 @@ -0,0 +1,47 @@ + + + + + + + + + + + + + + + + + + + + Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/log4j.properties?rev=889781&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/log4j.properties (added) +++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/resources/log4j.properties Fri Dec 11 19:39:58 
2009 @@ -0,0 +1,36 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests.. +# +log4j.rootLogger=INFO, out + +log4j.logger.org.apache.activemq.spring=WARN +log4j.logger.org.apache.zookeeper=WARN + +# CONSOLE appender not used by default +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n + +# File appender +log4j.appender.out=org.apache.log4j.FileAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.out.file=target/activemq-test.log +log4j.appender.out.append=true