qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1228748 [1/2] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker/src/main/java/org/apache/qpid/qmf/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/...
Date Sat, 07 Jan 2012 22:47:18 GMT
Author: rgodfrey
Date: Sat Jan  7 22:47:17 2012
New Revision: 1228748

URL: http://svn.apache.org/viewvc?rev=1228748&view=rev
Log:
QPID-946, QPID-2379: QMF and Federation fixes (these now work again with qpid-config, qpid-route and qpid-tool), and durable routes are now stored in the DB

Added:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
    qpid/trunk/qpid/java/broker/src/xsl/qmf.xsl
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Sat Jan  7 22:47:17 2012
@@ -27,13 +27,16 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.StringBinding;
 import com.sleepycat.je.*;
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
@@ -41,6 +44,8 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
@@ -100,12 +105,17 @@ public class BDBMessageStore implements 
     private String DELIVERYDB_NAME = "deliveryDb";
     private String EXCHANGEDB_NAME = "exchangeDb";
     private String QUEUEDB_NAME = "queueDb";
+    private String BRIDGEDB_NAME = "bridges";
+    private String LINKDB_NAME = "links";
+
     private Database _messageMetaDataDb;
     private Database _messageContentDb;
     private Database _queueBindingsDb;
     private Database _deliveryDb;
     private Database _exchangeDb;
     private Database _queueDb;
