cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ahu...@apache.org
Subject [15/36] Commit to try something on removing getZone
Date Fri, 06 Sep 2013 22:38:45 GMT
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/plugins/network-elements/internal-loadbalancer/src/org/apache/cloudstack/network/lb/InternalLoadBalancerVMManagerImpl.java
----------------------------------------------------------------------
diff --git a/plugins/network-elements/internal-loadbalancer/src/org/apache/cloudstack/network/lb/InternalLoadBalancerVMManagerImpl.java
b/plugins/network-elements/internal-loadbalancer/src/org/apache/cloudstack/network/lb/InternalLoadBalancerVMManagerImpl.java
index ff69577..e9fa842 100644
--- a/plugins/network-elements/internal-loadbalancer/src/org/apache/cloudstack/network/lb/InternalLoadBalancerVMManagerImpl.java
+++ b/plugins/network-elements/internal-loadbalancer/src/org/apache/cloudstack/network/lb/InternalLoadBalancerVMManagerImpl.java
@@ -27,9 +27,10 @@ import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
 import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
+import org.apache.cloudstack.framework.config.ConfigDepot;
+import org.apache.cloudstack.framework.config.ConfigValue;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.lb.ApplicationLoadBalancerRuleVO;
 import org.apache.cloudstack.lb.dao.ApplicationLoadBalancerRuleDao;
@@ -79,6 +80,7 @@ import com.cloud.network.lb.LoadBalancingRule.LbDestination;
 import com.cloud.network.lb.LoadBalancingRule.LbHealthCheckPolicy;
 import com.cloud.network.lb.LoadBalancingRule.LbStickinessPolicy;
 import com.cloud.network.lb.LoadBalancingRulesManager;
+import com.cloud.network.router.VirtualNetworkApplianceManager;
 import com.cloud.network.router.VirtualRouter;
 import com.cloud.network.router.VirtualRouter.RedundantState;
 import com.cloud.network.router.VirtualRouter.Role;
@@ -117,12 +119,9 @@ import com.cloud.vm.dao.DomainRouterDao;
 import com.cloud.vm.dao.NicDao;
 
 
-@Component
 @Local(value = { InternalLoadBalancerVMManager.class, InternalLoadBalancerVMService.class})
