qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r787325 [2/2] - in /qpid/trunk/qpid/java: ./ management/console/ management/console/src/ management/console/src/main/ management/console/src/main/java/ management/console/src/main/java/org/ management/console/src/main/java/org/apache/ manag...
Date Mon, 22 Jun 2009 17:43:55 GMT
Added: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java?rev=787325&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java
(added)
+++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/SequenceManager.java
Mon Jun 22 17:43:54 2009
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.console;
+
+import java.util.HashMap;
+
+public class SequenceManager
+{
+    private long sequence = 0;
+    private HashMap<Long, Object> pending = new HashMap<Long, Object>();
+    private Object lockObject = new Object();
+
+    public SequenceManager()
+    {
+    }
+
+    public Object release(long seq)
+    {
+        Object returnValue = null;
+        synchronized (lockObject)
+        {
+            returnValue = pending.get(seq);
+            pending.remove(seq);
+        }
+        return returnValue;
+    }
+
+    public long reserve(Object data)
+    {
+        long returnValue = 0;
+        synchronized (lockObject)
+        {
+            returnValue = sequence;
+            sequence += 1;
+            pending.put(returnValue, data);
+        }
+        return returnValue;
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java?rev=787325&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java
(added)
+++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Session.java
Mon Jun 22 17:43:54 2009
@@ -0,0 +1,1007 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.console;//
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.jms.Message;
+
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Session
+{
+    private static Logger log = LoggerFactory.getLogger(Session.class);
+    public static int CONTEXT_SYNC = 1;
+    public static int CONTEXT_STARTUP = 2;
+    public static int CONTEXT_MULTIGET = 3;
+    public static int DEFAULT_GET_WAIT_TIME = 60000;
+    public boolean recieveObjects = true;
+    public boolean recieveEvents = true;
+    public boolean recieveHeartbeat = true;
+    public boolean userBindings = false;
+    public Console console;
+    protected HashMap<String, HashMap<String, SchemaClass>> packages = new HashMap<String,
HashMap<String, SchemaClass>>();
+    protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+    protected SequenceManager sequenceManager = new SequenceManager();
+    protected Object lockObject = new Object();
+    protected ArrayList<Long> syncSequenceList = new ArrayList<Long>();
+    protected ArrayList<QMFObject> getResult;
+    protected Object syncResult;
+
+    public Session()
+    {
+    }
+
+    public Session(Console console)
+    {
+        this.console = console;
+    }
+
+    public void addBroker(String url)
+    {
+        Broker broker = new Broker(this, url);
+        brokers.add(broker);
+        java.util.HashMap<String, Object> args = new java.util.HashMap<String, Object>();
+        args.put("_class", "agent");
+        args.put("_broker", broker);
+        this.getObjects(args);
+    }
+
+    public ArrayList<String> bindingKeys()
+    {
+        ArrayList<String> bindings = new ArrayList<String>();
+        bindings.add("schema.#");
+        if (recieveObjects & recieveEvents & recieveHeartbeat & !userBindings)
+        {
+            bindings.add("console.#");
+        } else
+        {
+            if (recieveObjects & !userBindings)
+            {
+                bindings.add("console.obj.#");
+            } else
+            {
+                bindings.add("console.obj.*.*.org.apache.qpid.broker.agent");
+            }
+            if (recieveEvents)
+            {
+                bindings.add("console.event.#");
+            }
+            if (recieveHeartbeat)
+            {
+                bindings.add("console.heartbeat.#");
+            }
+        }
+        return bindings;
+    }
+
+    public void close()
+    {
+        for (Broker broker : brokers.toArray(new Broker[0]))
+        {
+            this.removeBroker(broker);
+        }
+    }
+
+    protected QMFObject createQMFObject(SchemaClass schema,
+            boolean hasProperties, boolean hasStats, boolean isManaged)
+    {
+        Class realClass = QMFObject.class;
+        if (console != null)
+        {
+            realClass = console.typeMapping(schema.getKey());
+        }
+        Class[] types = new Class[]
+        { Session.class, SchemaClass.class, boolean.class, boolean.class,
+                boolean.class };
+        Object[] args = new Object[]
+        { this, schema, hasProperties, hasStats, isManaged };
+        try
+        {
+            Constructor ci = realClass.getConstructor(types);
+            return (QMFObject) ci.newInstance(args);
+        } catch (Exception e)
+        {
+            throw new ConsoleException(e);
+        }
+    }
+
+    protected QMFObject createQMFObject(SchemaClass schema, Decoder dec,
+            boolean hasProperties, boolean hasStats, boolean isManaged)
+    {
+        Class realClass = QMFObject.class;
+        if (console != null)
+        {
+            realClass = console.typeMapping(schema.getKey());
+        }
+        Class[] types = new Class[]
+        { Session.class, SchemaClass.class, Decoder.class, boolean.class,
+                boolean.class, boolean.class };
+        Object[] args = new Object[]
+        { this, schema, dec, hasProperties, hasStats, isManaged };
+        try
+        {
+            Constructor ci = realClass.getConstructor(types);
+            return (QMFObject) ci.newInstance(args);
+        } catch (Exception e)
+        {
+            throw new ConsoleException(e);
+        }
+    }
+
+    public Object decodeValue(Decoder dec, short type)
+    {
+        switch (type)
+        {
+        case 1: // U8
+            return dec.readUint8();
+        case 2: // U16
+            return dec.readUint16();
+        case 3: // U32
+            return dec.readUint32();
+        case 4: // U64
+            return dec.readUint64();
+        case 6: // SSTR
+            return dec.readStr8();
+        case 7: // LSTR
+            return dec.readStr16();
+        case 8: // ABSTIME
+            return dec.readDatetime();
+        case 9: // DELTATIME
+            return dec.readUint32();
+        case 10: // ref
+            return new ObjectID(dec);
+        case 11: // bool
+            return dec.readUint8() != 0;
+        case 12: // float
+            return dec.readFloat();
+        case 13: // double
+            return dec.readDouble();
+        case 14: // UUID
+            return dec.readUuid();
+        case 15: // Ftable
+            java.util.HashMap<String, Object> ftable = new java.util.HashMap<String,
Object>();
+            BBDecoder sc = new BBDecoder();
+            sc.init(ByteBuffer.wrap(dec.readVbin32()));
+            if (sc.hasRemaining())
+            {
+                long count = sc.readUint32();
+                while (count > 0)
+                {
+                    String key = sc.readStr8();
+                    short code = sc.readUint8();
+                    Object newValue = this.decodeValue(sc, code);
+                    ftable.put(key, newValue);
+                    count -= 1;
+                }
+            }
+            return ftable;
+        case 16: // int8
+            return dec.readInt8();
+        case 17: // int16
+            return dec.readInt16();
+        case 18: // int32
+            return dec.readInt32();
+        case 19: // int64
+            return dec.readInt64();
+        case 20: // Object
+            // Peek into the inner type code, make sure
+            // it is actually an object
+            Object returnValue = null;
+            short innerTypeCode = dec.readUint8();
+            if (innerTypeCode != 20)
+            {
+                returnValue = this.decodeValue(dec, innerTypeCode);
+            } else
+            {
+                ClassKey classKey = new ClassKey(dec);
+                synchronized (lockObject)
+                {
+                    SchemaClass sClass = getSchema(classKey);
+                    if (sClass != null)
+                    {
+                        returnValue = this.createQMFObject(sClass, dec, true,
+                                true, false);
+                    }
+                }
+            }
+            return returnValue;
+        case 21: // List
+            BBDecoder lDec = new BBDecoder();
+            lDec.init(ByteBuffer.wrap(dec.readVbin32()));
+            long count = lDec.readUint32();
+            ArrayList<Object> newList = new ArrayList<Object>();
+            while (count > 0)
+            {
+                short innerType = lDec.readUint8();
+                newList.add(this.decodeValue(lDec, innerType));
+                count -= 1;
+            }
+            return newList;
+        case 22: // Array
+            BBDecoder aDec = new BBDecoder();
+            aDec.init(ByteBuffer.wrap(dec.readVbin32()));
+            long cnt = aDec.readUint32();
+            short innerType = aDec.readUint8();
+            ArrayList<Object> aList = new ArrayList<Object>();
+            while (cnt > 0)
+            {
+                aList.add(this.decodeValue(aDec, innerType));
+                cnt -= 1;
+            }
+            return aList;
+        default:
+            throw new ConsoleException(String.format("Invalid Type Code: %s",
+                    type));
+        }
+    }
+
+    public void encodeValue(Encoder enc, short type, Object val)
+    {
+        try
+        {
+            switch (type)
+            {
+            case 1: // U8
+                enc.writeUint8(((Short) val).shortValue());
+                break;
+            case 2: // U16
+                enc.writeUint16(((Integer) val).intValue());
+                break;
+            case 3: // U32
+                enc.writeUint32(((Long) val).longValue());
+                break;
+            case 4: // U64
+                enc.writeUint64(((Long) val).longValue());
+                break;
+            case 6: // SSTR
+                enc.writeStr8((String) val);
+                break;
+            case 7: // LSTR
+                enc.writeStr16((String) val);
+                break;
+            case 8: // ABSTIME
+                enc.writeDatetime(((Long) val).longValue());
+                break;
+            case 9: // DELTATIME
+                enc.writeUint32(((Long) val).longValue());
+                break;
+            case 10: // ref
+                ((ObjectID) val).encode(enc);
+                break;
+            case 11:
+                if (((Boolean) val).booleanValue())
+                {
+                    enc.writeUint8((short) 1);
+                } else
+                {
+                    enc.writeUint8((short) 0);
+                }
+                break;
+            case 12: // FLOAT
+                enc.writeFloat(((Float) val).floatValue());
+                break;
+            case 13: // DOUBLE
+                enc.writeDouble(((Double) val).doubleValue());
+                break;
+            case 14: // UUID
+                enc.writeUuid((UUID) val);
+                break;
+            case 15: // Ftable
+                Map<String, Object> ftable = (Map<String, Object>) val;
+                BBEncoder sc = new BBEncoder(1);
+                sc.init();
+                sc.writeUint32(ftable.size());
+                for (String key : ftable.keySet())
+                {
+                    Object obj = ftable.get(key);
+                    short innerType = Util.qmfType(obj);
+                    sc.writeStr8(key);
+                    sc.writeUint8(innerType);
+                    this.encodeValue(sc, innerType, obj);
+                }
+                byte[] bytes = sc.segment().array();
+                enc.writeVbin32(bytes);
+                break;
+            case 16: // int8
+                enc.writeInt8((Byte) val);
+                break;
+            case 17: // int16
+                enc.writeInt16((Short) val);
+                break;
+            case 18: // int32
+                enc.writeInt32((Integer) val);
+                break;
+            case 19: // int64
+                enc.writeInt64((Long) val);
+                break;
+            case 20: // Object
+                // Check that the object has a session, if not
+                // take ownership of it
+                QMFObject qObj = (QMFObject) val;
+                if (qObj.getSession() == null)
+                {
+                    qObj.setSession(this);
+                }
+                qObj.encode(enc);
+                break;
+            case 21: // List
+                ArrayList<Object> items = (ArrayList<Object>) val;
+                BBEncoder lEnc = new BBEncoder(1);
+                lEnc.init();
+                lEnc.writeUint32(items.size());
+                for (Object obj : items)
+                {
+                    short innerType = Util.qmfType(obj);
+                    lEnc.writeUint8(innerType);
+                    this.encodeValue(lEnc, innerType, obj);
+                }
+                enc.writeVbin32(lEnc.segment().array());
+                break;
+            case 22: // Array
+                ArrayList<Object> aItems = (ArrayList<Object>) val;
+                BBEncoder aEnc = new BBEncoder(1);
+                aEnc.init();
+                long aCount = aItems.size();
+                aEnc.writeUint32(aCount);
+                if (aCount > 0)
+                {
+                    Object anObj = aItems.get(0);
+                    short innerType = Util.qmfType(anObj);
+                    aEnc.writeUint8(innerType);
+                    for (Object obj : aItems)
+                    {
+                        this.encodeValue(aEnc, innerType, obj);
+                    }
+                }
+                enc.writeVbin32(aEnc.segment().array());
+                break;
+            default:
+                throw new ConsoleException(String.format(
+                        "Invalid Type Code: %s", type));
+            }
+        } catch (ClassCastException e)
+        {
+            String msg = String.format(
+                    "Class cast exception for typecode %s, type %s ", type, val
+                            .getClass());
+            log.error(msg);
+            throw new ConsoleException(msg + type, e);
+        }
+    }
+
+    public Broker getBroker(long BrokerBank)
+    {
+        Broker returnValue = null;
+        for (Broker broker : brokers)
+        {
+            if (broker.brokerBank() == BrokerBank)
+            {
+                returnValue = broker;
+                break;
+            }
+        }
+        return returnValue;
+    }
+
+    public ArrayList<ClassKey> getClasses(String packageName)
+    {
+        ArrayList<ClassKey> returnValue = new ArrayList<ClassKey>();
+        this.waitForStable();
+        if (packages.containsKey(packageName))
+        {
+            for (SchemaClass sClass : packages.get(packageName).values())
+            {
+                returnValue.add(sClass.getKey());
+            }
+        }
+        return returnValue;
+    }
+
+    public ArrayList<QMFObject> getObjects(
+            java.util.HashMap<String, Object> args)
+    {
+        ArrayList<Broker> brokerList = null;
+        ArrayList<Agent> agentList = new ArrayList<Agent>();
+        if (args.containsKey("_broker"))
+        {
+            brokerList = new ArrayList<Broker>();
+            brokerList.add((Broker) args.get("_broker"));
+        } else
+        {
+            brokerList = this.brokers;
+        }
+        for (Broker broker : brokerList)
+        {
+            broker.waitForStable();
+        }
+        if (args.containsKey("_agent"))
+        {
+            Agent agent = (Agent) args.get("_agent");
+            if (brokerList.contains(agent.getBroker()))
+            {
+                agentList.add(agent);
+            } else
+            {
+                throw new ConsoleException(
+                        "Agent is not managed by this console or the supplied broker");
+            }
+        } else
+        {
+            if (args.containsKey("_objectId"))
+            {
+                ObjectID oid = (ObjectID) args.get("_objectId");
+                for (Broker broker : brokers)
+                {
+                    for (Agent agent : broker.Agents.values())
+                    {
+                        if ((agent.getAgentBank() == oid.agentBank())
+                                && (agent.getBrokerBank() == oid.brokerBank()))
+                        {
+                            agentList.add(agent);
+                        }
+                    }
+                }
+            } else
+            {
+                for (Broker broker : brokerList)
+                {
+                    for (Agent agent : broker.Agents.values())
+                    {
+                        if (agent.getBroker().isConnected())
+                        {
+                            agentList.add(agent);
+                        }
+                    }
+                }
+            }
+        }
+        getResult = new ArrayList<QMFObject>();
+        if (agentList.size() > 0)
+        {
+            // FIXME Add a bunch of other suff too
+            for (Agent agent : agentList)
+            {
+                HashMap<String, Object> getParameters = new HashMap<String, Object>();
+                Broker broker = agent.getBroker();
+                long seq = -1;
+                synchronized (lockObject)
+                {
+                    seq = sequenceManager.reserve(Session.CONTEXT_MULTIGET);
+                    syncSequenceList.add(seq);
+                }
+                String packageName = (String) args.get("_package");
+                String className = (String) args.get("_class");
+                ClassKey key = (ClassKey) args.get("_key");
+                Object sClass = args.get("_schema");
+                Object oid = args.get("_objectID");
+                long[] hash = (long[]) args.get("_hash");
+                if ((className == null) && (oid == null) && (oid == null))
+                {
+                    throw new ConsoleException(
+                            "No class supplied, use '_schema', '_key', '_class', or '_objectId'
argument");
+                }
+                if (oid != null)
+                {
+                    getParameters.put("_objectID", oid);
+                } else
+                {
+                    if (sClass != null)
+                    {
+                        key = (key != null) ? key : ((SchemaClass) sClass)
+                                .getKey();
+                    }
+                    if (key != null)
+                    {
+                        className = (className != null) ? className : key
+                                .getClassName();
+                        packageName = (packageName != null) ? packageName : key
+                                .getPackageName();
+                        hash = (hash != null) ? hash : key.getHash();
+                    }
+                    if (packageName != null)
+                    {
+                        getParameters.put("_package", packageName);
+                    }
+                    if (className != null)
+                    {
+                        getParameters.put("_class", className);
+                    }
+                    if (hash != null)
+                    {
+                        getParameters.put("_hash", hash);
+                    }
+                    for (java.util.Map.Entry<String, Object> pair : args
+                            .entrySet())
+                    {
+                        if (!pair.getKey().startsWith("_"))
+                        {
+                            getParameters.put(pair.getKey(), pair.getValue());
+                        }
+                    }
+                }
+                Encoder enc = broker.createEncoder('G', seq);
+                enc.writeMap(getParameters);
+                String routingKey = agent.routingCode();
+                Message msg = broker.createMessage(enc);
+                log.debug("Get Object Keys: ");
+                for (String pKey : getParameters.keySet())
+                {
+                    log.debug(String.format("\tKey: '%s' Value: '%s'", pKey,
+                            getParameters.get(pKey)));
+                }
+                broker.send(msg, routingKey);
+            }
+            int waittime = DEFAULT_GET_WAIT_TIME;
+            boolean timeout = false;
+            if (args.containsKey("_timeout"))
+            {
+                waittime = (Integer) args.get("_timeout");
+            }
+            long start = System.currentTimeMillis();
+            synchronized (lockObject)
+            {
+                // FIXME ERROR
+                while (syncSequenceList.size() > 0)
+                {
+                    try
+                    {
+                        lockObject.wait(waittime);
+                    } catch (InterruptedException e)
+                    {
+                        throw new ConsoleException(e);
+                    }
+                    long duration = System.currentTimeMillis() - start;
+                    if (duration > waittime)
+                    {
+                        for (long pendingSeq : syncSequenceList)
+                        {
+                            sequenceManager.release(pendingSeq);
+                        }
+                        syncSequenceList.clear();
+                        timeout = true;
+                    }
+                }
+            }
+            // FIXME Add the error logic
+            if ((getResult.isEmpty()) && timeout)
+            {
+                throw new ConsoleException("Get Request timed out");
+            }
+        }
+        return getResult;
+    }
+
+    public ArrayList<String> getPackages()
+    {
+        this.waitForStable();
+        ArrayList<String> returnValue = new ArrayList<String>();
+        for (String name : packages.keySet())
+        {
+            returnValue.add(name);
+        }
+        return returnValue;
+    }
+
+    public SchemaClass getSchema(ClassKey key)
+    {
+        return getSchema(key, true);
+    }
+
+    protected SchemaClass getSchema(ClassKey key, boolean waitForStable)
+    {
+        if (waitForStable)
+        {
+            this.waitForStable();
+        }
+        SchemaClass returnValue = null;
+        returnValue = packages.get(key.getPackageName())
+                .get(key.getKeyString());
+        return returnValue;
+    }
+
+    public void handleAgentRemoved(Agent agent)
+    {
+        if (console != null)
+        {
+            console.agentRemoved(agent);
+        }
+    }
+
+    public void handleBrokerConnect(Broker broker)
+    {
+        if (console != null)
+        {
+            console.brokerConnected(broker);
+        }
+    }
+
+    public void handleBrokerDisconnect(Broker broker)
+    {
+        if (console != null)
+        {
+            console.brokerDisconnected(broker);
+        }
+    }
+
+    public void handleBrokerResponse(Broker broker, Decoder decoder,
+            long sequence)
+    {
+        if (console != null)
+        {
+            console.brokerInformation(broker);
+        }
+        long seq = sequenceManager.reserve(CONTEXT_STARTUP);
+        Encoder encoder = broker.createEncoder('P', seq);
+        broker.send(encoder);
+    }
+
+    public void handleClassIndicator(Broker broker, Decoder decoder,
+            long sequence)
+    {
+        short kind = decoder.readUint8();
+        ClassKey classKey = new ClassKey(decoder);
+        boolean unknown = false;
+        synchronized (lockObject)
+        {
+            if (packages.containsKey(classKey.getPackageName()))
+            {
+                if (!packages.get(classKey.getPackageName()).containsKey(
+                        classKey.getKeyString()))
+                {
+                    unknown = true;
+                }
+            }
+        }
+        if (unknown)
+        {
+            broker.incrementOutstanding();
+            long seq = sequenceManager.reserve(Session.CONTEXT_STARTUP);
+            Encoder enc = broker.createEncoder('S', seq);
+            classKey.encode(enc);
+            broker.send(enc);
+        }
+    }
+
+    public void handleCommandComplete(Broker broker, Decoder decoder,
+            long sequence)
+    {
+        long code = decoder.readUint32();
+        String text = decoder.readStr8();
+        Object context = this.sequenceManager.release(sequence);
+        if (context.equals(CONTEXT_STARTUP))
+        {
+            broker.decrementOutstanding();
+        } else
+        {
+            if ((context.equals(CONTEXT_SYNC)) & broker.getSyncInFlight())
+            {
+                broker.setSyncInFlight(false);
+            } else
+            {
+                if (context.equals(CONTEXT_MULTIGET)
+                        && syncSequenceList.contains(sequence))
+                {
+                    synchronized (lockObject)
+                    {
+                        syncSequenceList.remove(sequence);
+                        if (syncSequenceList.isEmpty())
+                        {
+                            lockObject.notifyAll();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void handleContentIndicator(Broker broker, Decoder decoder,
+            long sequence, boolean hasProperties, boolean hasStatistics)
+    {
+        ClassKey key = new ClassKey(decoder);
+        SchemaClass sClass = null;
+        ;
+        synchronized (lockObject)
+        {
+            sClass = getSchema(key, false);
+        }
+        if (sClass != null)
+        {
+            QMFObject obj = this.createQMFObject(sClass, decoder,
+                    hasProperties, hasStatistics, true);
+            if (key.getPackageName().equals("org.apache.qpid.broker")
+                    && key.getClassName().equals("agent") && hasProperties)
+            {
+                broker.updateAgent(obj);
+            }
+            synchronized (lockObject)
+            {
+                if (syncSequenceList.contains(sequence))
+                {
+                    if (!obj.isDeleted() && this.selectMatch(obj))
+                    {
+                        getResult.add(obj);
+                    }
+                }
+            }
+            if (console != null)
+            {
+                if (hasProperties)
+                {
+                    console.objectProperties(broker, obj);
+                }
+                if (hasStatistics)
+                {
+                    console.objectStatistics(broker, obj);
+                }
+            }
+        }
+    }
+
+    public void handleEventIndicator(Broker broker, Decoder decoder,
+            long sequence)
+    {
+        if (console != null)
+        {
+            QMFEvent newEvent = new QMFEvent(this, decoder);
+            console.eventRecieved(broker, newEvent);
+        }
+    }
+
+    public void handleHeartbeatIndicator(Broker broker, Decoder decoder,
+            long sequence, Message msg)
+    {
+        if (console != null)
+        {
+            long brokerBank = 1;
+            long agentBank = 0;
+            try
+            {
+                // FIXME HOW DO WE GET THE ROUTING KEY
+                // String routingKey = msg.DeliveryProperties.getRoutingKey();
+                String routingKey = null;
+                if (routingKey != null)
+                {
+                    agentBank = Agent.getBrokerBank(routingKey);
+                    brokerBank = Agent.getBrokerBank(routingKey);
+                }
+            } catch (Throwable e)
+            {
+                log.warn("Internal QPID error", e);
+            }
+            String agentKey = Agent.AgentKey(agentBank, brokerBank);
+            long timestamp = decoder.readUint64();
+            if (broker.Agents.containsKey(agentKey))
+            {
+                Agent agent = broker.Agents.get(agentKey);
+                console.hearbeatRecieved(agent, timestamp);
+            }
+        }
+    }
+
+    public void handleMethodResponse(Broker broker, Decoder decoder,
+            long sequence)
+    {
+        long code = decoder.readUint32();
+        String text = decoder.readStr16();
+        java.util.HashMap<String, Object> outArgs = new java.util.HashMap<String,
Object>();
+        Object obj = sequenceManager.release(sequence);
+        if (obj == null)
+        {
+            return;
+        }
+        Object[] pair = (Object[]) obj;
+        if (code == 0)
+        {
+            for (SchemaArgument arg : ((SchemaMethod) pair[0]).Arguments)
+            {
+                if (arg.isOutput())
+                {
+                    outArgs.put(arg.getName(), this.decodeValue(decoder, arg
+                            .getType()));
+                }
+            }
+        }
+        MethodResult result = new MethodResult(code, text, outArgs);
+        if ((Boolean) pair[1])
+        {
+            this.syncResult = result;
+            broker.setSyncInFlight(false);
+        }
+        if (console != null)
+        {
+            console.methodResponse(broker, sequence, result);
+        }
+    }
+
+    // Callback Methods
+    public void handleNewAgent(Agent agent)
+    {
+        if (console != null)
+        {
+            console.newAgent(agent);
+        }
+    }
+
+    public void handlePackageIndicator(Broker broker, Decoder decoder,
+            long sequence)
+    {
+        String packageName = decoder.readStr8();
+        boolean notify = false;
+        if (!packages.containsKey(packageName))
+        {
+            synchronized (lockObject)
+            {
+                packages.put(packageName,
+                        new java.util.HashMap<String, SchemaClass>());
+                notify = true;
+            }
+        }
+        if (notify && console != null)
+        {
+            console.newPackage(packageName);
+        }
+        broker.incrementOutstanding();
+        long seq = sequenceManager.reserve(Session.CONTEXT_STARTUP);
+        Encoder enc = broker.createEncoder('Q', seq);
+        enc.writeStr8(packageName);
+        broker.send(enc);
+    }
+
+    public void handleSchemaResponse(Broker broker, Decoder decoder,
+            long sequence)
+    {
+        short kind = decoder.readUint8();
+        ClassKey classKey = new ClassKey(decoder);
+        SchemaClass sClass = new SchemaClass(kind, classKey, decoder, this);
+        synchronized (lockObject)
+        {
+            java.util.HashMap<String, SchemaClass> classMappings = packages
+                    .get(sClass.getPackageName());
+            classMappings.remove(sClass.getClassKeyString());
+            classMappings.put(sClass.getClassKeyString(), sClass);
+            log.debug(classKey.toString());
+        }
+        sequenceManager.release(sequence);
+        broker.decrementOutstanding();
+        if (console != null)
+        {
+            this.console.newClass(kind, classKey);
+        }
+    }
+
+    public MethodResult invokeMethod(QMFObject obj, String name,
+            List<Object> args, boolean synchronous, int timeToLive)
+    {
+        Broker aBroker = this.getBroker(obj.brokerBank());
+        long seq = this.sendMethodRequest(obj, aBroker, name, args,
+                synchronous, timeToLive);
+        if (seq != 0)
+        {
+            if (!synchronous)
+            {
+                return null;
+            }
+            try
+            {
+                aBroker.waitForSync(timeToLive);
+            } catch (Throwable e)
+            {
+                sequenceManager.release(seq);
+                throw new ConsoleException(e);
+            }
+            // FIXME missing error logic in the broker
+            return (MethodResult) syncResult;
+        }
+        return null;
+    }
+
+    public QMFObject makeObject(ClassKey key)
+    {
+        SchemaClass sClass = this.getSchema(key);
+        if (sClass == null)
+        {
+            throw new ConsoleException("No schema found for class "
+                    + key.toString());
+        }
+        return this.createQMFObject(sClass, true, true, false);
+    }
+
+    public QMFObject makeObject(String keyString)
+    {
+        return this.makeObject(new ClassKey(keyString));
+    }
+
+    public void removeBroker(Broker broker)
+    {
+        if (brokers.contains(broker))
+        {
+            brokers.remove(broker);
+        }
+        broker.shutdown();
+    }
+
+    public boolean selectMatch(QMFObject obj)
+    {
+        return true;
+    }
+
+    protected long sendMethodRequest(QMFObject obj, Broker aBroker,
+            String name, List<Object> args, boolean synchronous, int timeToLive)
+    {
+        SchemaMethod method = obj.getSchema().getMethod(name);
+        if (args == null)
+        {
+            args = new ArrayList<Object>();
+        }
+        long seq = 0;
+        if (method != null)
+        {
+            Object[] pair =
+            { method, synchronous };
+            seq = sequenceManager.reserve(pair);
+            Encoder enc = aBroker.createEncoder('M', seq);
+            obj.getObjectID().encode(enc);
+            obj.getSchema().getKey().encode(enc);
+            enc.writeStr8(name);
+            if (args.size() < method.getInputArgCount())
+            {
+                throw new ConsoleException(String.format(
+                        "Incorrect number of arguments: expected %s, got %s",
+                        method.getInputArgCount(), args.size()));
+            }
+            int argIndex = 0;
+            for (SchemaArgument arg : method.Arguments)
+            {
+                if (arg.isInput())
+                {
+                    this.encodeValue(enc, arg.getType(), args.get(argIndex));
+                    argIndex += 1;
+                }
+            }
+            Message msg = aBroker.createMessage(enc);
+            if (synchronous)
+            {
+                aBroker.setSyncInFlight(true);
+            }
+            aBroker.send(msg, obj.routingKey(), timeToLive);
+        }
+        return seq;
+    }
+
+    protected void waitForStable()
+    {
+        for (Broker broker : brokers)
+        {
+            broker.waitForStable();
+        }
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Util.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Util.java?rev=787325&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Util.java
(added)
+++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/Util.java
Mon Jun 22 17:43:54 2009
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.console;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.UUID;
+
+public class Util
+{
+    private static HashMap<Class, Short> ENCODINGS = new HashMap<Class, Short>();
+    static
+    {
+        ENCODINGS.put(String.class, (short) 7);
+        ENCODINGS.put(Short.class, (short) 1);
+        ENCODINGS.put(Float.class, (short) 13);
+        ENCODINGS.put(QMFObject.class, (short) 20);
+        ENCODINGS.put(Integer.class, (short) 17);
+        ENCODINGS.put(Long.class, (short) 18);
+        ENCODINGS.put(ArrayList.class, (short) 21);
+    }
+
+    public static String accessName(int type)
+    {
+        switch (type)
+        {
+        // case 0: return "UNKNOWN" ;
+        case 1:
+            return "RC";
+        case 2:
+            return "RW";
+        case 3:
+            return "RO";
+        }
+        throw new ConsoleException(String.format("Invalid Access Code: %s",
+                type));
+    }
+
+    public static String byteString(byte[] bytes)
+    {
+        return new String(bytes, Charset.forName("UTF-8"));
+    }
+
+    public static Object defaultValue(short type)
+    {
+        switch (type)
+        {
+        // case 0: return "UNKNOWN" ;
+        case 1:
+            return 0;
+        case 2:
+            return 0;
+        case 3:
+            return 0l;
+        case 4:
+            return 0l;
+        case 5:
+            return false;
+        case 6:
+            return "";
+        case 7:
+            return "";
+        case 8:
+            return 0l;
+        case 9:
+            return 0l;
+        case 10:
+            return new ObjectID();
+        case 11:
+            return false;
+        case 12:
+            return 0f;
+        case 13:
+            return 0d;
+        case 14:
+            return new UUID(0, 0);
+        case 15:
+            return new HashMap<String, Object>();
+        case 16:
+            return 0;
+        case 17:
+            return 0;
+        case 18:
+            return 0l;
+        case 19:
+            return 0l;
+        case 20:
+            return null;
+        case 21:
+            return new java.util.ArrayList<Object>();
+        case 22:
+            return new java.util.ArrayList<Object>();
+        }
+        throw new ConsoleException(String.format("Invalid Type Code: %s", type));
+    }
+
+    public static short qmfType(Object obj)
+    {
+        if (ENCODINGS.containsKey(obj.getClass()))
+        {
+            return ENCODINGS.get(obj.getClass());
+        } else
+        {
+            throw new ConsoleException(String.format("Unkown Type of %s", obj
+                    .getClass()));
+        }
+    }
+
+    public static String typeName(short type)
+    {
+        switch (type)
+        {
+        // case 0: return "UNKNOWN" ;
+        case 1:
+            return "uint8";
+        case 2:
+            return "uint16";
+        case 3:
+            return "uint32";
+        case 4:
+            return "uint64";
+        case 5:
+            return "bool";
+        case 6:
+            return "short-string";
+        case 7:
+            return "long-string";
+        case 8:
+            return "abs-time";
+        case 9:
+            return "delta-time";
+        case 10:
+            return "reference";
+        case 11:
+            return "boolean";
+        case 12:
+            return "float";
+        case 13:
+            return "double";
+        case 14:
+            return "uuid";
+        case 15:
+            return "field-table";
+        case 16:
+            return "int8";
+        case 17:
+            return "int16";
+        case 18:
+            return "int32";
+        case 19:
+            return "int64";
+        case 20:
+            return "object";
+        case 21:
+            return "list";
+        case 22:
+            return "array";
+        }
+        throw new ConsoleException(String.format("Invalid Type Code: %s", type));
+    }
+
+    protected Util()
+    {
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java?rev=787325&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java
(added)
+++ qpid/trunk/qpid/java/management/console/src/main/java/org/apache/qpid/console/XMLUtil.java
Mon Jun 22 17:43:54 2009
@@ -0,0 +1,103 @@
+package org.apache.qpid.console;
+
+   
+public class XMLUtil
+{
+    
+    public static String commonAttributes(SchemaVariable var) {
+        String returnString = "" ;
+        if (var.getDescription() != null){
+           returnString = returnString + String.format(" desc='%s'", var.getDescription())
;  
+        }       
+        
+        if (var.getRefPackage() != null){
+           returnString = returnString + String.format(" refPackage='%s'", var.getRefPackage())
;  
+        }       
+                
+        if (var.getRefClass() != null){
+           returnString = returnString + String.format(" refClass='%s'", var.getRefClass())
;  
+        }   
+        
+        if (var.getUnit() != null){
+           returnString = returnString + String.format(" unit='%s'", var.getUnit()) ;  
+        }           
+        
+        if (var.getMin() != null){
+           returnString = returnString + String.format(" min='%s'", var.getMin()) ;  
+        }          
+        if (var.getMax() != null){
+           returnString = returnString + String.format(" max='%s'", var.getMax()) ;  
+        }          
+        if (var.getMaxLength() != null){
+           returnString = returnString + String.format(" maxLength='%s'", var.getMaxLength())
;  
+        }
+        
+        return returnString ;
+      }
+
+    public static String schemaXML(Session sess, String packageName) {
+        String returnValue = String.format("<schema package='%s'>\n", packageName)
;
+        for (ClassKey key : sess.getClasses(packageName)) {
+            SchemaClass schema = sess.getSchema(key) ;
+            if (schema.getKind() == 1) {
+                if (schema.getSuperType() == null) {
+                    returnValue += String.format("\t<class name='%s' hash='%s'>\n",
key.getClassName(), key.getHashString()) ;
+                }
+                else {
+                    returnValue += String.format("\t<class name='%s' hash='%s' extends='%s'>\n",
 key.getClassName(), key.getHashString(), schema.getSuperType().getKeyString()) ;
+                }
+                for (SchemaProperty prop : schema.getProperties()) {
+                    Object[] attributes = new Object[5] ;
+                    attributes[0] = prop.getName() ;
+                    attributes[1] = Util.typeName(prop.getType()) ;
+                    attributes[2] = Util.accessName(prop.getAccess()) ;
+                    attributes[3] = prop.getOptional()? "True" : "False ";
+                    attributes[4] = XMLUtil.commonAttributes(prop);
+                    returnValue += String.format("\t\t<property name='%s' type='%s' access='%s'
optional='%s'%s/>\n", attributes) ;
+                }
+                for (SchemaMethod meth : schema.getMethods()) {
+                    returnValue += String.format("\t\t<method name='%s'/>\n", meth.getName())
; 
+                    for (SchemaArgument arg : meth.Arguments) {
+                        Object[] attributes = new Object[4] ;                   
+                        attributes[0] = arg.getName() ;
+                        attributes[1] = arg.getDirection() ;
+                        attributes[2] = Util.typeName(arg.getType()) ;
+                        attributes[3] = XMLUtil.commonAttributes(arg);                  
+                        returnValue += String.format("\t\t\t<arg name='%s' dir='%s' type='%s'%s/>\n",
attributes) ;
+                    }
+                    returnValue += String.format("\t\t</method>\n") ;
+                }
+                returnValue += String.format("\t</class>\n") ;              
+            } else {
+                returnValue += String.format("\t<event name='%s' hash='%s'>\n", key.getClassName(),
key.getHashString()) ; 
+                for (SchemaArgument arg : schema.getArguments()) {
+                    Object[] attributes = new Object[4] ;                   
+                    attributes[0] = arg.getName() ;
+                    attributes[1] = Util.typeName(arg.getType()) ;
+                    attributes[2] = XMLUtil.commonAttributes(arg);                  
+                    returnValue += String.format("\t\t\t<arg name='%s' type='%s'%s/>\n",
attributes) ;
+                }
+                returnValue += String.format("\t</event>\n") ;
+            }
+        }
+        returnValue += String.format("</schema>\n") ;       
+        
+        return returnValue ;
+    }       
+    
+    public static String schemaXML(Session sess, String[] packageNames) {
+        String returnValue = "<schemas>\n" ;
+        for (String pack : packageNames) {
+            returnValue += XMLUtil.schemaXML(sess, pack) ;
+            returnValue += "\n" ;
+        }
+        returnValue += "</schemas>\n" ;
+        return returnValue ;
+    }
+    
+    protected XMLUtil()
+    {
+    }
+}
+
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message