+    private Database _bridgeDb;
+    private Database _linkDb;
 
     /* =======
      * Schema:
@@ -190,6 +200,10 @@ public class BDBMessageStore implements 
             EXCHANGEDB_NAME += "_v" + version;
 
             QUEUEBINDINGSDB_NAME += "_v" + version;
+
+            LINKDB_NAME += "_v" + version;
+
+            BRIDGEDB_NAME += "_v" + version;
         }
     }
  
@@ -461,6 +475,9 @@ public class BDBMessageStore implements 
         _queueBindingsDb = _environment.openDatabase(null, QUEUEBINDINGSDB_NAME, dbConfig);
         _messageContentDb = _environment.openDatabase(null, MESSAGECONTENTDB_NAME, dbConfig);
         _deliveryDb = _environment.openDatabase(null, DELIVERYDB_NAME, dbConfig);
+        _linkDb = _environment.openDatabase(null, LINKDB_NAME, dbConfig);
+        _bridgeDb = _environment.openDatabase(null, BRIDGEDB_NAME, dbConfig);
+
 
     }
 
@@ -517,6 +534,18 @@ public class BDBMessageStore implements 
             _deliveryDb.close();
         }
 
+        if (_bridgeDb != null)
+        {
+            _log.info("Close bridge database");
+            _bridgeDb.close();
+        }
+
+        if (_linkDb != null)
+        {
+            _log.info("Close link database");
+            _linkDb.close();
+        }
+
         closeEnvironment();
 
         _state = State.CLOSED;
@@ -556,8 +585,9 @@ public class BDBMessageStore implements 
             
             BindingRecoveryHandler brh = erh.completeExchangeRecovery();
             recoverBindings(brh);
-            
-            brh.completeBindingRecovery();
+
+            ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+            recoverBrokerLinks(lrh);
         }
         catch (DatabaseException e)
         {
@@ -674,6 +704,74 @@ public class BDBMessageStore implements 
 
     }
 
+
+    private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
+    {
+        Cursor cursor = null;
+
+        try
+        {
+            cursor = _linkDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+                long createTime = LongBinding.entryToLong(value);
+                Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
+
+                ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
+
+                recoverBridges(brh, id);
+            }
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+
+    }
+
+    private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
+    {
+        Cursor cursor = null;
+
+        try
+        {
+            cursor = _bridgeDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+                UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+                if(parentId.equals(linkId))
+                {
+
+                    long createTime = LongBinding.entryToLong(value);
+                    Map<String,String> arguments = StringMapBinding.getInstance().entryToObject(value);
+                    brh.bridge(id,createTime,arguments);
+                }
+            }
+            brh.completeBridgeRecoveryForLink();
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                cursor.close();
+            }
+        }
+
+    }
+
+
     private void recoverMessages(MessageStoreRecoveryHandler msrh) throws DatabaseException
     {
         StoredMessageRecoveryHandler mrh = msrh.begin();
@@ -1163,6 +1261,90 @@ public class BDBMessageStore implements 
         }
     }
 
+    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        if (_state != State.RECOVERING)
+        {
+            DatabaseEntry key = new DatabaseEntry();
+            UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
+
+            DatabaseEntry value = new DatabaseEntry();
+            LongBinding.longToEntry(link.getCreateTime(),value);
+            StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
+
+            try
+            {
+                _linkDb.put(null, key, value);
+            }
+            catch (DatabaseException e)
+            {
+                throw new AMQStoreException("Error writing Link  " + link
+                                            + " to database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
+        try
+        {
+            OperationStatus status = _linkDb.delete(null, key);
+            if (status == OperationStatus.NOTFOUND)
+            {
+                throw new AMQStoreException("Link " + link + " not found");
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw new AMQStoreException("Error deleting the Link " + link + " from database: " + e.getMessage(), e);
+        }
+    }
+
+    public void createBridge(final Bridge bridge) throws AMQStoreException
+    {
+        if (_state != State.RECOVERING)
+        {
+            DatabaseEntry key = new DatabaseEntry();
+            UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
+
+            DatabaseEntry value = new DatabaseEntry();
+            UUIDTupleBinding.getInstance().objectToEntry(bridge.getLink().getId(),value);
+            LongBinding.longToEntry(bridge.getCreateTime(),value);
+            StringMapBinding.getInstance().objectToEntry(bridge.getArguments(), value);
+
+            try
+            {
+                _bridgeDb.put(null, key, value);
+            }
+            catch (DatabaseException e)
+            {
+                throw new AMQStoreException("Error writing Bridge  " + bridge
+                                            + " to database: " + e.getMessage(), e);
+            }
+
+        }
+    }
+
+    public void deleteBridge(final Bridge bridge) throws AMQStoreException
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        UUIDTupleBinding.getInstance().objectToEntry(bridge.getId(), key);
+        try
+        {
+            OperationStatus status = _bridgeDb.delete(null, key);
+            if (status == OperationStatus.NOTFOUND)
+            {
+                throw new AMQStoreException("Bridge " + bridge + " not found");
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw new AMQStoreException("Error deleting the Bridge " + bridge + " from database: " + e.getMessage(), e);
+        }
+    }
+
     /**
      * Places a message onto a specified queue, in a given transaction.
      *

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java?rev=1228748&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java Sat Jan  7 22:47:17 2012
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class StringMapBinding extends TupleBinding<Map<String,String>>
+{
+    
+    private static final StringMapBinding INSTANCE = new StringMapBinding();
+    
+    public Map<String, String> entryToObject(final TupleInput tupleInput)
+    {
+        int entries = tupleInput.readInt();
+        Map<String,String> map = new HashMap<String,String>(entries);
+        for(int i = 0; i < entries; i++)
+        {
+            map.put(tupleInput.readString(), tupleInput.readString());
+        }
+        return map;
+    }
+
+    
+    public void objectToEntry(final Map<String, String> stringStringMap, final TupleOutput tupleOutput)
+    {
+        tupleOutput.writeInt(stringStringMap.size());
+        for(Map.Entry<String,String> entry : stringStringMap.entrySet())
+        {
+            tupleOutput.writeString(entry.getKey());
+            tupleOutput.writeString(entry.getValue());
+        }
+    }
+
+    public static StringMapBinding getInstance()
+    {
+        return INSTANCE;
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java?rev=1228748&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java Sat Jan  7 22:47:17 2012
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import java.util.UUID;
+
+public class UUIDTupleBinding extends TupleBinding<UUID>
+{
+    private static final UUIDTupleBinding INSTANCE = new UUIDTupleBinding();
+    
+    public UUID entryToObject(final TupleInput tupleInput)
+    {
+        return new UUID(tupleInput.readLong(), tupleInput.readLong());
+    }
+
+    public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput)
+    {
+        tupleOutput.writeLong(uuid.getMostSignificantBits());
+        tupleOutput.writeLong(uuid.getLeastSignificantBits());        
+    }
+
+    public static UUIDTupleBinding getInstance()
+    {
+        return INSTANCE;
+    }
+
+
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java Sat Jan  7 22:47:17 2012
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.qmf;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.transport.codec.BBEncoder;
 import org.apache.qpid.server.message.ServerMessage;
@@ -37,6 +38,9 @@ import java.util.List;
 public class QMFBrokerRequestCommand extends QMFCommand
 {
 
+    private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
+
     public QMFBrokerRequestCommand(QMFCommandHeader header, BBDecoder buf)
     {
         super(header);
@@ -47,6 +51,8 @@ public class QMFBrokerRequestCommand ext
         String exchangeName = message.getMessageHeader().getReplyToExchange();
         String queueName = message.getMessageHeader().getReplyToRoutingKey();
 
+        _qmfLogger.debug("Execute: " + this);
+
         QMFCommand[] commands = new QMFCommand[2];
         commands[0] = new QMFBrokerResponseCommand(this, virtualHost);
         commands[1] = new QMFCommandCompletionCommand(this);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java Sat Jan  7 22:47:17 2012
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.qmf;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.ServerMessage;
@@ -35,6 +36,9 @@ import java.util.List;
 
 public class QMFClassQueryCommand extends QMFCommand
 {
+    private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
+
     private final String _package;
 
     public QMFClassQueryCommand(QMFCommandHeader header, BBDecoder decoder)
@@ -48,6 +52,8 @@ public class QMFClassQueryCommand extend
         String exchangeName = message.getMessageHeader().getReplyToExchange();
         String routingKey = message.getMessageHeader().getReplyToRoutingKey();
 
+        _qmfLogger.debug("Execute: " + this);
+
         IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry();
         QMFService service = appRegistry.getQMFService();
 
@@ -88,4 +94,12 @@ public class QMFClassQueryCommand extend
         }
     }
 
+
+    @Override
+    public String toString()
+    {
+        return "QMFClassQueryCommand{" +
+               "package='" + _package + '\'' +
+               '}';
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java Sat Jan  7 22:47:17 2012
@@ -53,4 +53,13 @@ public class QMFCommandCompletionCommand
         encoder.writeInt32(_status.ordinal());
         encoder.writeStr8(_text);
     }
+
+    @Override
+    public String toString()
+    {
+        return "QMFCommandCompletionCommand{" +
+               "status=" + _status +
+               ",text='" + _text + '\'' +
+               '}';
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java Sat Jan  7 22:47:17 2012
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.qmf;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.ServerMessage;
@@ -33,28 +34,22 @@ import java.util.*;
 
 public class QMFGetQueryCommand extends QMFCommand
 {
-    private Map<String, Object> _map;
 
+    private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
+    private String _className;
+    private String _packageName;
+    private UUID _objectId;
 
     public QMFGetQueryCommand(QMFCommandHeader header, BBDecoder decoder)
     {
         super(header);
 
-        _map = decoder.readMap();
-    }
-
-    public void process(VirtualHost virtualHost, ServerMessage message)
-    {
-        String exchangeName = message.getMessageHeader().getReplyToExchange();
-        String routingKey = message.getMessageHeader().getReplyToRoutingKey();
-
-        IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry();
-        QMFService service = appRegistry.getQMFService();
-
-        String className = (String) _map.get("_class");
-        String packageName = (String) _map.get("_package");
+        Map<String, Object> _map = decoder.readMap();
+        _className = (String) _map.get("_class");
+        _packageName = (String) _map.get("_package");
         byte[] objectIdBytes = (byte[]) _map.get("_objectId");
-        UUID objectId;
+
         if(objectIdBytes != null)
         {
             long msb = 0;
@@ -68,21 +63,34 @@ public class QMFGetQueryCommand extends 
             {
                 lsb = (lsb << 8) | (objectIdBytes[i] & 0xff);
             }
-            objectId = new UUID(msb, lsb);
+            _objectId = new UUID(msb, lsb);
         }
         else
         {
-            objectId = null;
+            _objectId = null;
         }
 
+
+    }
+
+    public void process(VirtualHost virtualHost, ServerMessage message)
+    {
+        String exchangeName = message.getMessageHeader().getReplyToExchange();
+        String routingKey = message.getMessageHeader().getReplyToRoutingKey();
+
+        IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry();
+        QMFService service = appRegistry.getQMFService();
+
+        _qmfLogger.debug("Execute: " + this);
+
         List<QMFCommand> commands = new ArrayList<QMFCommand>();
         final long sampleTime = System.currentTimeMillis() * 1000000l;
 
         Collection<QMFPackage> packages;
 
-        if(packageName != null && packageName.length() != 0)
+        if(_packageName != null && _packageName.length() != 0)
         {
-            QMFPackage qmfPackage = service.getPackage(packageName);
+            QMFPackage qmfPackage = service.getPackage(_packageName);
             if(qmfPackage == null)
             {
                 packages = Collections.EMPTY_LIST;
@@ -102,9 +110,9 @@ public class QMFGetQueryCommand extends 
 
             Collection<QMFClass> qmfClasses;
 
-            if(className != null && className.length() != 0)
+            if(_className != null && _className.length() != 0)
             {
-                QMFClass qmfClass = qmfPackage.getQMFClass(className);
+                QMFClass qmfClass = qmfPackage.getQMFClass(_className);
                 if(qmfClass == null)
                 {
                     qmfClasses = Collections.EMPTY_LIST;
@@ -124,9 +132,9 @@ public class QMFGetQueryCommand extends 
             {
                 Collection<QMFObject> objects;
 
-                if(objectId != null)
+                if(_objectId != null)
                 {
-                    QMFObject obj = service.getObjectById(qmfClass, objectId);
+                    QMFObject obj = service.getObjectById(qmfClass, _objectId);
                     if(obj == null)
                     {
                         objects = Collections.EMPTY_LIST;
@@ -158,7 +166,7 @@ public class QMFGetQueryCommand extends 
         for(QMFCommand cmd : commands)
         {
 
-
+            _qmfLogger.debug("Respond: " + cmd);
             QMFMessage responseMessage = new QMFMessage(routingKey, cmd);
 
             Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
@@ -179,4 +187,13 @@ public class QMFGetQueryCommand extends 
         }
     }
 
+    @Override
+    public String toString()
+    {
+        return "QMFGetQueryCommand{" +
+               "packageName='" + _packageName + '\'' +
+               ", className='" + _className + '\'' +
+               ", objectId=" + _objectId +
+               '}';
+    }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java Sat Jan  7 22:47:17 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.qmf;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.ServerMessage;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
 
 public class QMFMethodRequestCommand extends QMFCommand
 {
+    private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
     private QMFMethodInvocation _methodInstance;
     private QMFObject _object;
 
@@ -59,6 +62,9 @@ public class QMFMethodRequestCommand ext
         String queueName = message.getMessageHeader().getReplyToRoutingKey();
 
         QMFCommand[] commands = new QMFCommand[2];
+
+        _qmfLogger.debug("Execute: " + _methodInstance + " on " + _object);
+
         commands[0] = _methodInstance.execute(_object, this);
         commands[1] = new QMFCommandCompletionCommand(this);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java Sat Jan  7 22:47:17 2012
@@ -73,4 +73,9 @@ public abstract class QMFObject<C extend
     abstract public QMFCommand asInstrumentInfoCmd(long sampleTime);
     abstract public QMFCommand asGetQueryResponseCmd(final QMFGetQueryCommand queryCommand, long sampleTime);
 
+    @Override
+    public String toString()
+    {
+        return _delegate.toString();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java Sat Jan  7 22:47:17 2012
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.qmf;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.ServerMessage;
@@ -35,6 +36,9 @@ import java.util.List;
 
 public class QMFPackageQueryCommand extends QMFCommand
 {
+
+    private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
     public QMFPackageQueryCommand(QMFCommandHeader header, BBDecoder decoder)
     {
         super(header);
@@ -53,6 +57,8 @@ public class QMFPackageQueryCommand exte
         
         QMFCommand[] commands = new QMFCommand[ supportedSchemas.size() + 1 ];
 
+        _qmfLogger.debug("Exectuting " + this);
+        
         int i = 0;
         for(QMFPackage p : supportedSchemas)
         {
@@ -84,4 +90,9 @@ public class QMFPackageQueryCommand exte
             }
         }
     }
+    
+    public String toString()
+    {
+        return "QMFPackageQueryCommand";
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java Sat Jan  7 22:47:17 2012
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.qmf;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.ServerMessage;
@@ -35,6 +36,8 @@ import java.util.List;
 
 public class QMFSchemaRequestCommand extends QMFCommand
 {
+    private static final Logger _qmfLogger = Logger.getLogger("qpid.qmf");
+
     private final String _packageName;
     private final String _className;
     private final byte[] _hash;
@@ -49,6 +52,8 @@ public class QMFSchemaRequestCommand ext
 
     public void process(VirtualHost virtualHost, ServerMessage message)
     {
+        _qmfLogger.debug("Execute: " + this);
+
         String exchangeName = message.getMessageHeader().getReplyToExchange();
         String routingKey = message.getMessageHeader().getReplyToRoutingKey();
 
@@ -86,4 +91,13 @@ public class QMFSchemaRequestCommand ext
             }
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return "QMFSchemaRequestCommand{" +
+               " packageName='" + _packageName + '\'' +
+               ", className='" + _className + '\'' +
+               '}';
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Sat Jan  7 22:47:17 2012
@@ -410,7 +410,10 @@ public class QMFService implements Confi
         ConcurrentHashMap<UUID, QMFObject> map = _managedObjectsById.get(qmfclass);
         if(map != null)
         {
-            return map.get(id);
+
+            UUID key = new UUID(id.getMostSignificantBits() & (0xFFFl << 48), id.getLeastSignificantBits());
+            return map.get(key);
+
         }
         else
         {
@@ -604,6 +607,11 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class BrokerDelegate implements BrokerSchema.BrokerDelegate
@@ -762,6 +770,11 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class VhostDelegate implements BrokerSchema.VhostDelegate
@@ -797,6 +810,11 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class ExchangeDelegate implements BrokerSchema.ExchangeDelegate
@@ -923,6 +941,11 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class QueueDelegate implements BrokerSchema.QueueDelegate
@@ -1163,6 +1186,11 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class BindingDelegate implements BrokerSchema.BindingDelegate
@@ -1214,6 +1242,11 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class ConnectionDelegate implements BrokerSchema.ConnectionDelegate
@@ -1352,6 +1385,12 @@ public class QMFService implements Confi
             // TODO
             return 0;
         }
+
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class SessionDelegate implements BrokerSchema.SessionDelegate
@@ -1476,6 +1515,11 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
     private class SubscriptionDelegate implements BrokerSchema.SubscriptionDelegate
@@ -1542,93 +1586,103 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
-    }
 
-        private class BridgeDelegate implements BrokerSchema.BridgeDelegate
+        public String toString()
         {
-            private final BridgeConfig _obj;
+            return _obj.toString();
+        }
+    }
 
-            private BridgeDelegate(final BridgeConfig obj)
-            {
-                _obj = obj;
-            }
+    private class BridgeDelegate implements BrokerSchema.BridgeDelegate
+    {
+        private final BridgeConfig _obj;
 
-            public BrokerSchema.LinkObject getLinkRef()
-            {
-                return (BrokerSchema.LinkObject) adapt(_obj.getLink());
-            }
+        private BridgeDelegate(final BridgeConfig obj)
+        {
+            _obj = obj;
+        }
 
-            public Integer getChannelId()
-            {
-                return _obj.getChannelId();
-            }
+        public BrokerSchema.LinkObject getLinkRef()
+        {
+            return (BrokerSchema.LinkObject) adapt(_obj.getLink());
+        }
 
-            public Boolean getDurable()
-            {
-                return _obj.isDurable();
-            }
+        public Integer getChannelId()
+        {
+            return _obj.getChannelId();
+        }
 
-            public String getSrc()
-            {
-                return _obj.getSource();
-            }
+        public Boolean getDurable()
+        {
+            return _obj.isDurable();
+        }
 
-            public String getDest()
-            {
-                return _obj.getDestination();
-            }
+        public String getSrc()
+        {
+            return _obj.getSource();
+        }
 
-            public String getKey()
-            {
-                return _obj.getKey();
-            }
+        public String getDest()
+        {
+            return _obj.getDestination();
+        }
 
-            public Boolean getSrcIsQueue()
-            {
-                return _obj.isQueueBridge();
-            }
+        public String getKey()
+        {
+            return _obj.getKey();
+        }
 
-            public Boolean getSrcIsLocal()
-            {
-                return _obj.isLocalSource();
-            }
+        public Boolean getSrcIsQueue()
+        {
+            return _obj.isQueueBridge();
+        }
 
-            public String getTag()
-            {
-                return _obj.getTag();
-            }
+        public Boolean getSrcIsLocal()
+        {
+            return _obj.isLocalSource();
+        }
 
-            public String getExcludes()
-            {
-                return _obj.getExcludes();
-            }
+        public String getTag()
+        {
+            return _obj.getTag();
+        }
 
-            public Boolean getDynamic()
-            {
-                return _obj.isDynamic();
-            }
+        public String getExcludes()
+        {
+            return _obj.getExcludes();
+        }
 
-            public Integer getSync()
-            {
-                return _obj.getAckBatching();
-            }
+        public Boolean getDynamic()
+        {
+            return _obj.isDynamic();
+        }
 
-            public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
-            {
-                return null;
-            }
+        public Integer getSync()
+        {
+            return _obj.getAckBatching();
+        }
 
-            public UUID getId()
-            {
-                return _obj.getId();
-            }
+        public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
+        {
+            return null;
+        }
 
-            public long getCreateTime()
-            {
-                return _obj.getCreateTime();
-            }
+        public UUID getId()
+        {
+            return _obj.getId();
         }
 
+        public long getCreateTime()
+        {
+            return _obj.getCreateTime();
+        }
+        
+        public String toString()
+        {
+            return _obj.toString();
+        }
+    }
+
     private class LinkDelegate implements BrokerSchema.LinkDelegate
     {
         private final LinkConfig _obj;
@@ -1665,14 +1719,12 @@ public class QMFService implements Confi
 
         public String getState()
         {
-            // TODO
-            return "";
+            return _obj.getState();
         }
 
         public String getLastError()
         {
-            // TODO
-            return "";
+            return _obj.getLastError();
         }
 
         public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory)
@@ -1706,6 +1758,12 @@ public class QMFService implements Confi
         {
             return _obj.getCreateTime();
         }
+
+        @Override
+        public String toString()
+        {
+            return _obj.toString();
+        }
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java Sat Jan  7 22:47:17 2012
@@ -40,7 +40,10 @@ public class ConfigStore
     private AtomicReference<SystemConfig> _root = new AtomicReference<SystemConfig>(null);
 
     private final AtomicLong _objectIdSource = new AtomicLong(0l);
+    private final AtomicLong _persistentObjectIdSource = new AtomicLong(0l);
 
+    // TODO - should load/increment this on broker startup
+    private long _sequenceNumber = 1L;
 
     public enum Event
     {
@@ -167,9 +170,23 @@ public class ConfigStore
 
     public UUID createId()
     {
-        return new UUID(0l, _objectIdSource.getAndIncrement());
+        return new UUID(((_sequenceNumber & 0xFFFl)<<48), _objectIdSource.incrementAndGet());
     }
 
+    public UUID createPersistentId()
+    {
+        return new UUID(0L, _persistentObjectIdSource.incrementAndGet());
+    }
+    
+    public void persistentIdInUse(UUID id)
+    {
+        // Raise the persistent id counter to at least this id's low 64 bits; the counter is never lowered.
+        final long inUse = id.getLeastSignificantBits();
+        for(long seen = _persistentObjectIdSource.get(); seen < inUse; seen = _persistentObjectIdSource.get())
+        {
+            _persistentObjectIdSource.compareAndSet(seen, inUse);
+        }
+    }
 
     public SystemConfig getRoot()
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfig.java Sat Jan  7 22:47:17 2012
@@ -54,4 +54,8 @@ public interface LinkConfig extends Conf
                       String src,
                       String dest,
                       String key, String tag, String excludes);
+
+    String getState();
+
+    String getLastError();
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Sat Jan  7 22:47:17 2012
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.federation;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.BridgeConfig;
 import org.apache.qpid.server.configuration.BridgeConfigType;
@@ -52,6 +53,15 @@ import java.util.concurrent.ConcurrentMa
 
 public class Bridge implements BridgeConfig
 {
+    private static final String DURABLE = "durable";
+    private static final String DYNAMIC = "dynamic";
+    private static final String SRC_IS_QUEUE = "srcIsQueue";
+    private static final String SRC_IS_LOCAL = "srcIsLocal";
+    private static final String SOURCE = "source";
+    private static final String DESTINATION = "destination";
+    private static final String KEY = "key";
+    private static final String TAG = "tag";
+    private static final String EXCLUDES = "excludes";
     private final boolean _durable;
     private final boolean _dynamic;
     private final boolean _queueBridge;
@@ -95,19 +105,36 @@ public class Bridge implements BridgeCon
         _key = key;
         _tag = tag;
         _excludes = excludes;
-        _id = brokerLink.getConfigStore().createId();
+        _id = durable ? brokerLink.getConfigStore().createPersistentId() : brokerLink.getConfigStore().createId();
 
         _transaction = new AutoCommitTransaction(getVirtualHost().getMessageStore());
 
-        if(dynamic)
+        if(durable)
         {
-            if(srcIsLocal)
+            try
+            {
+                brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this);
+            }
+            catch (AMQStoreException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        createDelegate();
+    }
+
+    private void createDelegate()
+    {
+        if(_dynamic)
+        {
+            if(_localSource)
             {
                 // TODO
             }
             else
             {
-                if(srcIsQueue)
+                if(_queueBridge)
                 {
                     // TODO
                 }
@@ -119,9 +146,9 @@ public class Bridge implements BridgeCon
         }
         else
         {
-            if(srcIsLocal)
+            if(_localSource)
             {
-                if(srcIsQueue)
+                if(_queueBridge)
                 {
                     _delegate = new StaticQueuePushBridge();
                 }
@@ -132,7 +159,7 @@ public class Bridge implements BridgeCon
             }
             else
             {
-                if(srcIsQueue)
+                if(_queueBridge)
                 {
                     _delegate = new StaticQueuePullBridge();
                 }
@@ -144,6 +171,65 @@ public class Bridge implements BridgeCon
         }
     }
 
+    public Bridge(final BrokerLink brokerLink,
+                  final int bridgeNo,
+                  final UUID id,
+                  final long createTime,
+                  final Map<String, String> arguments)
+    {
+        _link = brokerLink;
+        _bridgeNo = bridgeNo;
+        _id = id;
+        brokerLink.getConfigStore().persistentIdInUse(id); // mark the supplied id as taken in the config store
+        _createTime = createTime;
+        // Settings are parsed from the same keys that getArguments() writes; a missing boolean key yields false.
+        _durable = Boolean.valueOf(arguments.get(DURABLE));
+        _dynamic = Boolean.valueOf(arguments.get(DYNAMIC));
+        _queueBridge = Boolean.valueOf(arguments.get(SRC_IS_QUEUE));
+        _localSource = Boolean.valueOf(arguments.get(SRC_IS_LOCAL));
+        _source = arguments.get(SOURCE);
+        _destination = arguments.get(DESTINATION);
+        _key = arguments.get(KEY);
+        _tag = arguments.get(TAG);
+        _excludes = arguments.get(EXCLUDES);
+
+        //TODO.
+        _transaction = new AutoCommitTransaction(getVirtualHost().getMessageStore());
+
+        // NOTE(review): createBridge(this) is also called on this map-based path - confirm the store accepts an id it may already hold.
+        if(_durable)
+        {
+            try
+            {
+                brokerLink.getVirtualHost().getDurableConfigurationStore().createBridge(this);
+            }
+            catch (AMQStoreException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        createDelegate();
+    }
+
+
+    public Map<String,String> getArguments()
+    {
+        // Read-only snapshot of this bridge's settings as strings, under the keys the map-based constructor reads.
+        final Map<String,String> settings = new HashMap<String, String>();
+        settings.put(DURABLE, String.valueOf(_durable));
+        settings.put(DYNAMIC, String.valueOf(_dynamic));
+        settings.put(SRC_IS_QUEUE, String.valueOf(_queueBridge));
+        settings.put(SRC_IS_LOCAL, String.valueOf(_localSource));
+        settings.put(SOURCE, _source);
+        settings.put(DESTINATION, _destination);
+        settings.put(KEY, _key);
+        settings.put(TAG, _tag);
+        settings.put(EXCLUDES, _excludes);
+
+        return Collections.unmodifiableMap(settings);
+    }
+
     public UUID getId()
     {
         return _id;
@@ -318,6 +404,7 @@ public class Bridge implements BridgeCon
     }
 
 
+
     private interface BridgeImpl
     {
         void setSession(Session session);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java Sat Jan  7 22:47:17 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.federation;
 
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -29,16 +30,19 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.LinkConfigType;
 import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionException;
-import org.apache.qpid.transport.ConnectionListener;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.util.Strings;
 
-import java.util.Map;
-import java.util.UUID;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -55,6 +59,14 @@ public class BrokerLink implements LinkC
 
     private static final ScheduledThreadPoolExecutor _threadPool =
             new ScheduledThreadPoolExecutor(CORE_POOL_SIZE);
+    private static final String TRANSPORT   = "transport";
+    private static final String HOST = "host";
+    private static final String PORT = "port";
+    private static final String REMOTE_VHOST = "remoteVhost";
+    private static final String DURABLE = "durable";
+    private static final String AUTH_MECHANISM = "authMechanism";
+    private static final String USERNAME = "username";
+    private static final String PASSWORD = "password";
 
 
     private final String _transport;
@@ -68,7 +80,7 @@ public class BrokerLink implements LinkC
     private final VirtualHost _virtualHost;
     private UUID _id;
     private AtomicBoolean _closing = new AtomicBoolean();
-    private final long _createTime = System.currentTimeMillis();
+    private final long _createTime;
     private Connection _qpidConnection;
     private AtomicReference<Thread> _executor = new AtomicReference<Thread>();
     private AtomicInteger _bridgeId = new AtomicInteger();
@@ -88,8 +100,10 @@ public class BrokerLink implements LinkC
             {
                 doMakeConnection();
             }
-        };;
-    ;
+        };
+
+
+
 
     public static enum State
     {
@@ -205,6 +219,44 @@ public class BrokerLink implements LinkC
         }
     };
 
+    public BrokerLink(final VirtualHost virtualHost, UUID id, long createTime, Map<String, String> arguments)
+    {
+        // Builds a link from an existing id, creation time and a settings map keyed as in getArguments().
+        _virtualHost = virtualHost;
+        _id = id;
+        // Tell the config store this id is taken so createPersistentId() does not issue it again.
+        virtualHost.getConfigStore().persistentIdInUse(id);
+        _createTime = createTime;
+
+        _transport = arguments.get(TRANSPORT);
+        _host = arguments.get(HOST);
+        _port = Integer.parseInt(arguments.get(PORT));
+        _remoteVhost = arguments.get(REMOTE_VHOST);
+        _durable = Boolean.parseBoolean(arguments.get(DURABLE));
+        // Use the shared key constants (not repeated literals) so reads stay in step with getArguments().
+        _authMechanism = arguments.get(AUTH_MECHANISM);
+        _username = arguments.get(USERNAME);
+        _password = arguments.get(PASSWORD);
+
+        if(_durable)
+        {
+            try
+            {
+                _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+            }
+            catch (AMQStoreException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        _qpidConnection = new Connection();
+        _connectionConfig = new ConnectionConfigAdapter();
+        _qpidConnection.addConnectionListener(this);
+
+        makeConnection();
+    }
+
 
     public BrokerLink(final VirtualHost virtualHost,
                       final String transport,
@@ -212,10 +264,13 @@ public class BrokerLink implements LinkC
                       final int port,
                       final String remoteVhost,
                       final boolean durable,
-                      final String authMechanism, final String username, final String password)
+                      final String authMechanism,
+                      final String username,
+                      final String password)
     {
         _virtualHost = virtualHost;
         _transport = transport;
+        _createTime = System.currentTimeMillis();
         _host = host;
         _port = port;
         _remoteVhost = remoteVhost;
@@ -223,15 +278,42 @@ public class BrokerLink implements LinkC
         _authMechanism = authMechanism;
         _username = username;
         _password = password;
-        _id = virtualHost.getConfigStore().createId();
+        _id = durable ? virtualHost.getConfigStore().createPersistentId() : virtualHost.getConfigStore().createId();
+
+        if(durable)
+        {
+            try
+            {
+                _virtualHost.getDurableConfigurationStore().createBrokerLink(this);
+            }
+            catch (AMQStoreException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
         _qpidConnection = new Connection();
         _connectionConfig = new ConnectionConfigAdapter();
         _qpidConnection.addConnectionListener(this);
 
-
         makeConnection();
     }
 
+    public Map<String,String> getArguments()
+    {
+        // String form of the link settings; the password is included as-is, so treat the map as sensitive.
+        final Map<String,String> settings = new HashMap<String, String>();
+        settings.put(TRANSPORT, _transport);
+        settings.put(HOST, _host);
+        settings.put(PORT, String.valueOf(_port));
+        settings.put(REMOTE_VHOST, _remoteVhost);
+        settings.put(DURABLE, String.valueOf(_durable));
+        settings.put(AUTH_MECHANISM, _authMechanism);
+        settings.put(USERNAME, _username);
+        settings.put(PASSWORD, _password);
+
+        return Collections.unmodifiableMap(settings);
+    }
+    
     private final boolean updateState(State expected, State newState)
     {
         return _stateUpdater.compareAndSet(this,expected,newState);
@@ -250,9 +332,50 @@ public class BrokerLink implements LinkC
         {
             try
             {
+                _qpidConnection.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())
+                {
+                    protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException,
+                                                                                           SaslException
+                    {
+                        Map<String,Object> saslProps = new HashMap<String,Object>();
+
+
+                        CallbackHandler cbh = new CallbackHandler()
+                        {
+                            public void handle(final Callback[] callbacks)
+                                    throws IOException, UnsupportedCallbackException
+                            {
+                                // Answer the name/password prompts from the link's configured credentials.
+                                for (Callback cb : callbacks)
+                                {
+                                    if (cb instanceof NameCallback)
+                                    {
+                                        ((NameCallback)cb).setName(_username);
+                                    }
+                                    else if (cb instanceof PasswordCallback)
+                                    {
+                                        // A link without a password must not NPE here; PasswordCallback accepts null.
+                                        ((PasswordCallback)cb).setPassword(_password == null ? null : _password.toCharArray());
+                                    }
+                                    else
+                                    {
+                                        throw new UnsupportedCallbackException(cb);
+                                    }
+                                }
+                            }
+                        };
+                        final SaslClient sc = Sasl.createSaslClient(new String[] {"PLAIN"}, null,
+                                                                    _conSettings.getSaslProtocol(),
+                                                                    _conSettings.getSaslServerName(),
+                                                                    saslProps, cbh);
+
+                        return sc;
+                }});
+
                 _qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism);
 
                 final Map<String,Object> serverProps = _qpidConnection.getServerProperties();
+
                 _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG);
                 if(_remoteFederationTag == null)
                 {
@@ -445,6 +568,20 @@ public class BrokerLink implements LinkC
 
     }
 
+    public void createBridge(final UUID id, final long createTime, final Map<String, String> arguments)
+    {
+        if(_closing.get())
+        {
+            return; // nothing is created once the link has begun closing
+        }
+        final Bridge candidate = new Bridge(this, _bridgeId.incrementAndGet(), id, createTime, arguments);
+        if(_bridges.putIfAbsent(candidate, candidate) == null)
+        {
+            addBridge(candidate); // register only a bridge that was not already in the map
+        }
+    }
+
+
     private void addBridge(final Bridge bridge)
     {
         getConfigStore().addConfiguredObject(bridge);
@@ -509,4 +646,34 @@ public class BrokerLink implements LinkC
     {
         return _remoteFederationTag;
     }
+
+    public String getState()
+    {
+        return _state.name();
+    }
+
+    public String getLastError()
+    {
+        return _lastErrorMessage;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "BrokerLink{" +
+               " _id=" + _id +
+               ", _transport='" + _transport + '\'' +
+               ", _host='" + _host + '\'' +
+               ", _port=" + _port +
+               ", _remoteVhost='" + _remoteVhost + '\'' +
+               ", _durable=" + _durable +
+               ", _authMechanism='" + _authMechanism + '\'' +
+               ", _username='" + _username + '\'' +
+               ", _password='" + (_password == null ? null : "********") + '\'' + // masked: this text can end up in logs
+               ", _virtualHost=" + _virtualHost +
+               ", _createTime=" + _createTime +
+               ", _remoteFederationTag='" + _remoteFederationTag + '\'' +
+               ", _state=" + _state +
+               '}';
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sat Jan  7 22:47:17 2012
@@ -72,13 +72,16 @@ public class ChannelLogSubject extends A
          * 3 - Virtualhost
          * 4 - Channel ID
          */
