activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r713835 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/replication/ReplicationMaster.java main/java/org/apache/kahadb/store/KahaDBStore.java test/java/org/apache/kahadb/store/PBMesssagesTest.java
Date Thu, 13 Nov 2008 21:40:17 GMT
Author: chirino
Date: Thu Nov 13 13:40:16 2008
New Revision: 713835

URL: http://svn.apache.org/viewvc?rev=713835&view=rev
Log:
Updating to work with the latest API changes done to the activemq-protobuf stuff

Added:
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=713835&r1=713834&r2=713835&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Thu Nov 13 13:40:16 2008
@@ -31,7 +31,6 @@
 import java.util.zip.Checksum;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.protobuf.ByteString;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
@@ -121,7 +120,7 @@
 		frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
 		PBJournalUpdate payload = new PBJournalUpdate();
 		payload.setLocation(convert(location));
-		payload.setData(ByteString.copyFrom(sequence.getData(), sequence.getOffset(), sequence.getLength()));
+		payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
 		frame.setPayload(payload);
 
 		for (ReplicationSession session : sessions) {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java?rev=713835&r1=713834&r2=713835&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBStore.java Thu Nov 13 13:40:16 2008
@@ -39,7 +39,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.protobuf.ByteString;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -48,7 +48,6 @@
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
@@ -138,8 +137,8 @@
             command.setMessageId(message.getMessageId().toString());
             command.setTransactionInfo( createTransactionInfo(message.getTransactionId()) );
 
-            ByteSequence packet = wireFormat.marshal(message);
-            command.setMessage(ByteString.copyFrom(packet.getData(), packet.getOffset(), packet.getLength()));
+            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
+            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
 
             store(command, isSyncWrites() && message.isResponseRequired());
             
@@ -276,8 +275,8 @@
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
             command.setRetroactive(retroactive);
-            ByteSequence packet = wireFormat.marshal(subscriptionInfo);
-            command.setSubscriptionInfo(ByteString.copyFrom(packet.getData(), packet.getOffset(), packet.getLength()));
+            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
+            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
             store(command, isSyncWrites() && true);
         }
 
@@ -521,8 +520,8 @@
         } else {
             XATransactionId t = (XATransactionId)txid;
             KahaXATransactionId kahaTxId = new KahaXATransactionId();
-            kahaTxId.setBranchQualifier(ByteString.copyFrom(t.getBranchQualifier()));
-            kahaTxId.setGlobalTransactionId(ByteString.copyFrom(t.getGlobalTransactionId()));
+            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
+            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
             kahaTxId.setFormatId(t.getFormatId());
             rc.setXaTransacitonId(kahaTxId);
         }

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java?rev=713835&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/PBMesssagesTest.java Thu Nov 13 13:40:16 2008
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.store;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.store.data.KahaAddMessageCommand;
+import org.apache.kahadb.store.data.KahaDestination;
+import org.apache.kahadb.store.data.KahaEntryType;
+import org.apache.kahadb.store.data.KahaDestination.DestinationType;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+
+public class PBMesssagesTest extends TestCase {
+
+    public void testKahaAddMessageCommand() throws IOException {
+
+       KahaAddMessageCommand expected = new KahaAddMessageCommand();
+       expected.setDestination(new KahaDestination().setName("Foo").setType(DestinationType.QUEUE));
+       expected.setMessage(new Buffer(new byte[] {1,2,3,4,5,6} ));
+       expected.setMessageId("Hello World");
+       
+       int size = expected.serializedSizeFramed();
+       DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+       os.writeByte(expected.type().getNumber());
+       expected.writeFramed(os);
+       ByteSequence seq = os.toByteSequence();
+       
+       DataByteArrayInputStream is = new DataByteArrayInputStream(seq);
+       KahaEntryType type = KahaEntryType.valueOf(is.readByte());
+       JournalCommand message = (JournalCommand)type.createMessage();
+       message.mergeFramed(is);
+       
+       assertEquals(expected, message);
+    }
+    
+}



Mime
View raw message