activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Fixes bug in replicated leveldb where log files on slaves were not getting GCed.
Date Mon, 16 Sep 2013 15:59:25 GMT
Updated Branches:
  refs/heads/trunk a69379d5f -> d771ebb97


Fixes bug in replicated leveldb where log files on slaves were not getting GCed.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d771ebb9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d771ebb9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d771ebb9

Branch: refs/heads/trunk
Commit: d771ebb97e9c01e5751d76bd043dbd4f6ae73253
Parents: a69379d
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Mon Sep 16 11:58:03 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Mon Sep 16 11:58:03 2013 -0400

----------------------------------------------------------------------
 .../leveldb/replicated/dto/LogDelete.java       | 36 ++++++++++++++++++++
 .../org/apache/activemq/leveldb/RecordLog.scala |  4 +++
 .../replicated/MasterLevelDBClient.scala        |  5 +++
 .../leveldb/replicated/MasterLevelDBStore.scala | 15 ++++++--
 .../leveldb/replicated/ReplicationSupport.scala |  1 +
 .../leveldb/replicated/SlaveLevelDBStore.scala  | 15 ++++++++
 6 files changed, 74 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d771ebb9/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogDelete.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogDelete.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogDelete.java
new file mode 100644
index 0000000..6c44e69
--- /dev/null
+++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogDelete.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.replicated.dto;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="remove_request")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LogDelete {
+    @XmlAttribute(name="log")
+    public long log;
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/d771ebb9/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
index 1cbf95b..5a4629e 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
@@ -88,12 +88,16 @@ case class RecordLog(directory: File, logSuffix:String) {
       if( current_appender.position != id ) {
         Option(log_infos.get(id)).foreach { info =>
           onDelete(info.file)
+          onDelete(id)
           log_infos.remove(id)
         }
       }
     }
   }
 
+  protected def onDelete(file:Long) = {
+  }
+
   protected def onDelete(file:File) = {
     file.delete()
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d771ebb9/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
index 1d347f4..95a7e73 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBClient.scala
@@ -150,5 +150,10 @@ class MasterLevelDBClient(val store:MasterLevelDBStore) extends LevelDBClient(st
 
       }
     }
+
+    override protected def onDelete(file: Long) = {
+      super.onDelete(file)
+      store.replicate_log_delete(file)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d771ebb9/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
index 0318eff..6572aa9 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
@@ -309,14 +309,16 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
       }
     }
 
-    def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame ) = {
+    def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame=null ) = {
       val h = this.synchronized {
         session
       }
       if( h !=null ) {
         h.queue {
           h.send(frame1)
-          h.send(frame2)
+          if( frame2!=null ) {
+            h.send(frame2)
+          }
         }
       }
     }
@@ -400,6 +402,15 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     }
   }
 
+  def replicate_log_delete(log:Long):Unit = {
+    val value = new LogDelete
+    value.log = log
+    val frame = ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
+    for( slave <- slaves.values() ) {
+      slave.replicate_wal(frame)
+    }
+  }
+
   def wal_append_position = client.wal_append_position
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/d771ebb9/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
index 0b45bf6..4e41a2e 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ReplicationSupport.scala
@@ -38,6 +38,7 @@ object ReplicationSupport {
   val OK_ACTION = ascii("ok")
   val DISCONNECT_ACTION = ascii("disconnect")
   val ERROR_ACTION = ascii("error")
+  val LOG_DELETE_ACTION = ascii("rm")
 
   def unmap(buffer:MappedByteBuffer ) {
     try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d771ebb9/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index 0d14e6d..cf22cf9 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -152,6 +152,8 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
     }
   }
 
+  val pending_log_removes = new util.ArrayList[Long]()
+
   def wal_handler(session:Session): (AnyRef)=>Unit = (command)=>{
     command match {
       case command:ReplicationFrame =>
@@ -178,6 +180,15 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
                 send_wal_ack
               }
             })
+          case LOG_DELETE_ACTION =>
+
+            val value = JsonCodec.decode(command.body, classOf[LogDelete])
+            if( !caughtUp ) {
+              pending_log_removes.add(value.log)
+            } else {
+              client.log.delete(value.log)
+            }
+
           case OK_ACTION =>
             // This comes in as response to a disconnect we send.
           case _ => session.fail("Unexpected command action: "+command.action)
@@ -394,6 +405,10 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
         caughtUp = true
         client.log.open(wal_append_offset)
         send_wal_ack
+        for( i <- pending_log_removes ) {
+          client.log.delete(i);
+        }
+        pending_log_removes.clear()
       }
     })
     state.snapshot_position


Mime
View raw message