-        ServerConnection connection = (ServerConnection) session.getConnection();
-        setLogStringWithFormat(CHANNEL_FORMAT,
-                               connection == null ? -1L : connection.getConnectionId(),
-                               session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
-                               (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
-                               session.getVirtualHost().getName(),
-                               session.getChannel());
+        if(session.getConnection() instanceof ServerConnection)
+        {
+            ServerConnection connection = (ServerConnection) session.getConnection();
+            setLogStringWithFormat(CHANNEL_FORMAT,
+                                   connection == null ? -1L : connection.getConnectionId(),
+                                   session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
+                                   (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
+                                   session.getVirtualHost().getName(),
+                                   session.getChannel());
+        }
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Sat Jan  7 22:47:17 2012
@@ -148,7 +148,7 @@ public abstract class ApplicationRegistr
 
         BrokerConfig broker = new BrokerConfigAdapter(instance);
 
-        SystemConfig system = (SystemConfig) store.getRoot();
+        SystemConfig system = store.getRoot();
         system.addBroker(broker);
         instance.setBroker(broker);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Sat Jan  7 22:47:17 2012
@@ -165,7 +165,6 @@ public class BrokerConfigAdapter impleme
     /**
      * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
      */
-    @Override
     public List<String> getFeatures()
     {
         final List<String> features = new ArrayList<String>();
@@ -176,4 +175,16 @@ public class BrokerConfigAdapter impleme
 
         return Collections.unmodifiableList(features);
     }
+
+    /** Builds a string listing the _id, _system, _vhosts, _createTime and _federationTag fields. */
+    @Override
+    public String toString()
+    {
+        final StringBuilder text = new StringBuilder("BrokerConfigAdapter{");
+        text.append("_id=").append(_id).append(", _system=").append(_system);
+        text.append(", _vhosts=").append(_vhosts).append(", _createTime=").append(_createTime);
+        text.append(", _federationTag='").append(_federationTag).append('\'');
+        text.append('}');
+        return text.toString();
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Sat Jan  7 22:47:17 2012
@@ -21,6 +21,9 @@
 package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.qpid.framing.FieldTable;
 
 public interface ConfigurationRecoveryHandler
@@ -42,7 +45,19 @@ public interface ConfigurationRecoveryHa
     public static interface BindingRecoveryHandler
     {
         void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
-        void completeBindingRecovery();
+        BrokerLinkRecoveryHandler completeBindingRecovery();
+    }
+    /** Callback given each stored broker link; brokerLink returns the handler used for that link's bridges. */
+    public static interface BrokerLinkRecoveryHandler
+    {
+        BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments);
+        void completeBrokerLinkRecovery();
+    }
+    /** Callback given each stored bridge of one broker link, followed by a completion call for that link. */
+    public static interface BridgeRecoveryHandler
+    {
+        void bridge(UUID id, long createTime, Map<String,String> arguments);
+        void completeBridgeRecoveryForLink();
     }
 
     public static interface QueueEntryRecoveryHandler

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Sat Jan  7 22:47:17 2012
@@ -21,7 +21,9 @@
 package org.apache.qpid.server.store;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
@@ -36,7 +38,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -47,13 +52,14 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
 import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /**
@@ -82,6 +88,10 @@ public class DerbyMessageStore implement
     private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
     private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
 
+    private static final String LINKS_TABLE_NAME = "QPID_LINKS";
+    private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
+
+
     private static final int DB_VERSION = 3;
 
 
@@ -137,6 +147,49 @@ public class DerbyMessageStore implement
     private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
     private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
 
+    // SQL for the links table: one row per broker link, keyed by the two 64-bit halves of its UUID.
+    private static final String CREATE_LINKS_TABLE =
+            "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
+                                            + " id_msb bigint not null,"
+                                             + " create_time bigint not null,"
+                                             + " arguments blob,  PRIMARY KEY ( id_lsb, id_msb ))";
+    private static final String SELECT_FROM_LINKS =
+            "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+    private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME 
+                                                    + " WHERE id_lsb = ? and id_msb = ?";
+    private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+                                                        + "arguments FROM " + LINKS_TABLE_NAME;
+    private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
+                                            + " id_msb = ?";
+    private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+                                                  + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
+    // SQL for the bridges table: one row per bridge, keyed by its UUID halves and holding the owning link's UUID halves.
+    private static final String CREATE_BRIDGES_TABLE =
+            "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
+            + " id_msb bigint not null,"
+            + " create_time bigint not null,"
+            + " link_id_lsb bigint not null,"
+            + " link_id_msb bigint not null,"
+            + " arguments blob,  PRIMARY KEY ( id_lsb, id_msb ))";
+    private static final String SELECT_FROM_BRIDGES =
+            "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM " 
+            + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+    private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME 
+                                                      + " WHERE id_lsb = ? and id_msb = ?";
+    private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, " 
+                                                          + " create_time," 
+                                                          + " link_id_lsb, link_id_msb, "
+                                                        + "arguments FROM " + BRIDGES_TABLE_NAME
+                                                        + " WHERE link_id_lsb = ? and link_id_msb = ?";
+    private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
+                                              " WHERE id_lsb = ? and id_msb = ?";
+    private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
+                                                    + "create_time, "
+                                                    + "link_id_lsb, link_id_msb, "
+                                                    + "arguments )"
+                                                    + " values (?, ?, ?, ?, ?, ?)";
+
+
     private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
 
 
@@ -294,6 +347,8 @@ public class DerbyMessageStore implement
         createQueueEntryTable(conn);
         createMetaDataTable(conn);
         createMessageContentTable(conn);
+        createLinkTable(conn);
+        createBridgeTable(conn);
 
         conn.close();
     }
@@ -430,6 +485,40 @@ public class DerbyMessageStore implement
 
     }
 
+    private void createLinkTable(final Connection conn) throws SQLException
+    {
+        if(tableExists(LINKS_TABLE_NAME, conn))
+        {
+            return; // the links table is already there, nothing to create
+        }
+        final Statement ddl = conn.createStatement();
+        try
+        {
+            ddl.execute(CREATE_LINKS_TABLE);
+        }
+        finally
+        {
+            ddl.close(); // runs whether or not the DDL succeeded
+        }
+    }
+
+    private void createBridgeTable(final Connection conn) throws SQLException
+    {
+        if(tableExists(BRIDGES_TABLE_NAME, conn))
+        {
+            return; // the bridges table is already there, nothing to create
+        }
+        final Statement ddl = conn.createStatement();
+        try
+        {
+            ddl.execute(CREATE_BRIDGES_TABLE);
+        }
+        finally
+        {
+            ddl.close(); // runs whether or not the DDL succeeded
+        }
+    }
+
 
 
     private boolean tableExists(final String tableName, final Connection conn) throws SQLException
@@ -470,7 +559,8 @@ public class DerbyMessageStore implement
             List<String> exchanges = loadExchanges(erh);
             ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
             recoverBindings(brh, exchanges);
-            brh.completeBindingRecovery();
+            ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+            recoverBrokerLinks(lrh);
         }
         catch (SQLException e)
         {
@@ -481,6 +571,144 @@ public class DerbyMessageStore implement
 
     }
 
+    /**
+     * Replays each row of the links table into the handler, then recovers that link's bridges.
+     * An absent or zero-length arguments blob is recovered as an empty
+     * map rather than failing with an EOFException from readInt().
+     * NOTE(review): lrh.completeBrokerLinkRecovery() is not called here or by the visible caller; confirm intent.
+     *
+     * @param lrh handler notified of each recovered link
+     * @throws SQLException if the table cannot be read or a stored blob cannot be decoded
+     */
+    private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
+            throws SQLException
+    {
+        _logger.info("Recovering broker links...");
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while(rs.next())
+                    {
+                        // Columns are id_lsb, id_msb, create_time, arguments; UUID takes (msb, lsb).
+                        UUID id  = new UUID(rs.getLong(2), rs.getLong(1));
+                        long createTime = rs.getLong(3);
+                        Blob argumentsAsBlob = rs.getBlob(4);
+                        Map<String,String> arguments = new HashMap<String, String>();
+                        if(argumentsAsBlob != null && argumentsAsBlob.length() > 0)
+                        {
+                            byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
+                            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
+                            int size = dis.readInt();
+                            for(int i = 0; i < size; i++)
+                            {
+                                arguments.put(dis.readUTF(), dis.readUTF());
+                            }
+                        }
+                        ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
+                        recoverBridges(brh, id);
+                    }
+                }
+                catch (IOException e)
+                {
+                    throw new SQLException(e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        finally
+        {
+            if(conn != null)
+            {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Replays the bridge rows whose link id matches linkId into the handler, then signals completion.
+     * An absent or zero-length arguments blob is recovered as an empty map rather than failing with
+     * an EOFException from readInt().
+     *
+     * @param brh handler notified of each recovered bridge and of completion for this link
+     * @param linkId id of the broker link whose bridges are loaded
+     * @throws SQLException if the table cannot be read or a stored blob cannot be decoded
+     */
+    private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
+            throws SQLException
+    {
+        _logger.info("Recovering bridges for link " + linkId + "...");
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
+            try
+            {
+                stmt.setLong(1, linkId.getLeastSignificantBits());
+                stmt.setLong(2, linkId.getMostSignificantBits());
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while(rs.next())
+                    {
+                        // Columns: id_lsb, id_msb, create_time, link_id_lsb, link_id_msb, arguments.
+                        UUID id  = new UUID(rs.getLong(2), rs.getLong(1));
+                        long createTime = rs.getLong(3);
+                        Blob argumentsAsBlob = rs.getBlob(6);
+                        Map<String,String> arguments = new HashMap<String, String>();
+                        if(argumentsAsBlob != null && argumentsAsBlob.length() > 0)
+                        {
+                            byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
+                            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
+                            int size = dis.readInt();
+                            for(int i = 0; i < size; i++)
+                            {
+                                arguments.put(dis.readUTF(), dis.readUTF());
+                            }
+                        }
+                        brh.bridge(id, createTime, arguments);
+                    }
+                    brh.completeBridgeRecoveryForLink();
+                }
+                catch (IOException e)
+                {
+                    throw new SQLException(e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        finally
+        {
+            if(conn != null)
+            {
+                conn.close();
+            }
+        }
+    }
+
     private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
     {
         Connection conn = newAutoCommitConnection();
@@ -1191,6 +1419,233 @@ public class DerbyMessageStore implement
 
     }
 
+    /**
+     * Stores the link in the links table unless a row with its id already exists; skipped while RECOVERING.
+     *
+     * @param link the broker link to store
+     * @throws AMQStoreException if a JDBC operation fails or the arguments cannot be encoded
+     */
+    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
+        if (_state != State.RECOVERING)
+        {
+            try
+            {
+                Connection conn = newAutoCommitConnection();
+                try
+                {
+                    PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
+                    try
+                    {
+                        stmt.setLong(1, link.getId().getLeastSignificantBits());
+                        stmt.setLong(2, link.getId().getMostSignificantBits());
+                        ResultSet rs = stmt.executeQuery();
+                        try
+                        {
+                            if (!rs.next()) // no row with this id yet, so insert one
+                            {
+                                PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
+                                try
+                                {
+                                    insertStmt.setLong(1, link.getId().getLeastSignificantBits());
+                                    insertStmt.setLong(2, link.getId().getMostSignificantBits());
+                                    insertStmt.setLong(3, link.getCreateTime());
+                                    byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
+                                    insertStmt.setBinaryStream(4, new ByteArrayInputStream(argumentBytes), argumentBytes.length);
+                                    insertStmt.execute();
+                                }
+                                finally
+                                {
+                                    insertStmt.close();
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            rs.close();
+                        }
+                    }
+                    finally
+                    {
+                        stmt.close();
+                    }
+                }
+                finally
+                {
+                    conn.close(); // always release the connection, even when a statement fails
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Encodes a string map as an int entry count followed by writeUTF key/value pairs.
+     * A null map is encoded as zero entries, because the recovery code starts decoding with readInt().
+     *
+     * @param arguments the map to encode, may be null
+     * @throws AMQStoreException if writing to the in-memory stream fails (not expected)
+     */
+    private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        try
+        {
+            // Always emit the count, even for a null map, so decoders never see an empty blob.
+            dos.writeInt(arguments == null ? 0 : arguments.size());
+            if(arguments != null)
+            {
+                for(Map.Entry<String,String> arg : arguments.entrySet())
+                {
+                    dos.writeUTF(arg.getKey());
+                    dos.writeUTF(arg.getValue());
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            // This should never happen
+            throw new AMQStoreException(e.getMessage(), e);
+        }
+        return bos.toByteArray();
+    }
+
+    /** Deletes the link's row from the links table; throws AMQStoreException if no row matched or a JDBC call fails. */
+    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        _logger.debug("public void deleteBrokerLink( " + link + "): called");
+
+        Connection connection = null;
+        PreparedStatement deleteStmt = null;
+        try
+        {
+            connection = newAutoCommitConnection();
+            deleteStmt = connection.prepareStatement(DELETE_FROM_LINKS);
+            final UUID linkId = link.getId();
+            deleteStmt.setLong(1, linkId.getLeastSignificantBits());
+            deleteStmt.setLong(2, linkId.getMostSignificantBits());
+            final boolean rowRemoved = deleteStmt.executeUpdate() != 0;
+            if (!rowRemoved)
+            {
+                throw new AMQStoreException("Link " + link + " not found");
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
+        }
+        finally
+        {
+            closePreparedStatement(deleteStmt);
+            closeConnection(connection);
+        }
+    }
+
+    /**
+     * Stores the bridge in the bridges table unless a row with its id already exists; skipped while RECOVERING.
+     *
+     * @param bridge the bridge to store
+     * @throws AMQStoreException if a JDBC operation fails or the arguments cannot be encoded
+     */
+    public void createBridge(final Bridge bridge) throws AMQStoreException
+    {
+        _logger.debug("public void createBridge(Bridge = " + bridge + "): called");
+
+        if (_state != State.RECOVERING)
+        {
+            try
+            {
+                Connection conn = newAutoCommitConnection();
+                try
+                {
+                    PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
+                    try
+                    {
+                        UUID id = bridge.getId();
+                        stmt.setLong(1, id.getLeastSignificantBits());
+                        stmt.setLong(2, id.getMostSignificantBits());
+                        ResultSet rs = stmt.executeQuery();
+                        try
+                        {
+                            if (!rs.next()) // no row with this id yet, so insert one
+                            {
+                                PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
+                                try
+                                {
+                                    insertStmt.setLong(1, id.getLeastSignificantBits());
+                                    insertStmt.setLong(2, id.getMostSignificantBits());
+                                    insertStmt.setLong(3, bridge.getCreateTime());
+                                    // Reference to the owning link, again as two 64-bit halves.
+                                    UUID linkId = bridge.getLink().getId();
+                                    insertStmt.setLong(4, linkId.getLeastSignificantBits());
+                                    insertStmt.setLong(5, linkId.getMostSignificantBits());
+                                    byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
+                                    insertStmt.setBinaryStream(6, new ByteArrayInputStream(argumentBytes), argumentBytes.length);
+                                    insertStmt.execute();
+                                }
+                                finally
+                                {
+                                    insertStmt.close();
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            rs.close();
+                        }
+                    }
+                    finally
+                    {
+                        stmt.close();
+                    }
+                }
+                finally
+                {
+                    conn.close(); // always release the connection, even when a statement fails
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /** Deletes the bridge's row from the bridges table; throws AMQStoreException if no row matched or a JDBC call fails. */
+    public void deleteBridge(final Bridge bridge) throws AMQStoreException
+    {
+        _logger.debug("public void deleteBridge( " + bridge + "): called");
+        Connection connection = null;
+        PreparedStatement deleteStmt = null;
+        try
+        {
+            connection = newAutoCommitConnection();
+            deleteStmt = connection.prepareStatement(DELETE_FROM_BRIDGES);
+            final UUID bridgeId = bridge.getId();
+            deleteStmt.setLong(1, bridgeId.getLeastSignificantBits());
+            deleteStmt.setLong(2, bridgeId.getMostSignificantBits());
+            final boolean rowRemoved = deleteStmt.executeUpdate() != 0;
+            if (!rowRemoved)
+            {
+                throw new AMQStoreException("Bridge " + bridge + " not found");
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
+        }
+        finally
+        {
+            closePreparedStatement(deleteStmt);
+            closeConnection(connection);
+        }
+    }
+
     public Transaction newTransaction()
     {
         return new DerbyTransaction();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1228748&r1=1228747&r2=1228748&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Sat Jan  7 22:47:17 2012
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
 
@@ -128,4 +130,12 @@ public interface DurableConfigurationSto
      * @throws AMQStoreException If the operation fails for any reason.
      */
     void updateQueue(AMQQueue queue) throws AMQStoreException;
+    /** Records the given broker link in the durable configuration store. */
+    void createBrokerLink(BrokerLink link) throws AMQStoreException;
+    /** Removes the given broker link from the durable configuration store. */
+    void deleteBrokerLink(BrokerLink link) throws AMQStoreException;
+    /** Records the given bridge in the durable configuration store. */
+    void createBridge(Bridge bridge) throws AMQStoreException;
+    /** Removes the given bridge from the durable configuration store. */
+    void deleteBridge(Bridge bridge) throws AMQStoreException;
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message