-public class InternalLoadBalancerVMManagerImpl extends ManagerBase implements
-        InternalLoadBalancerVMManager, VirtualMachineGuru {
-    private static final Logger s_logger = Logger
-            .getLogger(InternalLoadBalancerVMManagerImpl.class);
+public class InternalLoadBalancerVMManagerImpl extends ManagerBase implements InternalLoadBalancerVMManager,
InternalLoadBalancerVMService, VirtualMachineGuru {
+    private static final Logger s_logger = Logger.getLogger(InternalLoadBalancerVMManagerImpl.class);
     static final private String _internalLbVmNamePrefix = "b";
     
     private String _instance;
@@ -151,6 +150,8 @@ public class InternalLoadBalancerVMManagerImpl extends ManagerBase implements
     @Inject VMTemplateDao _templateDao;
     @Inject ResourceManager _resourceMgr;
     @Inject ConfigurationServer _configServer;
+    @Inject
+    ConfigDepot _configDepot;
 
     @Override
     public boolean finalizeVirtualMachineProfile(VirtualMachineProfile profile,
@@ -343,6 +344,13 @@ public class InternalLoadBalancerVMManagerImpl extends ManagerBase implements
     public void prepareStop(VirtualMachineProfile profile) {
     }
     
+    static ConfigValue<Integer> _networkLockTimeout;
+    static ConfigValue<String> _routerTemplateXen;
+    static ConfigValue<String> _routerTemplateKvm;
+    static ConfigValue<String> _routerTemplateVmware;
+    static ConfigValue<String> _routerTemplateHyperV;
+    static ConfigValue<String> _routerTemplateLxc;
+
     @Override
     public boolean configure(String name, Map<String, Object> params) throws ConfigurationException
{
         final Map<String, String> configs = _configDao.getConfiguration("AgentManager",
params);
@@ -351,6 +359,13 @@ public class InternalLoadBalancerVMManagerImpl extends ManagerBase implements
             _instance = "DEFAULT";
         }
         
+        _networkLockTimeout = _configDepot.get(NetworkOrchestrationService.NetworkLockTimeout);
+        _routerTemplateXen = _configDepot.get(VirtualNetworkApplianceManager.RouterTemplateXen);
+        _routerTemplateKvm = _configDepot.get(VirtualNetworkApplianceManager.RouterTemplateKvm);
+        _routerTemplateVmware = _configDepot.get(VirtualNetworkApplianceManager.RouterTemplateVmware);
+        _routerTemplateHyperV = _configDepot.get(VirtualNetworkApplianceManager.RouterTemplateHyperV);
+        _routerTemplateLxc = _configDepot.get(VirtualNetworkApplianceManager.RouterTemplateLxc);
+
         _mgmtHost = configs.get("host");
         _mgmtCidr = _configDao.getValue(Config.ManagementNetwork.key());
         
@@ -587,7 +602,7 @@ public class InternalLoadBalancerVMManagerImpl extends ManagerBase implements
             InsufficientCapacityException, ResourceUnavailableException {
 
         List<DomainRouterVO> internalLbVms = new ArrayList<DomainRouterVO>();
-        Network lock = _networkDao.acquireInLockTable(guestNetwork.getId(), _ntwkMgr.getNetworkLockTimeout());
+        Network lock = _networkDao.acquireInLockTable(guestNetwork.getId(), _networkLockTimeout.value());
         if (lock == null) {
             throw new ConcurrentOperationException("Unable to lock network " + guestNetwork.getId());
         }
@@ -740,19 +755,19 @@ public class InternalLoadBalancerVMManagerImpl extends ManagerBase implements
                 }                String templateName = null;
                 switch (hType) {
                     case XenServer:
-                        templateName = _configServer.getConfigValue(Config.RouterTemplateXen.key(),
Config.ConfigurationParameterScope.zone.toString(), dest.getDataCenter().getId());
+                        templateName = _routerTemplateXen.valueIn(dest.getDataCenter().getId());
                         break;
                     case KVM:
-                        templateName = _configServer.getConfigValue(Config.RouterTemplateKVM.key(),
Config.ConfigurationParameterScope.zone.toString(), dest.getDataCenter().getId());
+                        templateName = _routerTemplateKvm.valueIn(dest.getDataCenter().getId());
                         break;
                     case VMware:
-                        templateName = _configServer.getConfigValue(Config.RouterTemplateVmware.key(),
Config.ConfigurationParameterScope.zone.toString(), dest.getDataCenter().getId());
+                        templateName = _routerTemplateVmware.valueIn(dest.getDataCenter().getId());
                         break;
                     case Hyperv:
-                        templateName = _configServer.getConfigValue(Config.RouterTemplateHyperv.key(),
Config.ConfigurationParameterScope.zone.toString(), dest.getDataCenter().getId());
+                        templateName = _routerTemplateHyperV.valueIn(dest.getDataCenter().getId());
                         break;
                     case LXC:
-                        templateName = _configServer.getConfigValue(Config.RouterTemplateLXC.key(),
Config.ConfigurationParameterScope.zone.toString(), dest.getDataCenter().getId());
+                        templateName = _routerTemplateLxc.valueIn(dest.getDataCenter().getId());
                         break;
                     default: break;
                 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
----------------------------------------------------------------------
diff --git a/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
b/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
index 34246dd..61ab272 100644
--- a/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
+++ b/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
@@ -19,10 +19,7 @@ package com.cloud.network.guru;
 import javax.ejb.Local;
 import javax.inject.Inject;
 
-import com.cloud.event.ActionEventUtils;
-
 import org.apache.log4j.Logger;
-import org.springframework.stereotype.Component;
 
 import org.apache.cloudstack.context.CallContext;
 
@@ -30,6 +27,7 @@ import com.cloud.dc.DataCenter;
 import com.cloud.dc.DataCenter.NetworkType;
 import com.cloud.deploy.DeployDestination;
 import com.cloud.deploy.DeploymentPlan;
+import com.cloud.event.ActionEventUtils;
 import com.cloud.event.EventTypes;
 import com.cloud.event.EventVO;
 import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
@@ -45,7 +43,6 @@ import com.cloud.offering.NetworkOffering;
 import com.cloud.user.Account;
 import com.cloud.vm.ReservationContext;
 
-@Component
 @Local(value=NetworkGuru.class)
 public class OvsGuestNetworkGuru extends GuestNetworkGuru {
     private static final Logger s_logger = Logger.getLogger(OvsGuestNetworkGuru.class);
@@ -73,7 +70,7 @@ public class OvsGuestNetworkGuru extends GuestNetworkGuru {
                     + NetworkType.Advanced);
             return false;
         }
-    }    
+    }
 
     @Override
     public Network design(NetworkOffering offering, DeploymentPlan plan, Network userSpecified,
Account owner) {
@@ -82,7 +79,7 @@ public class OvsGuestNetworkGuru extends GuestNetworkGuru {
             return null;
         }
 
-        NetworkVO config = (NetworkVO) super.design(offering, plan, userSpecified, owner);

+        NetworkVO config = (NetworkVO) super.design(offering, plan, userSpecified, owner);
         if (config == null) {
             return null;
         }
@@ -96,8 +93,7 @@ public class OvsGuestNetworkGuru extends GuestNetworkGuru {
     protected void allocateVnet(Network network, NetworkVO implemented, long dcId,
             long physicalNetworkId, String reservationId) throws InsufficientVirtualNetworkCapcityException
{
         if (network.getBroadcastUri() == null) {
-            String vnet = _dcDao.allocateVnet(dcId, physicalNetworkId, network.getAccountId(),
reservationId,
-                    canUseSystemGuestVlan(network.getAccountId()));
+            String vnet = _dcDao.allocateVnet(dcId, physicalNetworkId, network.getAccountId(),
reservationId, _useSystemGuestVlans.valueIn(network.getAccountId()));
             if (vnet == null) {
                 throw new InsufficientVirtualNetworkCapcityException("Unable to allocate
vnet as a part of network " + network + " implement ", DataCenter.class, dcId);
             }
@@ -114,7 +110,7 @@ public class OvsGuestNetworkGuru extends GuestNetworkGuru {
         if (!_ovsTunnelMgr.isOvsTunnelEnabled()) {
             return null;
         }
-        NetworkVO implemented = (NetworkVO)super.implement(config, offering, dest, context);
	 
+        NetworkVO implemented = (NetworkVO)super.implement(config, offering, dest, context);
         return implemented;
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/AgentManager.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/AgentManager.java b/server/src/com/cloud/agent/AgentManager.java
deleted file mode 100755
index 43b42b9..0000000
--- a/server/src/com/cloud/agent/AgentManager.java
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent;
-
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.StartupCommand;
-import com.cloud.agent.manager.AgentAttache;
-import com.cloud.agent.manager.Commands;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.ConnectionException;
-import com.cloud.exception.OperationTimedoutException;
-import com.cloud.host.HostVO;
-import com.cloud.host.Status;
-import com.cloud.hypervisor.Hypervisor.HypervisorType;
-import com.cloud.resource.ServerResource;
-import com.cloud.utils.component.Manager;
-
-/**
- * AgentManager manages hosts. It directly coordinates between the DAOs and the connections
it manages.
- */
-public interface AgentManager extends Manager {
-    public enum TapAgentsAction {
-        Add,
-        Del,
-        Contains,
-    }
-
-    /**
-     * easy send method that returns null if there's any errors. It handles all exceptions.
-     *
-     * @param hostId
-     *            host id
-     * @param cmd
-     *            command to send.
-     * @return Answer if successful; null if not.
-     */
-    Answer easySend(Long hostId, Command cmd);
-
-    /**
-     * Synchronous sending a command to the agent.
-     *
-     * @param hostId
-     *            id of the agent on host
-     * @param cmd
-     *            command
-     * @return an Answer
-     */
-
-    Answer send(Long hostId, Command cmd) throws AgentUnavailableException, OperationTimedoutException;
-
-    /**
-     * Synchronous sending a list of commands to the agent.
-     *
-     * @param hostId
-     *            id of the agent on host
-     * @param cmds
-     *            array of commands
-     * @param isControl
-     *            Commands sent contains control commands
-     * @param stopOnError
-     *            should the agent stop execution on the first error.
-     * @return an array of Answer
-     */
-    Answer[] send(Long hostId, Commands cmds) throws AgentUnavailableException, OperationTimedoutException;
-
-    Answer[] send(Long hostId, Commands cmds, int timeout) throws AgentUnavailableException,
OperationTimedoutException;
-
-    /**
-     * Asynchronous sending of a command to the agent.
-     *
-     * @param hostId
-     *            id of the agent on the host.
-     * @param cmds
-     *            Commands to send.
-     * @param stopOnError
-     *            should the agent stop execution on the first error.
-     * @param listener
-     *            the listener to process the answer.
-     * @return sequence number.
-     */
-    long send(Long hostId, Commands cmds, Listener listener) throws AgentUnavailableException;
-
-    /**
-     * Register to listen for host events. These are mostly connection and disconnection
events.
-     *
-     * @param listener
-     * @param connections
-     *            listen for connections
-     * @param commands
-     *            listen for connections
-     * @param priority
-     *            in listening for events.
-     * @return id to unregister if needed.
-     */
-    int registerForHostEvents(Listener listener, boolean connections, boolean commands, boolean
priority);
-
-
-    /**
-     * Register to listen for initial agent connections.
-     * @param creator
-     * @param priority in listening for events.
-     * @return id to unregister if needed.
-     */
-    int registerForInitialConnects(StartupCommandProcessor creator,  boolean priority);
-
-    /**
-     * Unregister for listening to host events.
-     *
-     * @param id
-     *            returned from registerForHostEvents
-     */
-    void unregisterForHostEvents(int id);
-
-    Answer sendTo(Long dcId, HypervisorType type, Command cmd);
-
-
-    /* working as a lock while agent is being loaded */
-    public boolean tapLoadingAgents(Long hostId, TapAgentsAction action);
-
-    public AgentAttache handleDirectConnectAgent(HostVO host, StartupCommand[] cmds, ServerResource
resource, boolean forRebalance) throws ConnectionException;
-
-    public boolean agentStatusTransitTo(HostVO host, Status.Event e, long msId);
-
-    boolean isAgentAttached(long hostId);
-
-    void disconnectWithoutInvestigation(long hostId, Status.Event event);
-
-    public void pullAgentToMaintenance(long hostId);
-
-    public void pullAgentOutMaintenance(long hostId);
-
-	boolean reconnect(long hostId);
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/435e74e9/server/src/com/cloud/agent/manager/AgentAttache.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/AgentAttache.java b/server/src/com/cloud/agent/manager/AgentAttache.java
deleted file mode 100755
index 67deba0..0000000
--- a/server/src/com/cloud/agent/manager/AgentAttache.java
+++ /dev/null
@@ -1,519 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.agent.manager;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import com.cloud.agent.Listener;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.CheckHealthCommand;
-import com.cloud.agent.api.CheckNetworkCommand;
-import com.cloud.agent.api.CheckVirtualMachineCommand;
-import com.cloud.agent.api.CleanupNetworkRulesCmd;
-import com.cloud.agent.api.ClusterSyncCommand;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.MaintainCommand;
-import com.cloud.agent.api.MigrateCommand;
-import com.cloud.agent.api.PingTestCommand;
-import com.cloud.agent.api.PvlanSetupCommand;
-import com.cloud.agent.api.ReadyCommand;
-import com.cloud.agent.api.SetupCommand;
-import com.cloud.agent.api.ShutdownCommand;
-import com.cloud.agent.api.StartCommand;
-import com.cloud.agent.api.StopCommand;
-import com.cloud.agent.api.storage.CreateCommand;
-import com.cloud.agent.transport.Request;
-import com.cloud.agent.transport.Response;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.OperationTimedoutException;
-import com.cloud.host.Status;
-import com.cloud.utils.concurrency.NamedThreadFactory;
-
-/**
- *  AgentAttache provides basic commands to be implemented.
- */
-public abstract class AgentAttache {
-    private static final Logger s_logger = Logger.getLogger(AgentAttache.class);
-
-    private static final ScheduledExecutorService s_listenerExecutor = Executors.newScheduledThreadPool(10,
new NamedThreadFactory("ListenerTimer"));
-    private static final Random                       s_rand                            
  = new Random(System.currentTimeMillis());
-
-    protected static final Comparator<Request> s_reqComparator =
-            new Comparator<Request>() {
-        @Override
-        public int compare(Request o1, Request o2) {
-            long seq1 = o1.getSequence();
-            long seq2 = o2.getSequence();
-            if (seq1 < seq2) {
-                return -1;
-            } else if (seq1 > seq2) {
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-    };
-
-    protected static final Comparator<Object> s_seqComparator =
-            new Comparator<Object>() {
-        @Override
-        public int compare(Object o1, Object o2) {
-            long seq1 = ((Request) o1).getSequence();
-            long seq2 = (Long) o2;
-            if (seq1 < seq2) {
-                return -1;
-            } else if (seq1 > seq2) {
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-    };
-
-    protected final long _id;
-    protected final ConcurrentHashMap<Long, Listener> _waitForList;
-    protected final LinkedList<Request> _requests;
-    protected Long _currentSequence;
-    protected Status _status = Status.Connecting;
-    protected boolean _maintenance;
-    protected long                                    _nextSequence;
-
-    protected AgentManagerImpl _agentMgr;
-
-    public final static String[] s_commandsAllowedInMaintenanceMode =
-            new String[] { MaintainCommand.class.toString(), MigrateCommand.class.toString(),
StopCommand.class.toString(), CheckVirtualMachineCommand.class.toString(), PingTestCommand.class.toString(),
-    					   CheckHealthCommand.class.toString(), ReadyCommand.class.toString(), ShutdownCommand.class.toString(),
SetupCommand.class.toString(), ClusterSyncCommand.class.toString(),
-    					   CleanupNetworkRulesCmd.class.toString(), CheckNetworkCommand.class.toString(),
PvlanSetupCommand.class.toString() };
-    protected final static String[] s_commandsNotAllowedInConnectingMode =
-            new String[] { StartCommand.class.toString(), CreateCommand.class.toString()
};
-    static {
-        Arrays.sort(s_commandsAllowedInMaintenanceMode);
-        Arrays.sort(s_commandsNotAllowedInConnectingMode);
-    }
-
-
-    protected AgentAttache(AgentManagerImpl agentMgr, final long id, boolean maintenance)
{
-        _id = id;
-        _waitForList = new ConcurrentHashMap<Long, Listener>();
-        _currentSequence = null;
-        _maintenance = maintenance;
-        _requests = new LinkedList<Request>();
-        _agentMgr = agentMgr;
-        _nextSequence = s_rand.nextInt(Short.MAX_VALUE) << 48;
-    }
-
-    public synchronized long getNextSequence() {
-        return ++_nextSequence;
-    }
-
-    public synchronized void setMaintenanceMode(final boolean value) {
-        _maintenance = value;
-    }
-
-    public void ready() {
-        _status = Status.Up;
-    }
-
-    public boolean isReady() {
-        return _status == Status.Up;
-    }
-
-    public boolean isConnecting() {
-        return _status == Status.Connecting;
-    }
-
-    public boolean forForward() {
-        return false;
-    }
-
-    protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException
{
-        if (!_maintenance && _status != Status.Connecting) {
-            return;
-        }
-
-        if (_maintenance) {
-            for (final Command cmd : cmds) {
-                if (Arrays.binarySearch(s_commandsAllowedInMaintenanceMode, cmd.getClass().toString())
< 0) {
-                    throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString()
+ " because agent is in maintenance mode", _id);
-                }
-            }
-        }
-
-        if (_status == Status.Connecting) {
-            for (final Command cmd : cmds) {
-                if (Arrays.binarySearch(s_commandsNotAllowedInConnectingMode, cmd.getClass().toString())
>= 0) {
-                    throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString()
+ " because agent is in connecting mode", _id);
-                }
-            }
-        }
-    }
-
-    protected synchronized void addRequest(Request req) {
-        int index = findRequest(req);
-        assert (index < 0) : "How can we get index again? " + index + ":" + req.toString();
-        _requests.add(-index - 1, req);
-    }
-
-
-    protected void cancel(Request req) {
-        long seq = req.getSequence();
-        cancel(seq);
-    }
-
-    protected synchronized void cancel(final long seq) {
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug(log(seq, "Cancelling."));
-        }
-        final Listener listener = _waitForList.remove(seq);
-        if (listener != null) {
-            listener.processDisconnect(_id, Status.Disconnected);
-        }
-        int index = findRequest(seq);
-        if (index >= 0) {
-            _requests.remove(index);
-        }
-    }
-
-    protected synchronized int findRequest(Request req) {
-        return Collections.binarySearch(_requests, req, s_reqComparator);
-    }
-
-    protected synchronized int findRequest(long seq) {
-        return Collections.binarySearch(_requests, seq, s_seqComparator);
-    }
-
-
-    protected String log(final long seq, final String msg) {
-        return "Seq " + _id + "-" + seq + ": " + msg;
-    }
-
-    protected void registerListener(final long seq, final Listener listener) {
-        if (s_logger.isTraceEnabled()) {
-            s_logger.trace(log(seq, "Registering listener"));
-        }
-        if (listener.getTimeout() != -1) {
-            s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
-        }
-        _waitForList.put(seq, listener);
-    }
-
-    protected Listener unregisterListener(final long sequence) {
-        if (s_logger.isTraceEnabled()) {
-            s_logger.trace(log(sequence, "Unregistering listener"));
-        }
-        return _waitForList.remove(sequence);
-    }
-
-    protected Listener getListener(final long sequence) {
-        return _waitForList.get(sequence);
-    }
-
-    public long getId() {
-        return _id;
-    }
-
-    public int getQueueSize() {
-        return _requests.size();
-    }
-
-    public int getNonRecurringListenersSize() {
-        List<Listener> nonRecurringListenersList = new ArrayList<Listener>();
-        if (_waitForList.isEmpty()) {
-            return 0;
-        } else {
-            final Set<Map.Entry<Long, Listener>> entries = _waitForList.entrySet();
-            final Iterator<Map.Entry<Long, Listener>> it = entries.iterator();
-            while (it.hasNext()) {
-                final Map.Entry<Long, Listener> entry = it.next();
-                final Listener monitor = entry.getValue();
-                if (!monitor.isRecurring()) {
-                    //TODO - remove this debug statement later
-                    s_logger.debug("Listener is " + entry.getValue() + " waiting on " + entry.getKey());
-                    nonRecurringListenersList.add(monitor);
-                }
-            }
-        }
-
-        return nonRecurringListenersList.size();
-    }
-
-    public boolean processAnswers(final long seq, final Response resp) {
-        resp.logD("Processing: ", true);
-
-        final Answer[] answers = resp.getAnswers();
-
-        boolean processed = false;
-
-        try {
-            Listener monitor = getListener(seq);
-
-            if (monitor == null) {
-                if ( answers[0] != null && answers[0].getResult() ) {
-                    processed = true;
-                }
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug(log(seq, "Unable to find listener."));
-                }
-            } else {
-                processed = monitor.processAnswers(_id, seq, answers);
-                if (s_logger.isTraceEnabled()) {
-                    s_logger.trace(log(seq, (processed ? "" : " did not ") + " processed
"));
-                }
-
-                if (!monitor.isRecurring()) {
-                    unregisterListener(seq);
-                }
-            }
-
-            _agentMgr.notifyAnswersToMonitors(_id, seq, answers);
-
-        } finally {
-            // we should always trigger next command execution, even in failure cases - otherwise
in exception case all the remaining will be stuck in the sync queue forever
-            if (resp.executeInSequence()) {
-                sendNext(seq);
-            }
-        }
-
-        return processed;
-    }
-
-    protected void cancelAllCommands(final Status state, final boolean cancelActive) {
-        final Set<Map.Entry<Long, Listener>> entries = _waitForList.entrySet();
-        final Iterator<Map.Entry<Long, Listener>> it = entries.iterator();
-        while (it.hasNext()) {
-            final Map.Entry<Long, Listener> entry = it.next();
-            it.remove();
-            final Listener monitor = entry.getValue();
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug(log(entry.getKey(), "Sending disconnect to " + monitor.getClass()));
-            }
-            monitor.processDisconnect(_id, state);
-        }
-    }
-
-    public void cleanup(final Status state) {
-        cancelAllCommands(state, true);
-        _requests.clear();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        try {
-            AgentAttache that = (AgentAttache) obj;
-            return _id == that._id;
-        } catch (ClassCastException e) {
-            assert false : "Who's sending an " + obj.getClass().getSimpleName() + " to AgentAttache.equals()?
";
-        return false;
-        }
-    }
-
-    public void send(Request req, final Listener listener) throws AgentUnavailableException
{
-        checkAvailability(req.getCommands());
-
-        long seq = req.getSequence();
-        if (listener != null) {
-            registerListener(seq, listener);
-        } else if (s_logger.isDebugEnabled()) {
-            s_logger.debug(log(seq, "Routed from " + req.getManagementServerId()));
-        }
-
-        synchronized(this) {
-            try {
-                if (isClosed()) {
-                    throw new AgentUnavailableException("The link to the agent has been closed",
_id);
-                }
-
-                if (req.executeInSequence() && _currentSequence != null) {
-                    req.logD("Waiting for Seq " + _currentSequence + " Scheduling: ", true);
-                    addRequest(req);
-                    return;
-                }
-
-                // If we got to here either we're not suppose to set
-                // the _currentSequence or it is null already.
-
-                req.logD("Sending ", true);
-                send(req);
-
-                if (req.executeInSequence() && _currentSequence == null) {
-                    _currentSequence = seq;
-                    if (s_logger.isTraceEnabled()) {
-                        s_logger.trace(log(seq, " is current sequence"));
-                    }
-                }
-            } catch (AgentUnavailableException e) {
-                s_logger.info(log(seq, "Unable to send due to " + e.getMessage()));
-                cancel(seq);
-                throw e;
-            } catch (Exception e) {
-                s_logger.warn(log(seq, "Unable to send due to "), e);
-                cancel(seq);
-                throw new AgentUnavailableException("Problem due to other exception " + e.getMessage(),
_id);
-            }
-        }
-    }
-
-    public Answer[] send(Request req, int wait) throws AgentUnavailableException, OperationTimedoutException
{
-        SynchronousListener sl = new SynchronousListener(null);
-
-        long seq = req.getSequence();
-        send(req, sl);
-
-        try {
-            for (int i = 0; i < 2; i++) {
-                Answer[] answers = null;
-                try {
-                    answers = sl.waitFor(wait);
-                } catch (final InterruptedException e) {
-                    s_logger.debug(log(seq, "Interrupted"));
-                }
-                if (answers != null) {
-                    if (s_logger.isDebugEnabled()) {
-                        new Response(req, answers).logD("Received: ", false);
-                    }
-                    return answers;
-                }
-
-                answers = sl.getAnswers(); // Try it again.
-                if (answers != null) {
-                    if (s_logger.isDebugEnabled()) {
-                        new Response(req, answers).logD("Received after timeout: ", true);
-                    }
-
-                    _agentMgr.notifyAnswersToMonitors(_id, seq, answers);
-                    return answers;
-                }
-
-                final Long current = _currentSequence;
-                if (current != null && seq != current) {
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug(log(seq, "Waited too long."));
-                    }
-
-                    throw new OperationTimedoutException(req.getCommands(), _id, seq, wait,
false);
-                }
-
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug(log(seq, "Waiting some more time because this is the current
command"));
-                }
-            }
-
-            throw new OperationTimedoutException(req.getCommands(), _id, seq, wait * 2, true);
-        } catch (OperationTimedoutException e) {
-            s_logger.warn(log(seq, "Timed out on " + req.toString()));
-            cancel(seq);
-            final Long current = _currentSequence;
-            if (req.executeInSequence() && (current != null && current ==
seq)) {
-                sendNext(seq);
-            }
-            throw e;
-        } catch (Exception e) {
-            s_logger.warn(log(seq, "Exception while waiting for answer"), e);
-            cancel(seq);
-            final Long current = _currentSequence;
-            if (req.executeInSequence() && (current != null && current ==
seq)) {
-                sendNext(seq);
-            }
-            throw new OperationTimedoutException(req.getCommands(), _id, seq, wait, false);
-        } finally {
-            unregisterListener(seq);
-        }
-    }
-
-    protected synchronized void sendNext(final long seq) {
-        _currentSequence = null;
-        if (_requests.isEmpty()) {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug(log(seq, "No more commands found"));
-            }
-            return;
-        }
-
-        Request req = _requests.pop();
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug(log(req.getSequence(), "Sending now.  is current sequence."));
-        }
-        try {
-            send(req);
-        } catch (AgentUnavailableException e) {
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug(log(req.getSequence(), "Unable to send the next sequence"));
-            }
-            cancel(req.getSequence());
-        }
-        _currentSequence = req.getSequence();
-    }
-
-    public void process(Answer[] answers) {
-        //do nothing
-    }
-
-    /**
-     * sends the request asynchronously.
-     * 
-     * @param req
-     * @throws AgentUnavailableException
-     */
-    public abstract void send(Request req) throws AgentUnavailableException;
-
-    /**
-     * Process disconnect.
-     * @param state state of the agent.
-     */
-    public abstract void disconnect(final Status state);
-
-    /**
-     * Is the agent closed for more commands?
-     * @return true if unable to reach agent or false if reachable.
-     */
-    protected abstract boolean isClosed();
-
-    protected class Alarm implements Runnable {
-        long _seq;
-        public Alarm(long seq) {
-            _seq = seq;
-        }
-
-        @Override
-        public void run() {
-            try {
-                Listener listener = unregisterListener(_seq);
-                if (listener != null) {
-                    cancel(_seq);
-                    listener.processTimeout(_id, _seq);
-                }
-            } catch (Exception e) {
-                s_logger.warn("Exception ", e);
-            }
-        }
-    }
-}


Mime
View raw message