activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1053982 - in /activemq/activemq-apollo/trunk: apollo-bdb/ apollo-bdb/src/main/proto/ apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/ apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apach...
Date Thu, 30 Dec 2010 19:29:38 GMT
Author: chirino
Date: Thu Dec 30 19:29:37 2010
New Revision: 1053982

URL: http://svn.apache.org/viewvc?rev=1053982&view=rev
Log:
refactored the common protobuf classes to the broker module.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
      - copied, changed from r1053959, activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/proto/data.proto
Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/pom.xml
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-broker/pom.xml
    activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/pom.xml Thu Dec 30 19:29:37 2010
@@ -45,18 +45,7 @@
       <artifactId>je</artifactId>
       <version>${bdb-version}</version>
     </dependency>
-    
-    <dependency>
-      <groupId>org.fusesource.hawtbuf</groupId>
-      <artifactId>hawtbuf-proto</artifactId>
-      <version>${hawtbuf-version}</version>
-    </dependency>
 
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j-version}</version>
-    </dependency>
 
     <!-- Since we implement a jade template to display the BDB status -->
     <dependency>
@@ -130,22 +119,6 @@
     <plugins>
 
       <plugin>
-        <groupId>org.fusesource.hawtbuf</groupId>
-        <artifactId>hawtbuf-proto</artifactId>
-        <version>${hawtbuf-version}</version>
-        <configuration>
-          <type>alt</type>
-        </configuration>
-         <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      
-      <plugin>
         <groupId>org.fusesource.scalate</groupId>
         <artifactId>maven-scalate-plugin</artifactId>
         <version>${scalate-version}</version>

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Thu Dec 30 19:29:37 2010
@@ -280,7 +280,7 @@ class BDBClient(store: BDBStore) extends
   def getQueue(queue_key: Long): Option[QueueRecord] = {
     with_ctx { ctx=>
       import ctx._
-      queues_db.get(tx, to_DatabaseEntry(queue_key)).map( x=> to_QueueRecord(x)  )
+      queues_db.get(tx, to_database_entry(queue_key)).map( x=> to_queue_record(x)  )
     }
   }
 
