Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D578010AEE for ; Wed, 12 Mar 2014 14:13:34 +0000 (UTC) Received: (qmail 97845 invoked by uid 500); 12 Mar 2014 14:13:30 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 97669 invoked by uid 500); 12 Mar 2014 14:13:27 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 95722 invoked by uid 99); 12 Mar 2014 14:13:11 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Mar 2014 14:13:11 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 636D8941CAD; Wed, 12 Mar 2014 14:13:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hadrian@apache.org To: commits@activemq.apache.org Date: Wed, 12 Mar 2014 14:13:17 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/13] git commit: If a replicated leveldb slave's connection gets slow, let's merge together journal write events to avoid them queuing up on the master side. If a replicated leveldb slave's connection gets slow, let's merge together journal write events to avoid them queuing up on the master side. 
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/554efe3e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/554efe3e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/554efe3e Branch: refs/heads/activemq-5.9 Commit: 554efe3e169e079dfdda62b5de7fa16d4b512db0 Parents: 971d924 Author: Hiram Chirino Authored: Tue Nov 5 10:49:40 2013 -0500 Committer: Hadrian Zbarcea Committed: Wed Mar 12 09:01:15 2014 -0400 ---------------------------------------------------------------------- .../leveldb/replicated/MasterLevelDBStore.scala | 84 ++++++++++++++----- .../replicated/ReplicationProtocolCodec.scala | 8 +- .../leveldb/replicated/SlaveLevelDBStore.scala | 20 +++-- .../leveldb/replicated/TransportHandler.scala | 19 +++-- .../test/ReplicatedLevelDBStoreTest.java | 88 ++++++++++++++++++-- .../leveldb/test/ReplicationTestSupport.java | 17 ++-- 6 files changed, 187 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/554efe3e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala index 0381627..249e0c4 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala @@ -28,6 +28,7 @@ import java.io.{IOException, File} import java.net.{SocketAddress, InetSocketAddress, URI} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import scala.reflect.BeanProperty +import 
org.fusesource.hawtbuf.{Buffer, AsciiBuffer} class PositionSync(val position:Long, count:Int) extends CountDownLatch(count) @@ -132,7 +133,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { def start_protocol_server = { transport_server = new TcpTransportServer(new URI(bind)) transport_server.setBlockingExecutor(blocking_executor) - transport_server.setDispatchQueue(createQueue("replication server")) + transport_server.setDispatchQueue(createQueue("master: "+node_id)) transport_server.setTransportServerListener(new TransportServerListener(){ def onAccept(transport: Transport) { transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress)) @@ -266,7 +267,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { sendError("Invalid length") } sendOk(null) - send(FileTransferFrame(file, req.offset, req.length)) + send(new FileTransferFrame(file, req.offset, req.length)) } } @@ -282,6 +283,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { def start(session:Session) = { debug("SlaveState:start") socketAddress = session.transport.getRemoteAddress + session.queue.setLabel(transport_server.getDispatchQueue.getLabel+" -> "+slave_id) val resp = this.synchronized { if( this.session!=null ) { @@ -311,16 +313,69 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } } - def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame=null ) = { + def queue(func: (Session)=>Unit) = { val h = this.synchronized { session } if( h !=null ) { h.queue { - h.send(frame1) - if( frame2!=null ) { - h.send(frame2) - } + func(session) + } + } + } + + def replicate(value:LogDelete):Unit = { + val frame = new ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value)) + queue { session => + session.send(frame) + } + } + + var unflushed_replication_frame:DeferredReplicationFrame = null + + class DeferredReplicationFrame(file:File, val 
position:Long, _offset:Long, initialLength:Long) extends ReplicationFrame(WAL_ACTION, null) { + val fileTransferFrame = new FileTransferFrame(file, _offset, initialLength) + var encoded:Buffer = null + + def offset = fileTransferFrame.offset + def length = fileTransferFrame.length + + override def body: Buffer = { + if( encoded==null ) { + val value = new LogWrite + value.file = position; + value.offset = offset; + value.length = fileTransferFrame.length + value.date = date + encoded = JsonCodec.encode(value) + } + encoded + } + } + + def replicate(file:File, position:Long, offset:Long, length:Long):Unit = { + queue { session => + + // Check to see if we can merge the replication event /w the previous event.. + if( unflushed_replication_frame == null || + unflushed_replication_frame.position!=position || + (unflushed_replication_frame.offset+unflushed_replication_frame.length)!=offset ) { + + // We could not merge the replication event /w the previous event.. + val frame = new DeferredReplicationFrame(file, position, offset, length) + unflushed_replication_frame = frame + session.send(frame, ()=>{ + trace("%s: Sent WAL update: (file:%s, offset: %d, length: %d) to %s", directory, file, frame.offset, frame.length, slave_id) + if( unflushed_replication_frame eq frame ) { + unflushed_replication_frame = null + } + }) + session.send(frame.fileTransferFrame) + + } else { + // We were able to merge.. yay! 
+ assert(unflushed_replication_frame.encoded == null) + unflushed_replication_frame.fileTransferFrame.length += length } } } @@ -392,18 +447,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = { if( length > 0 ) { - val value = new LogWrite - value.file = position; - value.offset = offset; - value.length = length - value.date = date - wal_date = value.date; - value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0 - trace("%s: Sending WAL update: (file:%d, offset: %d, length: %d)", directory, value.file, value.offset, value.length) - val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value)) - val frame2 = FileTransferFrame(file, offset, length) for( slave <- slaves.values() ) { - slave.replicate_wal(frame1, frame2) + slave.replicate(file, position, offset, length) } } } @@ -411,9 +456,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { def replicate_log_delete(log:Long):Unit = { val value = new LogDelete value.log = log - val frame = ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value)) for( slave <- slaves.values() ) { - slave.replicate_wal(frame) + slave.replicate(value) } } http://git-wip-us.apache.org/repos/asf/activemq/blob/554efe3e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala index 7e075e4..b218f4c 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationProtocolCodec.scala @@ -25,8 +25,10 @@ 
import java.io.{OutputStream, File} import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState import java.util -case class ReplicationFrame(action:AsciiBuffer, body:Buffer) -case class FileTransferFrame(file:File, offset:Long, length:Long) +class ReplicationFrame(val action:AsciiBuffer, _body:Buffer) { + def body = _body +} +class FileTransferFrame(val file:File, val offset:Long, var length:Long) class ReplicationProtocolCodec extends AbstractProtocolCodec { import ReplicationSupport._ @@ -86,7 +88,7 @@ class ReplicationProtocolCodec extends AbstractProtocolCodec { if( data!=null ) { data.moveTail(-1); nextDecodeAction = readHeader - ReplicationFrame(action, data) + new ReplicationFrame(action, data) } else { null } http://git-wip-us.apache.org/repos/asf/activemq/blob/554efe3e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala index 07ef0ee..2fd7c1e 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala @@ -64,6 +64,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } override def doStart() = { + queue.setLabel("slave: "+node_id) client.init() if (purgeOnStatup) { purgeOnStatup = false @@ -97,10 +98,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } def start_slave_connections = { - val transport = new TcpTransport() - transport.setBlockingExecutor(blocking_executor) - transport.setDispatchQueue(queue) - transport.connecting(new URI(connect), null) + val transport: TcpTransport = 
create_transport status = "Attaching to master: "+connect info(status) @@ -120,6 +118,14 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { wal_session.start } + def create_transport: TcpTransport = { + val transport = new TcpTransport() + transport.setBlockingExecutor(blocking_executor) + transport.setDispatchQueue(queue) + transport.connecting(new URI(connect), null) + transport + } + def stop_connections(cb:Task) = { var then = ^{ unstash(directory) @@ -156,7 +162,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { val ack = new WalAck() ack.position = wal_append_position // info("Sending ack: "+wal_append_position) - wal_session.send(ACK_ACTION, ack) + wal_session.send_replication_frame(ACK_ACTION, ack) if( replay_from != ack.position ) { val old_replay_from = replay_from replay_from = ack.position @@ -240,7 +246,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } def disconnect(cb:Task) = queue { - send(DISCONNECT_ACTION, null) + send_replication_frame(DISCONNECT_ACTION, null) transport.flush() transport.stop(cb) } @@ -268,7 +274,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { def request(action:AsciiBuffer, body:AnyRef)(cb:(ReplicationFrame)=>Unit) = { response_callbacks.addLast(cb) - send(action, body) + send_replication_frame(action, body) } def response_handler: (AnyRef)=>Unit = (command)=> { http://git-wip-us.apache.org/repos/asf/activemq/blob/554efe3e/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala index b13b680..d516703 100644 --- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala @@ -28,7 +28,7 @@ import org.fusesource.hawtbuf.AsciiBuffer */ abstract class TransportHandler(val transport: Transport) extends TransportListener { - var outbound = new util.LinkedList[AnyRef]() + var outbound = new util.LinkedList[(AnyRef, ()=>Unit)]() val codec = new ReplicationProtocolCodec transport.setProtocolCodec(codec) @@ -45,23 +45,26 @@ abstract class TransportHandler(val transport: Transport) extends TransportListe def drain:Unit = { while( !outbound.isEmpty ) { - val value = outbound.peekFirst() + val (value, on_send) = outbound.peekFirst() if( transport.offer(value) ) { outbound.removeFirst() + if( on_send!=null ) { + on_send() + } } else { return } } } - - def send(value:AnyRef):Unit = { + def send(value:AnyRef):Unit = send(value, null) + def send(value:AnyRef, on_send: ()=>Unit):Unit = { transport.getDispatchQueue.assertExecuting() - outbound.add(value) + outbound.add((value, on_send)) drain } - def send(action:AsciiBuffer, body:AnyRef):Unit = send(ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body))) - def sendError(error:String) = send(ERROR_ACTION, error) - def sendOk(body:AnyRef) = send(OK_ACTION, body) + def send_replication_frame(action:AsciiBuffer, body:AnyRef):Unit = send(new ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body))) + def sendError(error:String) = send_replication_frame(ERROR_ACTION, error) + def sendOk(body:AnyRef) = send_replication_frame(OK_ACTION, body) } http://git-wip-us.apache.org/repos/asf/activemq/blob/554efe3e/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java ---------------------------------------------------------------------- diff --git 
a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java index f7be2af..119b08f 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java @@ -24,6 +24,7 @@ import org.apache.activemq.leveldb.replicated.MasterLevelDBStore; import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore; import org.apache.activemq.leveldb.util.FileSupport; import org.apache.activemq.store.MessageStore; +import org.fusesource.hawtdispatch.transport.TcpTransport; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,7 @@ import java.util.LinkedList; import java.util.concurrent.TimeUnit; import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage; +import static org.apache.activemq.leveldb.test.ReplicationTestSupport.createPlayload; import static org.apache.activemq.leveldb.test.ReplicationTestSupport.getMessages; import static org.junit.Assert.*; @@ -170,13 +172,87 @@ public class ReplicatedLevelDBStoreTest { } } + @Test(timeout = 1000*60*60) + public void testSlowSlave() throws Exception { + + File node1Dir = new File("target/activemq-data/leveldb-node1"); + File node2Dir = new File("target/activemq-data/leveldb-node2"); + File node3Dir = new File("target/activemq-data/leveldb-node3"); + + FileSupport.toRichFile(node1Dir).recursiveDelete(); + FileSupport.toRichFile(node2Dir).recursiveDelete(); + FileSupport.toRichFile(node3Dir).recursiveDelete(); + + node2Dir.mkdirs(); + node3Dir.mkdirs(); + FileSupport.toRichFile(new File(node2Dir, "nodeid.txt")).writeText("node2", "UTF-8"); + FileSupport.toRichFile(new File(node3Dir, "nodeid.txt")).writeText("node3", "UTF-8"); + + + ArrayList expected_list = new ArrayList(); + + 
MasterLevelDBStore node1 = createMaster(node1Dir); + CountDownFuture masterStart = asyncStart(node1); + + // Lets create a 1 slow slave... + SlaveLevelDBStore node2 = new SlaveLevelDBStore() { + boolean hitOnce = false; + @Override + public TcpTransport create_transport() { + if( hitOnce ) { + return super.create_transport(); + } + hitOnce = true; + TcpTransport transport = super.create_transport(); + transport.setMaxReadRate(64*1024); + return transport; + } + }; + configureSlave(node2, node1, node2Dir); + SlaveLevelDBStore node3 = createSlave(node1, node3Dir); + + asyncStart(node2); + asyncStart(node3); + masterStart.await(); + + LOG.info("Adding messages..."); + String playload = createPlayload(64 * 1024); + MessageStore ms = node1.createQueueMessageStore(new ActiveMQQueue("TEST")); + final int TOTAL = 10; + for (int i = 0; i < TOTAL; i++) { + if (i == 8) { + // Stop the fast slave so that we wait for the slow slave to + // catch up.. + node3.stop(); + } + + String msgid = "m:" + ":" + i; + addMessage(ms, msgid, playload); + expected_list.add(msgid); + } + + LOG.info("Checking node1 state"); + assertEquals(expected_list, getMessages(ms)); + + LOG.info("Stopping node1: " + node1.node_id()); + node1.stop(); + LOG.info("Stopping slave: " + node2.node_id()); + node2.stop(); + } + + private SlaveLevelDBStore createSlave(MasterLevelDBStore master, File directory) { - SlaveLevelDBStore slave1 = new SlaveLevelDBStore(); - slave1.setDirectory(directory); - slave1.setConnect("tcp://127.0.0.1:" + master.getPort()); - slave1.setSecurityToken("foo"); - slave1.setLogSize(1023 * 200); - return slave1; + SlaveLevelDBStore slave = new SlaveLevelDBStore(); + configureSlave(slave, master, directory); + return slave; + } + + private SlaveLevelDBStore configureSlave(SlaveLevelDBStore slave, MasterLevelDBStore master, File directory) { + slave.setDirectory(directory); + slave.setConnect("tcp://127.0.0.1:" + master.getPort()); + slave.setSecurityToken("foo"); + slave.setLogSize(1023 * 
200); + return slave; } private MasterLevelDBStore createMaster(File directory) { http://git-wip-us.apache.org/repos/asf/activemq/blob/554efe3e/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java index b3576cf..181d11d 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java @@ -32,18 +32,25 @@ import java.util.ArrayList; public class ReplicationTestSupport { static long id_counter = 0L; - static String payload = ""; - { - for (int i = 0; i < 1024; i++) { + static String payload = createPlayload(1024); + + public static String createPlayload(int size) { + String payload = ""; + for (int i = 0; i < size; i++) { payload += "x"; } + return payload; + } + + static public ActiveMQTextMessage addMessage(MessageStore ms, String id) throws JMSException, IOException { + return addMessage(ms, id, payload); } - static public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException { + static public ActiveMQTextMessage addMessage(MessageStore ms, String id, String payload) throws JMSException, IOException { ActiveMQTextMessage message = new ActiveMQTextMessage(); message.setPersistent(true); message.setResponseRequired(true); - message.setStringProperty("id", body); + message.setStringProperty("id", id); message.setText(payload); id_counter += 1; MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);