@@ -329,7 +329,7 @@ class BDBClient(store: BDBStore) extends
       import ctx._
 
       with_entries_db(queue_key) { entries_db=>
-        entries_db.cursor_from(tx, to_DatabaseEntry(firstSeq)) { (key, value) =>
+        entries_db.cursor_from(tx, to_database_entry(firstSeq)) { (key, value) =>
           val entry_seq:Long = key
           val entry:QueueEntryRecord = value
           rc += entry
@@ -349,7 +349,7 @@ class BDBClient(store: BDBStore) extends
 
       requests.flatMap { case (message_key, callback)=>
         val record = metric_load_from_index_counter.time {
-          messages_db.get(tx, to_DatabaseEntry(message_key)).map ( to_MessageRecord _ )
+          messages_db.get(tx, to_database_entry(message_key)).map ( to_message_record _ )
         }
         record match {
           case None =>

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
Thu Dec 30 19:29:37 2010
@@ -16,85 +16,33 @@
  */
 package org.apache.activemq.apollo.broker.store.bdb
 
-import model._
-import org.apache.activemq.apollo.broker.store.{MessageRecord, QueueRecord, QueueEntryRecord}
 import java.util.Comparator
 import java.nio.ByteBuffer
 import com.sleepycat.je._
 import java.io.Serializable
+import org.apache.activemq.apollo.broker.store.{PBSupport, MessageRecord, QueueRecord, QueueEntryRecord}
+import PBSupport._
 
 object HelperTrait {
 
-  implicit def to_MessageRecord(entry: DatabaseEntry): MessageRecord = {
-    val pb =  MessagePB.FACTORY.parseUnframed(entry.getData)
-    val rc = new MessageRecord
-    rc.key = pb.getMessageKey
-    rc.protocol = pb.getProtocol
-    rc.size = pb.getSize
-    rc.buffer = pb.getValue
-    rc.expiration = pb.getExpiration
-    rc
-  }
-
-  implicit def to_DatabaseEntry(v: MessageRecord): DatabaseEntry = {
-    val pb = new MessagePB.Bean
-    pb.setMessageKey(v.key)
-    pb.setProtocol(v.protocol)
-    pb.setSize(v.size)
-    pb.setValue(v.buffer)
-    pb.setExpiration(v.expiration)
-    new DatabaseEntry(pb.freeze.toUnframedByteArray)
-  }
-
-  implicit def to_QueueEntryRecord(entry: DatabaseEntry): QueueEntryRecord = {
-    val pb =  QueueEntryPB.FACTORY.parseUnframed(entry.getData)
-    val rc = new QueueEntryRecord
-    rc.queue_key = pb.getQueueKey
-    rc.entry_seq = pb.getQueueSeq
-    rc.message_key = pb.getMessageKey
-    rc.attachment = pb.getAttachment
-    rc.size = pb.getSize
-    rc.redeliveries = pb.getRedeliveries.toShort
-    rc
-  }
+  implicit def to_message_record(entry: DatabaseEntry): MessageRecord = entry.getData
+  implicit def to_database_entry(v: MessageRecord): DatabaseEntry = new DatabaseEntry(v)
 
-  implicit def to_DatabaseEntry(v: QueueEntryRecord): DatabaseEntry = {
-    val pb = new QueueEntryPB.Bean
-    pb.setQueueKey(v.queue_key)
-    pb.setQueueSeq(v.entry_seq)
-    pb.setMessageKey(v.message_key)
-    pb.setAttachment(v.attachment)
-    pb.setSize(v.size)
-    pb.setRedeliveries(v.redeliveries)
-    new DatabaseEntry(pb.freeze.toUnframedByteArray)
-  }
+  implicit def to_queue_entry_record(entry: DatabaseEntry): QueueEntryRecord = entry.getData
+  implicit def to_database_entry(v: QueueEntryRecord): DatabaseEntry = new DatabaseEntry(v)
 
-  implicit def to_QueueRecord(entry: DatabaseEntry): QueueRecord = {
-    val pb = QueuePB.FACTORY.parseUnframed(entry.getData)
-    val rc = new QueueRecord
-    rc.key = pb.getKey
-    rc.binding_data = pb.getBindingData
-    rc.binding_kind = pb.getBindingKind
-    rc
-  }
-
-  implicit def to_DatabaseEntry(v: QueueRecord): DatabaseEntry = {
-    val pb = new QueuePB.Bean
-    pb.setKey(v.key)
-    pb.setBindingData(v.binding_data)
-    pb.setBindingKind(v.binding_kind)
-    new DatabaseEntry(pb.freeze.toUnframedByteArray)
-  }
+  implicit def to_queue_record(entry: DatabaseEntry): QueueRecord = entry.getData
+  implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new DatabaseEntry(v)
 
 
   implicit def to_bytes(l:Long):Array[Byte] = ByteBuffer.wrap(new Array[Byte](8)).putLong(l).array()
   implicit def to_long(bytes:Array[Byte]):Long = ByteBuffer.wrap(bytes).getLong()
-  implicit def to_DatabaseEntry(l:Long):DatabaseEntry = new DatabaseEntry(to_bytes(l))
+  implicit def to_database_entry(l:Long):DatabaseEntry = new DatabaseEntry(to_bytes(l))
   implicit def to_long(value:DatabaseEntry):Long = to_long(value.getData)
 
   implicit def to_bytes(l:Int):Array[Byte] = ByteBuffer.wrap(new Array[Byte](4)).putInt(l).array()
   implicit def to_int(bytes:Array[Byte]):Int = ByteBuffer.wrap(bytes).getInt()
-  implicit def to_DatabaseEntry(l:Int):DatabaseEntry = new DatabaseEntry(to_bytes(l))
+  implicit def to_database_entry(l:Int):DatabaseEntry = new DatabaseEntry(to_bytes(l))
   implicit def to_int(value:DatabaseEntry):Int = to_int(value.getData)
 
 
@@ -168,7 +116,8 @@ object HelperTrait {
       }
     }
   }
-  implicit def DatabaseWrapper(x: Database) = new RichDatabase(x)
+
+  implicit def to_rich_database(x: Database) = new RichDatabase(x)
 
 
   def entries_db_name(queue_key: Long): String =  "entries-" + queue_key

Modified: activemq/activemq-apollo/trunk/apollo-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/pom.xml Thu Dec 30 19:29:37 2010
@@ -67,6 +67,12 @@
       <version>${jasypt-version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf-proto</artifactId>
+      <version>${hawtbuf-version}</version>
+    </dependency>
+
 
     <!-- Scala Support -->
     <dependency>
@@ -134,6 +140,22 @@
         </executions>
       </plugin>
 
+      <plugin>
+        <groupId>org.fusesource.hawtbuf</groupId>
+        <artifactId>hawtbuf-proto</artifactId>
+        <version>${hawtbuf-version}</version>
+        <configuration>
+          <type>alt</type>
+        </configuration>
+         <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
     </plugins>
   </build>
 

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (from r1053959,
activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto&p1=activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto&r1=1053959&r2=1053982&rev=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Thu Dec 30 19:29:37
2010
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-package org.apache.activemq.apollo.broker.store.bdb.model;
+package org.apache.activemq.apollo.broker.store;
 
 option java_multiple_files = true;
 

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1053982&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
Thu Dec 30 19:29:37 2010
@@ -0,0 +1,108 @@
+package org.apache.activemq.apollo.broker.store
+
+import java.io.{OutputStream, InputStream}
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object PBSupport {
+
+  implicit def to_pb(v: MessageRecord):MessagePB.Buffer = {
+    val pb = new MessagePB.Bean
+    pb.setMessageKey(v.key)
+    pb.setProtocol(v.protocol)
+    pb.setSize(v.size)
+    pb.setValue(v.buffer)
+    pb.setExpiration(v.expiration)
+    pb.freeze
+  }
+
+  implicit def from_pb(pb: MessagePB.Getter):MessageRecord = {
+    val rc = new MessageRecord
+    rc.key = pb.getMessageKey
+    rc.protocol = pb.getProtocol
+    rc.size = pb.getSize
+    rc.buffer = pb.getValue
+    rc.expiration = pb.getExpiration
+    rc
+  }
+
+  def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).writeUnframed(out)
+  def decode_message_record(in: InputStream):MessageRecord = MessagePB.FACTORY.parseUnframed(in)
+
+  implicit def encode_message_record(v: MessageRecord) = to_pb(v).toUnframedByteArray
+  implicit def decode_message_record(data: Array[Byte]):MessageRecord = MessagePB.FACTORY.parseUnframed(data)
+
+
+
+  implicit def to_pb(v: QueueRecord):QueuePB.Buffer = {
+    val pb = new QueuePB.Bean
+    pb.setKey(v.key)
+    pb.setBindingData(v.binding_data)
+    pb.setBindingKind(v.binding_kind)
+    pb.freeze
+  }
+
+  implicit def from_pb(pb: QueuePB.Getter):QueueRecord = {
+    val rc = new QueueRecord
+    rc.key = pb.getKey
+    rc.binding_data = pb.getBindingData
+    rc.binding_kind = pb.getBindingKind
+    rc
+  }
+
+  def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).writeUnframed(out)
+  def decode_queue_record(in: InputStream):QueueRecord = QueuePB.FACTORY.parseUnframed(in)
+
+  implicit def encode_queue_record(v: QueueRecord) = to_pb(v).toUnframedByteArray
+  implicit def decode_queue_record(data: Array[Byte]):QueueRecord = QueuePB.FACTORY.parseUnframed(data)
+
+
+  implicit def to_pb(v: QueueEntryRecord):QueueEntryPB.Buffer = {
+    val pb = new QueueEntryPB.Bean
+    pb.setQueueKey(v.queue_key)
+    pb.setQueueSeq(v.entry_seq)
+    pb.setMessageKey(v.message_key)
+    pb.setAttachment(v.attachment)
+    pb.setSize(v.size)
+    pb.setRedeliveries(v.redeliveries)
+    pb.freeze
+  }
+
+  implicit def from_pb(pb: QueueEntryPB.Getter):QueueEntryRecord = {
+    val rc = new QueueEntryRecord
+    rc.queue_key = pb.getQueueKey
+    rc.entry_seq = pb.getQueueSeq
+    rc.message_key = pb.getMessageKey
+    rc.attachment = pb.getAttachment
+    rc.size = pb.getSize
+    rc.redeliveries = pb.getRedeliveries.toShort
+    rc
+  }
+
+  def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) = to_pb(v).writeUnframed(out)
+  def decode_queue_entry_record(in: InputStream):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(in)
+
+  implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).toUnframedByteArray
+  implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data)
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/pom.xml Thu Dec 30 19:29:37 2010
@@ -53,18 +53,6 @@
       <version>${cascal-version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>org.fusesource.hawtbuf</groupId>
-      <artifactId>hawtbuf-proto</artifactId>
-      <version>${hawtbuf-version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j-version}</version>
-    </dependency>
-    
-
     <!-- Scala Support -->
     <dependency>
       <groupId>org.scala-lang</groupId>
@@ -170,22 +158,6 @@
   <build>
     <plugins>
 
-      <plugin>
-        <groupId>org.fusesource.hawtbuf</groupId>
-        <artifactId>hawtbuf-proto</artifactId>
-        <version>${hawtbuf-version}</version>
-        <configuration>
-          <type>alt</type>
-        </configuration>
-         <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
       <!-- Tests are failing on windows, need to investigate -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraClient.scala
Thu Dec 30 19:29:37 2010
@@ -18,11 +18,10 @@ package org.apache.activemq.apollo.broke
 
 import com.shorrockin.cascal.session._
 import com.shorrockin.cascal.utils.Conversions._
-import java.util.{HashMap}
-import org.fusesource.hawtbuf.AsciiBuffer._
-import org.fusesource.hawtbuf.{AsciiBuffer, DataByteArrayInputStream, DataByteArrayOutputStream,
Buffer}
+import org.fusesource.hawtbuf.Buffer
 import org.apache.activemq.apollo.broker.store._
 import collection.mutable.ListBuffer
+import org.apache.activemq.apollo.broker.store.PBSupport._
 
 /**
  *
@@ -54,64 +53,6 @@ class CassandraClient() {
     }
   }
 
-  def decodeMessageRecord(v: Array[Byte]): MessageRecord = {
-    import PBMessageRecord._
-    val pb = PBMessageRecord.FACTORY.parseUnframed(v)
-    val rc = new MessageRecord
-    rc.protocol = pb.getProtocol
-    rc.size = pb.getSize
-    rc.buffer = pb.getValue
-    rc.expiration = pb.getExpiration
-    rc
-  }
-
-  def encodeMessageRecord(v: MessageRecord): Array[Byte] = {
-    val pb = new PBMessageRecord.Bean
-    pb.setProtocol(v.protocol)
-    pb.setSize(v.size)
-    pb.setValue(v.buffer)
-    pb.setExpiration(v.expiration)
-    pb.freeze.toUnframedByteArray
-  }
-  
-  implicit def decodeQueueEntryRecord(v: Array[Byte]): QueueEntryRecord = {
-    import PBQueueEntryRecord._
-    val pb = PBQueueEntryRecord.FACTORY.parseUnframed(v)
-    val rc = new QueueEntryRecord
-    rc.message_key = pb.getMessageKey
-    rc.attachment = pb.getAttachment
-    rc.size = pb.getSize
-    rc.redeliveries = pb.getRedeliveries.toShort
-    rc
-  }
-
-  implicit def encodeQueueEntryRecord(v: QueueEntryRecord): Array[Byte] = {
-    val pb = new PBQueueEntryRecord.Bean
-    pb.setMessageKey(v.message_key)
-    pb.setAttachment(v.attachment)
-    pb.setSize(v.size)
-    pb.setRedeliveries(v.redeliveries)
-    pb.freeze.toUnframedByteArray
-  }
-
-  implicit def decodeQueueRecord(v: Array[Byte]): QueueRecord = {
-    import PBQueueRecord._
-    val pb = PBQueueRecord.FACTORY.parseUnframed(v)
-    val rc = new QueueRecord
-    rc.key = pb.getKey
-    rc.binding_kind = pb.getBindingKind
-    rc.binding_data = pb.getBindingData
-    rc
-  }
-
-  implicit def encodeQueueRecord(v: QueueRecord): Array[Byte] = {
-    val pb = new PBQueueRecord.Bean
-    pb.setKey(v.key)
-    pb.setBindingKind(v.binding_kind)
-    pb.setBindingData(v.binding_data)
-    pb.freeze.toUnframedByteArray
-  }
-
   def purge() = {
     withSession {
       session =>
@@ -176,7 +117,7 @@ class CassandraClient() {
               case (msg, action) =>
                 var rc =
                 if (action.messageRecord != null) {
-                  operations ::= Insert( schema.message_data \ (msg, encodeMessageRecord(action.messageRecord)
) ) 
+                  operations ::= Insert( schema.message_data \ (msg, action.messageRecord
) )
                 }
                 action.enqueues.foreach {
                   queueEntry =>
@@ -201,8 +142,7 @@ class CassandraClient() {
       session =>
         session.get(schema.message_data \ id) match {
           case Some(x) =>
-            val rc: MessageRecord = decodeMessageRecord(x.value)
-            rc.key = id
+            val rc: MessageRecord = x.value
             Some(rc)
           case None =>
             None
@@ -246,8 +186,6 @@ class CassandraClient() {
       session =>
         session.list(schema.entries \ queue_key, RangePredicate(firstSeq, lastSeq)).map {
x=>
           val rc:QueueEntryRecord = x.value
-          rc.queue_key = queue_key
-          rc.entry_seq = x.name
           rc
         }
     }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/pom.xml Thu Dec 30 19:29:37 2010
@@ -45,18 +45,6 @@
       <artifactId>jdbm</artifactId>
       <version>${jdbm-version}</version>
     </dependency>
-    
-    <dependency>
-      <groupId>org.fusesource.hawtbuf</groupId>
-      <artifactId>hawtbuf-proto</artifactId>
-      <version>${hawtbuf-version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j-version}</version>
-    </dependency>
 
     <!-- Since we implement a jade template to display the JDBM status -->
     <dependency>
@@ -130,22 +118,6 @@
     <plugins>
 
       <plugin>
-        <groupId>org.fusesource.hawtbuf</groupId>
-        <artifactId>hawtbuf-proto</artifactId>
-        <version>${hawtbuf-version}</version>
-        <configuration>
-          <type>alt</type>
-        </configuration>
-         <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      
-      <plugin>
         <groupId>org.fusesource.scalate</groupId>
         <artifactId>maven-scalate-plugin</artifactId>
         <version>${scalate-version}</version>

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1053982&r1=1053981&r2=1053982&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Thu Dec 30 19:29:37 2010
@@ -20,7 +20,6 @@ import dto.JDBM2StoreDTO
 import java.{lang=>jl}
 import java.{util=>ju}
 
-import java.util.concurrent.atomic.AtomicInteger
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
@@ -28,80 +27,25 @@ import jdbm._
 import btree.BTree
 import htree.HTree
 import java.util.Comparator
-import model._
 import java.io.Serializable
 import jdbm.helper._
+import PBSupport._
 
 object JDBM2Client extends Log {
 
   object MessageRecordSerializer extends Serializer[MessageRecord] {
-
-    def serialize(out: SerializerOutput, v: MessageRecord) = {
-      val pb = new MessagePB.Bean
-      pb.setMessageKey(v.key)
-      pb.setProtocol(v.protocol)
-      pb.setSize(v.size)
-      pb.setValue(v.buffer)
-      pb.setExpiration(v.expiration)
-      pb.freeze.writeUnframed(out)
-    }
-
-    def deserialize(in: SerializerInput) = {
-      val pb =  MessagePB.FACTORY.parseUnframed(in)
-      val rc = new MessageRecord
-      rc.key = pb.getMessageKey
-      rc.protocol = pb.getProtocol
-      rc.size = pb.getSize
-      rc.buffer = pb.getValue
-      rc.expiration = pb.getExpiration
-      rc
-    }
+    def serialize(out: SerializerOutput, v: MessageRecord) = encode_message_record(out, v)
+    def deserialize(in: SerializerInput) = decode_message_record(in)
   }
 
   object QueueRecordSerializer extends Serializer[QueueRecord] {
-
-    def serialize(out: SerializerOutput, v: QueueRecord) = {
-      val pb = new QueuePB.Bean
-      pb.setKey(v.key)
-      pb.setBindingData(v.binding_data)
-      pb.setBindingKind(v.binding_kind)
-      pb.freeze.writeUnframed(out)
-    }
-
-    def deserialize(in: SerializerInput) = {
-      val pb = QueuePB.FACTORY.parseUnframed(in)
-      val rc = new QueueRecord
-      rc.key = pb.getKey
-      rc.binding_data = pb.getBindingData
-      rc.binding_kind = pb.getBindingKind
-      rc
-    }
+    def serialize(out: SerializerOutput, v: QueueRecord) = encode_queue_record(out, v)
+    def deserialize(in: SerializerInput) = decode_queue_record(in)
   }
 
   object QueueEntryRecordSerializer extends Serializer[QueueEntryRecord] {
-
-    def serialize(out: SerializerOutput, v: QueueEntryRecord) = {
-      val pb = new QueueEntryPB.Bean
-      pb.setQueueKey(v.queue_key)
-      pb.setQueueSeq(v.entry_seq)
-      pb.setMessageKey(v.message_key)
-      pb.setAttachment(v.attachment)
-      pb.setSize(v.size)
-      pb.setRedeliveries(v.redeliveries)
-      pb.freeze.writeUnframed(out)
-    }
-
-    def deserialize(in: SerializerInput) = {
-      val pb =  QueueEntryPB.FACTORY.parseUnframed(in)
-      val rc = new QueueEntryRecord
-      rc.queue_key = pb.getQueueKey
-      rc.entry_seq = pb.getQueueSeq
-      rc.message_key = pb.getMessageKey
-      rc.attachment = pb.getAttachment
-      rc.size = pb.getSize
-      rc.redeliveries = pb.getRedeliveries.toShort
-      rc
-    }
+    def serialize(out: SerializerOutput, v: QueueEntryRecord) = encode_queue_entry_record(out,
v)
+    def deserialize(in: SerializerInput) = decode_queue_entry_record(in)
   }
 
   object QueueEntryKeySerializer extends Serializer[(Long,Long)] {



Mime
View raw message