stratos-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lahiru Sandaruwan <lahi...@wso2.com>
Subject Re: [2/4] cluster monitors hierarchy redesigned and docker cluster monitor improved
Date Wed, 01 Oct 2014 15:25:29 GMT
Hi guys,

It seems we have lost all the Git history of Cluster Monitors with this
commit :(
We should use Git move (git mv) in cases like this.

Thanks.

On Tue, Sep 23, 2014 at 2:49 PM, <nirmal070125@apache.org> wrote:

>
> http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
> index b8dcd73..6525eba 100644
> ---
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
> +++
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java
> @@ -19,34 +19,54 @@
>
>  package org.apache.stratos.autoscaler.message.receiver.topology;
>
> +import java.util.List;
> +
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
> -import org.apache.stratos.autoscaler.*;
> +import org.apache.stratos.autoscaler.AutoscalerContext;
> +import org.apache.stratos.autoscaler.KubernetesClusterContext;
> +import org.apache.stratos.autoscaler.MemberStatsContext;
> +import org.apache.stratos.autoscaler.NetworkPartitionContext;
> +import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
> +import org.apache.stratos.autoscaler.PartitionContext;
>  import
> org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
>  import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
>  import
> org.apache.stratos.autoscaler.exception.PartitionValidationException;
>  import org.apache.stratos.autoscaler.exception.PolicyValidationException;
>  import org.apache.stratos.autoscaler.exception.TerminationException;
> -import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
> -import org.apache.stratos.autoscaler.monitor.ClusterMonitor;
> -import org.apache.stratos.autoscaler.monitor.KubernetesClusterMonitor;
> -import org.apache.stratos.autoscaler.monitor.LbClusterMonitor;
> +import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
> +import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory;
> +import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor;
> +import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
>  import org.apache.stratos.autoscaler.partition.PartitionManager;
>  import org.apache.stratos.autoscaler.policy.PolicyManager;
>  import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
> -import org.apache.stratos.autoscaler.util.AutoscalerUtil;
> +import org.apache.stratos.common.enums.ClusterType;
>  import org.apache.stratos.messaging.domain.topology.Cluster;
>  import org.apache.stratos.messaging.domain.topology.Service;
>  import org.apache.stratos.messaging.event.Event;
> -import org.apache.stratos.messaging.event.topology.*;
> -import org.apache.stratos.messaging.listener.topology.*;
> +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
> +import
> org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent;
> +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
> +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
> +import
> org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent;
> +import
> org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent;
> +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
> +import
> org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.ClusterMaintenanceModeEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener;
> +import
> org.apache.stratos.messaging.listener.topology.MemberReadyToShutdownEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
> +import
> org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
>  import
> org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
>  import
> org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
>  import org.drools.runtime.StatefulKnowledgeSession;
>  import org.drools.runtime.rule.FactHandle;
>
> -import java.util.List;
> -
>  /**
>   * Autoscaler topology receiver.
>   */
> @@ -116,42 +136,60 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>                  try {
>                      MemberReadyToShutdownEvent memberReadyToShutdownEvent
> = (MemberReadyToShutdownEvent)event;
>                      AutoscalerContext asCtx =
> AutoscalerContext.getInstance();
> -                    AbstractMonitor monitor;
> +                    AbstractClusterMonitor monitor;
>                      String clusterId =
> memberReadyToShutdownEvent.getClusterId();
>                      String memberId =
> memberReadyToShutdownEvent.getMemberId();
>
> -                    if(asCtx.monitorExist(clusterId)){
> -                        monitor = asCtx.getMonitor(clusterId);
> -                    }else if(asCtx.lbMonitorExist(clusterId)){
> -                        monitor = asCtx.getLBMonitor(clusterId);
> -                    }else{
> +                    if(asCtx.clusterMonitorExist(clusterId)) {
> +                        monitor = asCtx.getClusterMonitor(clusterId);
> +                    } else {
>                          if(log.isDebugEnabled()){
>                              log.debug(String.format("A cluster monitor is
> not found in autoscaler context [cluster] %s", clusterId));
>                          }
>                          return;
>                      }
> -
> -                    NetworkPartitionContext nwPartitionCtxt;
> -                    nwPartitionCtxt =
> monitor.getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
> -
> -                    // start a new member in the same Partition
> -                    String partitionId =
> monitor.getPartitionOfMember(memberId);
> -                    PartitionContext partitionCtxt =
> nwPartitionCtxt.getPartitionCtxt(partitionId);
> -
> -
> -                    // terminate the shutdown ready member
> -                    CloudControllerClient ccClient =
> CloudControllerClient.getInstance();
> -                    ccClient.terminate(memberId);
> -
> -                    // remove from active member list
> -                    partitionCtxt.removeActiveMemberById(memberId);
> -
> -                    if (log.isInfoEnabled()) {
> -                        log.info(String.format("Member is terminated and
> removed from the active members list: [member] %s [partition] %s [cluster]
> %s ",
> -                                               memberId, partitionId,
> clusterId));
> +
> +                    TopologyManager.acquireReadLock();
> +
> +                    if(monitor.getClusterType() ==
> ClusterType.VMServiceCluster
> +                               || monitor.getClusterType() ==
> ClusterType.VMLbCluster) {
> +
> +                        NetworkPartitionContext nwPartitionCtxt;
> +                        nwPartitionCtxt = ((VMClusterMonitor)
> monitor).getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
> +
> +                        // start a new member in the same Partition
> +                        String partitionId = ((VMClusterMonitor)
> monitor).getPartitionOfMember(memberId);
> +                        PartitionContext partitionCtxt =
> nwPartitionCtxt.getPartitionCtxt(partitionId);
> +
> +
> +                        // terminate the shutdown ready member
> +                        CloudControllerClient ccClient =
> CloudControllerClient.getInstance();
> +                        ccClient.terminate(memberId);
> +
> +                        // remove from active member list
> +                        partitionCtxt.removeActiveMemberById(memberId);
> +
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member is terminated
> and removed from the active members list: [member] %s [partition] %s
> [cluster] %s ",
> +                                                   memberId, partitionId,
> clusterId));
> +                        }
> +                    } else if(monitor.getClusterType() ==
> ClusterType.DockerServiceCluster) {
> +                       KubernetesClusterContext kubernetesClusterContext
> = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
> +                       //terminate the shutdown ready container
> +
>  CloudControllerClient.getInstance().terminateContainer(memberId);
> +                       //remove from active member list
> +
>  kubernetesClusterContext.removeActiveMemberById(memberId);
> +
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member is terminated
> and removed from the active members list: [member] %s [kub cluster] %s
> [cluster] %s ",
> +                                                   memberId,
> kubernetesClusterContext.getKubernetesClusterID(), clusterId));
> +                        }
>                      }
> +
>                  } catch (TerminationException e) {
>                      log.error(e);
> +                } finally {
> +                    TopologyManager.releaseReadLock();
>                  }
>              }
>
> @@ -185,12 +223,8 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>                      TopologyManager.acquireReadLock();
>                      Service service =
> TopologyManager.getTopology().getService(e.getServiceName());
>                      Cluster cluster =
> service.getCluster(e.getClusterId());
> -
> if(AutoscalerContext.getInstance().kubernetesClusterMonitorExist(cluster.getClusterId()))
> {
> -
>  AutoscalerContext.getInstance().getKubernetesClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
> -                    } else
> if(AutoscalerContext.getInstance().monitorExist((cluster.getClusterId()))) {
> -
> AutoscalerContext.getInstance().getMonitor(e.getClusterId()).setStatus(e.getStatus());
> -                    } else if
> (AutoscalerContext.getInstance().lbMonitorExist((cluster.getClusterId()))) {
> -
> AutoscalerContext.getInstance().getLBMonitor(e.getClusterId()).setStatus(e.getStatus());
> +
> if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId()))
> {
> +
>  AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus());
>                      } else {
>                          log.error("cluster monitor not exists for the
> cluster: " + cluster.toString());
>                      }
> @@ -213,8 +247,7 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>                      String clusterId = e.getClusterId();
>                      String deploymentPolicy = e.getDeploymentPolicy();
>
> -                    AbstractMonitor monitor = null;
> -                    KubernetesClusterMonitor kubernetesClusterMonitor =
> null;
> +                    AbstractClusterMonitor monitor = null;
>
>                      if (e.isLbCluster()) {
>                          DeploymentPolicy depPolicy =
> PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy);
> @@ -239,13 +272,9 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>
>                              }
>                          }
> -                        monitor = AutoscalerContext.getInstance()
> -                                .removeLbMonitor(clusterId);
> -
> -                    } else {
> -                        monitor = AutoscalerContext.getInstance()
> -                                .removeMonitor(clusterId);
>                      }
> +
> +                    monitor =
> AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
>
>                      // runTerminateAllRule(monitor);
>                      if (monitor != null) {
> @@ -280,43 +309,73 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>                      String networkPartitionId = e.getNetworkPartitionId();
>                      String clusterId = e.getClusterId();
>                      String partitionId = e.getPartitionId();
> -                    AbstractMonitor monitor;
> +                    String memberId = e.getMemberId();
> +                    AbstractClusterMonitor monitor;
> +
> +                    AutoscalerContext asCtx =
> AutoscalerContext.getInstance();
>
> -                    if
> (AutoscalerContext.getInstance().monitorExist(clusterId)) {
> -                        monitor =
> AutoscalerContext.getInstance().getMonitor(clusterId);
> +                    if(asCtx.clusterMonitorExist(clusterId)) {
> +                        monitor = asCtx.getClusterMonitor(clusterId);
>                      } else {
> -                        //This is LB member
> -                        monitor =
> AutoscalerContext.getInstance().getLBMonitor(clusterId);
> +                        if(log.isDebugEnabled()){
> +                            log.debug(String.format("A cluster monitor is
> not found in autoscaler context [cluster] %s", clusterId));
> +                        }
> +                        return;
>                      }
> +
> +                    if(monitor.getClusterType() ==
> ClusterType.VMServiceCluster
> +                               || monitor.getClusterType() ==
> ClusterType.VMLbCluster) {
> +
> +                        NetworkPartitionContext networkPartitionContext =
> ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId);
> +
> +                        PartitionContext partitionContext =
> networkPartitionContext.getPartitionCtxt(partitionId);
> +
> partitionContext.removeMemberStatsContext(memberId);
> +
> +                        if
> (partitionContext.removeTerminationPendingMember(memberId)) {
> +                            if (log.isDebugEnabled()) {
> +                                log.debug(String.format("Member is
> removed from termination pending members list: [member] %s", memberId));
> +                            }
> +                        } else if
> (partitionContext.removePendingMember(memberId)) {
> +                            if (log.isDebugEnabled()) {
> +                                log.debug(String.format("Member is
> removed from pending members list: [member] %s", memberId));
> +                            }
> +                        } else if
> (partitionContext.removeActiveMemberById(memberId)) {
> +                            log.warn(String.format("Member is in the
> wrong list and it is removed from active members list", memberId));
> +                        } else if
> (partitionContext.removeObsoleteMember(memberId)){
> +                               log.warn(String.format("Member's obsolated
> timeout has been expired and it is removed from obsolated members list",
> memberId));
> +                        } else {
> +                            log.warn(String.format("Member is not
> available in any of the list active, pending and termination pending",
> memberId));
> +                        }
>
> -                    NetworkPartitionContext networkPartitionContext =
> monitor.getNetworkPartitionCtxt(networkPartitionId);
> -
> -                    PartitionContext partitionContext =
> networkPartitionContext.getPartitionCtxt(partitionId);
> -                    String memberId = e.getMemberId();
> -                    partitionContext.removeMemberStatsContext(memberId);
> -
> -                    if
> (partitionContext.removeTerminationPendingMember(memberId)) {
> -                        if (log.isDebugEnabled()) {
> -                            log.debug(String.format("Member is removed
> from termination pending members list: [member] %s", memberId));
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member stat context
> has been removed successfully: [member] %s", memberId));
>                          }
> -                    } else if
> (partitionContext.removePendingMember(memberId)) {
> -                        if (log.isDebugEnabled()) {
> -                            log.debug(String.format("Member is removed
> from pending members list: [member] %s", memberId));
> +                    } else if(monitor.getClusterType() ==
> ClusterType.DockerServiceCluster) {
> +
> +                       KubernetesClusterContext kubernetesClusterContext
> = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
> +
>  kubernetesClusterContext.removeMemberStatsContext(memberId);
> +
> +                        if
> (kubernetesClusterContext.removeTerminationPendingMember(memberId)) {
> +                            if (log.isDebugEnabled()) {
> +                                log.debug(String.format("Member is
> removed from termination pending members list: [member] %s", memberId));
> +                            }
> +                        } else if
> (kubernetesClusterContext.removePendingMember(memberId)) {
> +                            if (log.isDebugEnabled()) {
> +                                log.debug(String.format("Member is
> removed from pending members list: [member] %s", memberId));
> +                            }
> +                        } else if
> (kubernetesClusterContext.removeActiveMemberById(memberId)) {
> +                            log.warn(String.format("Member is in the
> wrong list and it is removed from active members list", memberId));
> +                        } else if
> (kubernetesClusterContext.removeObsoleteMember(memberId)){
> +                               log.warn(String.format("Member's obsolated
> timeout has been expired and it is removed from obsolated members list",
> memberId));
> +                        } else {
> +                            log.warn(String.format("Member is not
> available in any of the list active, pending and termination pending",
> memberId));
>                          }
> -                    } else if
> (partitionContext.removeActiveMemberById(memberId)) {
> -                        log.warn(String.format("Member is in the wrong
> list and it is removed from active members list", memberId));
> -                    } else if
> (partitionContext.removeObsoleteMember(memberId)){
> -                       log.warn(String.format("Member's obsolated timeout
> has been expired and it is removed from obsolated members list", memberId));
> -                    } else {
> -                        log.warn(String.format("Member is not available
> in any of the list active, pending and termination pending", memberId));
> -                    }
>
> -                    if (log.isInfoEnabled()) {
> -                        log.info(String.format("Member stat context has
> been removed successfully: [member] %s", memberId));
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member stat context
> has been removed successfully: [member] %s", memberId));
> +                        }
>                      }
> -//                partitionContext.decrementCurrentActiveMemberCount(1);
> -
> -
> +
>                  } catch (Exception e) {
>                      log.error("Error processing event", e);
>                  } finally {
> @@ -338,24 +397,37 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>                      String partitionId = e.getPartitionId();
>                      String networkPartitionId = e.getNetworkPartitionId();
>
> -                    PartitionContext partitionContext;
>                      String clusterId = e.getClusterId();
> -                    AbstractMonitor monitor;
> -
> -                    if
> (AutoscalerContext.getInstance().monitorExist(clusterId)) {
> -                        monitor =
> AutoscalerContext.getInstance().getMonitor(clusterId);
> -                        partitionContext =
> monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
> +                    AbstractClusterMonitor monitor;
> +
> +                    AutoscalerContext asCtx =
> AutoscalerContext.getInstance();
> +                    if(asCtx.clusterMonitorExist(clusterId)) {
> +                        monitor = asCtx.getClusterMonitor(clusterId);
>                      } else {
> -                        monitor =
> AutoscalerContext.getInstance().getLBMonitor(clusterId);
> -                        partitionContext =
> monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
> -                    }
> -                    partitionContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> -                    if (log.isInfoEnabled()) {
> -                        log.info(String.format("Member stat context has
> been added successfully: [member] %s", memberId));
> +                        if(log.isDebugEnabled()){
> +                            log.debug(String.format("A cluster monitor is
> not found in autoscaler context [cluster] %s", clusterId));
> +                        }
> +                        return;
>                      }
> -//                partitionContext.incrementCurrentActiveMemberCount(1);
> -
> partitionContext.movePendingMemberToActiveMembers(memberId);
> -
> +
> +                    if (monitor.getClusterType() ==
> ClusterType.VMServiceCluster) {
> +                       PartitionContext partitionContext;
> +                        partitionContext = ((VMClusterMonitor)
> monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
> +                        partitionContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member stat context
> has been added successfully: [member] %s", memberId));
> +                        }
> +
> partitionContext.movePendingMemberToActiveMembers(memberId);
> +                                       } else if(monitor.getClusterType()
> == ClusterType.DockerServiceCluster) {
> +                                               KubernetesClusterContext
> kubernetesClusterContext;
> +                                               kubernetesClusterContext =
> ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
> +
>  kubernetesClusterContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member stat context
> has been added successfully: [member] %s", memberId));
> +                        }
> +
>  kubernetesClusterContext.movePendingMemberToActiveMembers(memberId);
> +                                       }
> +
>                  } catch (Exception e) {
>                      log.error("Error processing event", e);
>                  } finally {
> @@ -368,42 +440,59 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>             @Override
>             protected void onEvent(Event event) {
>                 try {
> +                  TopologyManager.acquireReadLock();
> +
>                     MemberReadyToShutdownEvent memberReadyToShutdownEvent
> = (MemberReadyToShutdownEvent)event;
>                     AutoscalerContext asCtx =
> AutoscalerContext.getInstance();
> -                   AbstractMonitor monitor;
> +                   AbstractClusterMonitor monitor;
>                     String clusterId =
> memberReadyToShutdownEvent.getClusterId();
>                     String memberId =
> memberReadyToShutdownEvent.getMemberId();
>
> -                   if(asCtx.monitorExist(clusterId)){
> -                       monitor = asCtx.getMonitor(clusterId);
> -                   }else if(asCtx.lbMonitorExist(clusterId)){
> -                       monitor = asCtx.getLBMonitor(clusterId);
> -                   }else{
> +                   if(asCtx.clusterMonitorExist(clusterId)) {
> +                       monitor = asCtx.getClusterMonitor(clusterId);
> +                   } else {
>                         if(log.isDebugEnabled()){
>                             log.debug(String.format("A cluster monitor is
> not found in autoscaler context [cluster] %s", clusterId));
>                         }
>                         return;
>                     }
>
> -                   NetworkPartitionContext nwPartitionCtxt;
> -                   nwPartitionCtxt =
> monitor.getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
> +                   if(monitor.getClusterType() ==
> ClusterType.VMServiceCluster
> +                                  || monitor.getClusterType() ==
> ClusterType.VMLbCluster) {
> +
> +                       NetworkPartitionContext nwPartitionCtxt;
> +                       nwPartitionCtxt = ((VMClusterMonitor)
> monitor).getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId());
>
> -                   // start a new member in the same Partition
> -                   String partitionId =
> monitor.getPartitionOfMember(memberId);
> -                   PartitionContext partitionCtxt =
> nwPartitionCtxt.getPartitionCtxt(partitionId);
> +                       // start a new member in the same Partition
> +                       String partitionId = ((VMClusterMonitor)
> monitor).getPartitionOfMember(memberId);
> +                       PartitionContext partitionCtxt =
> nwPartitionCtxt.getPartitionCtxt(partitionId);
>
>
> -                   // terminate the shutdown ready member
> -                   CloudControllerClient ccClient =
> CloudControllerClient.getInstance();
> -                   ccClient.terminate(memberId);
> +                       // terminate the shutdown ready member
> +                       CloudControllerClient ccClient =
> CloudControllerClient.getInstance();
> +                       ccClient.terminate(memberId);
>
> -                   // remove from active member list
> -                   partitionCtxt.removeActiveMemberById(memberId);
> +                       // remove from active member list
> +                       partitionCtxt.removeActiveMemberById(memberId);
>
> -                   if (log.isInfoEnabled()) {
> -                       log.info(String.format("Member is terminated and
> removed from the active members list: [member] %s [partition] %s [cluster]
> %s ",
> -                                              memberId, partitionId,
> clusterId));
> +                       if (log.isInfoEnabled()) {
> +                           log.info(String.format("Member is terminated
> and removed from the active members list: [member] %s [partition] %s
> [cluster] %s ",
> +                                                  memberId, partitionId,
> clusterId));
> +                       }
> +                   } else if(monitor.getClusterType() ==
> ClusterType.DockerServiceCluster) {
> +                          KubernetesClusterContext
> kubernetesClusterContext;
> +                          kubernetesClusterContext =
> ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
> +                          // terminate the shutdown ready member
> +
> CloudControllerClient.getInstance().terminateContainer(memberId);
> +                          // remove from active member list
> +
> kubernetesClusterContext.removeActiveMemberById(memberId);
> +
> +                       if (log.isInfoEnabled()) {
> +                           log.info(String.format("Member is terminated
> and removed from the active members list: [member] %s [kub cluster] %s
> [cluster] %s ",
> +                                                  memberId,
> kubernetesClusterContext.getKubernetesClusterID(), clusterId));
> +                       }
>                     }
> +
>                 } catch (TerminationException e) {
>                     log.error(e);
>                 }
> @@ -424,22 +513,38 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>                      String partitionId = e.getPartitionId();
>                      String networkPartitionId = e.getNetworkPartitionId();
>
> -                    PartitionContext partitionContext;
>                      String clusterId = e.getClusterId();
> -                    AbstractMonitor monitor;
> -
> -                    if
> (AutoscalerContext.getInstance().monitorExist(clusterId)) {
> -                        monitor =
> AutoscalerContext.getInstance().getMonitor(clusterId);
> -                        partitionContext =
> monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
> +                    AbstractClusterMonitor monitor;
> +
> +                    AutoscalerContext asCtx =
> AutoscalerContext.getInstance();
> +                    if (asCtx.clusterMonitorExist(clusterId)) {
> +                        monitor =
> AutoscalerContext.getInstance().getClusterMonitor(clusterId);
>                      } else {
> -                        monitor =
> AutoscalerContext.getInstance().getLBMonitor(clusterId);
> -                        partitionContext =
> monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
> +                        if(log.isDebugEnabled()){
> +                            log.debug(String.format("A cluster monitor is
> not found in autoscaler context [cluster] %s", clusterId));
> +                        }
> +                        return;
>                      }
> -                    partitionContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> -                    if (log.isDebugEnabled()) {
> -                        log.debug(String.format("Member has been moved as
> pending termination: [member] %s", memberId));
> +
> +                    if(monitor.getClusterType() ==
> ClusterType.VMServiceCluster
> +                                  || monitor.getClusterType() ==
> ClusterType.VMLbCluster) {
> +
> +                       PartitionContext partitionContext;
> +                       partitionContext = ((VMClusterMonitor)
> monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId);
> +                        partitionContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> +                        if (log.isDebugEnabled()) {
> +                            log.debug(String.format("Member has been
> moved as pending termination: [member] %s", memberId));
> +                        }
> +
> partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
> +                    } else if(monitor.getClusterType() ==
> ClusterType.DockerServiceCluster) {
> +                       KubernetesClusterContext kubernetesClusterContext;
> +                       kubernetesClusterContext =
> ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt();
> +                       kubernetesClusterContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> +                        if (log.isDebugEnabled()) {
> +                            log.debug(String.format("Member has been
> moved as pending termination: [member] %s", memberId));
> +                        }
> +
>  kubernetesClusterContext.moveActiveMemberToTerminationPendingMembers(memberId);
>                      }
> -
> partitionContext.moveActiveMemberToTerminationPendingMembers(memberId);
>
>                  } catch (Exception e) {
>                      log.error("Error processing event", e);
> @@ -471,64 +576,15 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>          });
>      }
>
> -    private class LBClusterMonitorAdder implements Runnable {
> -        private Cluster cluster;
> -
> -        public LBClusterMonitorAdder(Cluster cluster) {
> -            this.cluster = cluster;
> -        }
> -
> -        public void run() {
> -            LbClusterMonitor monitor = null;
> -            int retries = 5;
> -            boolean success = false;
> -            do {
> -                try {
> -                    Thread.sleep(5000);
> -                } catch (InterruptedException e1) {
> -                }
> -                try {
> -                    monitor = AutoscalerUtil.getLBClusterMonitor(cluster);
> -                    success = true;
> -
> -                } catch (PolicyValidationException e) {
> -                    String msg = "LB Cluster monitor creation failed for
> cluster: " + cluster.getClusterId();
> -                    log.debug(msg, e);
> -                    retries--;
> -
> -                } catch (PartitionValidationException e) {
> -                    String msg = "LB Cluster monitor creation failed for
> cluster: " + cluster.getClusterId();
> -                    log.debug(msg, e);
> -                    retries--;
> -                }
> -            } while (!success && retries <= 0);
> -
> -            if (monitor == null) {
> -                String msg = "LB Cluster monitor creation failed, even
> after retrying for 5 times, "
> -                        + "for cluster: " + cluster.getClusterId();
> -                log.error(msg);
> -                throw new RuntimeException(msg);
> -            }
> -
> -            Thread th = new Thread(monitor);
> -            th.start();
> -            AutoscalerContext.getInstance().addLbMonitor(monitor);
> -            if (log.isInfoEnabled()) {
> -                log.info(String.format("LB Cluster monitor has been
> added successfully: [cluster] %s",
> -                        cluster.getClusterId()));
> -            }
> -        }
> -    }
> -
>      private class ClusterMonitorAdder implements Runnable {
>          private Cluster cluster;
> -
> +        private String clusterMonitorType;
>          public ClusterMonitorAdder(Cluster cluster) {
>              this.cluster = cluster;
>          }
>
>          public void run() {
> -            ClusterMonitor monitor = null;
> +            AbstractClusterMonitor monitor = null;
>              int retries = 5;
>              boolean success = false;
>              do {
> @@ -538,68 +594,23 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>                  }
>
>                  try {
> -                    monitor = AutoscalerUtil.getClusterMonitor(cluster);
> +                    monitor = ClusterMonitorFactory.getMonitor(cluster);
>                      success = true;
> -
> +                    clusterMonitorType = monitor.getClusterType().name();
>                  } catch (PolicyValidationException e) {
> -                    String msg = "Cluster monitor creation failed for
> cluster: " + cluster.getClusterId();
> +                    String msg = clusterMonitorType +" monitor creation
> failed for cluster: " + cluster.getClusterId();
>                      log.debug(msg, e);
>                      retries--;
>
>                  } catch (PartitionValidationException e) {
> -                    String msg = "Cluster monitor creation failed for
> cluster: " + cluster.getClusterId();
> -                    log.debug(msg, e);
> -                    retries--;
> -                }
> -            } while (!success && retries != 0);
> -
> -            if (monitor == null) {
> -                String msg = "Cluster monitor creation failed, even after
> retrying for 5 times, "
> -                        + "for cluster: " + cluster.getClusterId();
> -                log.error(msg);
> -                throw new RuntimeException(msg);
> -            }
> -
> -            Thread th = new Thread(monitor);
> -            th.start();
> -            AutoscalerContext.getInstance().addMonitor(monitor);
> -            if (log.isInfoEnabled()) {
> -                log.info(String.format("Cluster monitor has been added
> successfully: [cluster] %s",
> -                        cluster.getClusterId()));
> -            }
> -        }
> -    }
> -
> -    private class KubernetesClusterMonitorAdder implements Runnable {
> -        private Cluster cluster;
> -
> -        public KubernetesClusterMonitorAdder(Cluster cluster) {
> -            this.cluster = cluster;
> -        }
> -
> -        public void run() {
> -            KubernetesClusterMonitor monitor = null;
> -            int retries = 5;
> -            boolean success = false;
> -            do {
> -                try {
> -                    Thread.sleep(5000);
> -                } catch (InterruptedException e1) {
> -                }
> -
> -                try {
> -                    monitor =
> AutoscalerUtil.getKubernetesClusterMonitor(cluster);
> -                    success = true;
> -
> -                } catch (Exception e) {
> -                    String msg = "Kubernetes cluster monitor creation
> failed for cluster: " + cluster.getClusterId();
> +                    String msg = clusterMonitorType +" monitor creation
> failed for cluster: " + cluster.getClusterId();
>                      log.debug(msg, e);
>                      retries--;
>                  }
>              } while (!success && retries != 0);
>
>              if (monitor == null) {
> -                String msg = "Kubernetes cluster monitor creation failed,
> even after retrying for 5 times, "
> +                String msg = clusterMonitorType +" monitor creation
> failed, even after retrying for 5 times, "
>                          + "for cluster: " + cluster.getClusterId();
>                  log.error(msg);
>                  throw new RuntimeException(msg);
> @@ -607,16 +618,16 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>
>              Thread th = new Thread(monitor);
>              th.start();
> -
> AutoscalerContext.getInstance().addKubernetesClusterMonitor(monitor);
> +            AutoscalerContext.getInstance().addClusterMonitor(monitor);
>              if (log.isInfoEnabled()) {
> -                log.info(String.format("Kubernetes cluster monitor has
> been added successfully: [cluster] %s",
> -                        cluster.getClusterId()));
> +                log.info(String.format("%s monitor has been added
> successfully: [cluster] %s",
> +                        clusterMonitorType, cluster.getClusterId()));
>              }
>          }
>      }
> -
> +
>      @SuppressWarnings("unused")
> -       private void runTerminateAllRule(AbstractMonitor monitor) {
> +       private void runTerminateAllRule(VMClusterMonitor monitor) {
>
>          FactHandle terminateAllFactHandle = null;
>
> @@ -639,16 +650,9 @@ public class AutoscalerTopologyEventReceiver
> implements Runnable {
>
>      protected synchronized void startClusterMonitor(Cluster cluster) {
>          Thread th = null;
> -        if (cluster.isKubernetesCluster()
> -                       &&
> !AutoscalerContext.getInstance().kubernetesClusterMonitorExist(cluster.getClusterId()))
> {
> -               th = new Thread(new
> KubernetesClusterMonitorAdder(cluster));
> -        } else if (cluster.isLbCluster()
> -                       &&
> !AutoscalerContext.getInstance().lbMonitorExist(cluster.getClusterId())) {
> -            th = new Thread(new LBClusterMonitorAdder(cluster));
> -        } else if (!cluster.isLbCluster() &&
> !cluster.isKubernetesCluster()
> -                       &&
> !AutoscalerContext.getInstance().monitorExist(cluster.getClusterId())) {
> -            th = new Thread(new ClusterMonitorAdder(cluster));
> -        }
> +        if
> (!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId()))
> {
> +               th = new Thread(new ClusterMonitorAdder(cluster));
> +        }
>          if (th != null) {
>              th.start();
>              try {
>
>
> http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
> new file mode 100644
> index 0000000..00796f1
> --- /dev/null
> +++
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
> @@ -0,0 +1,127 @@
> +package org.apache.stratos.autoscaler.monitor;
> +
> +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
> +import org.apache.stratos.common.enums.ClusterType;
> +import org.apache.stratos.messaging.domain.topology.ClusterStatus;
> +import org.drools.runtime.StatefulKnowledgeSession;
> +import org.drools.runtime.rule.FactHandle;
> +
> +public abstract class AbstractClusterMonitor implements Runnable{
> +
> +    private String clusterId;
> +    private String serviceId;
> +    private ClusterType clusterType;
> +       private ClusterStatus status;
> +       private int monitorInterval;
> +
> +       protected FactHandle minCheckFactHandle;
> +       protected FactHandle scaleCheckFactHandle;
> +       private StatefulKnowledgeSession minCheckKnowledgeSession;
> +       private StatefulKnowledgeSession scaleCheckKnowledgeSession;
> +       private boolean isDestroyed;
> +
> +       private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
> +
> +       protected AbstractClusterMonitor(String clusterId, String
> serviceId, ClusterType clusterType,
> +                       AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
> +
> +               super();
> +               this.clusterId = clusterId;
> +               this.serviceId = serviceId;
> +               this.clusterType = clusterType;
> +               this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
> +        this.scaleCheckKnowledgeSession =
> autoscalerRuleEvaluator.getScaleCheckStatefulSession();
> +        this.minCheckKnowledgeSession =
> autoscalerRuleEvaluator.getMinCheckStatefulSession();
> +       }
> +
> +       protected abstract void readConfigurations();
> +       protected abstract void monitor();
> +    public abstract void destroy();
> +
> +       public String getClusterId() {
> +               return clusterId;
> +       }
> +
> +       public void setClusterId(String clusterId) {
> +               this.clusterId = clusterId;
> +       }
> +
> +       public void setStatus(ClusterStatus status) {
> +               this.status = status;
> +       }
> +
> +       public ClusterType getClusterType() {
> +               return clusterType;
> +       }
> +
> +       public ClusterStatus getStatus() {
> +               return status;
> +       }
> +
> +       public String getServiceId() {
> +               return serviceId;
> +       }
> +
> +       public void setServiceId(String serviceId) {
> +               this.serviceId = serviceId;
> +       }
> +
> +       public int getMonitorInterval() {
> +               return monitorInterval;
> +       }
> +
> +       public void setMonitorInterval(int monitorInterval) {
> +               this.monitorInterval = monitorInterval;
> +       }
> +
> +       public FactHandle getMinCheckFactHandle() {
> +               return minCheckFactHandle;
> +       }
> +
> +       public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
> +               this.minCheckFactHandle = minCheckFactHandle;
> +       }
> +
> +       public FactHandle getScaleCheckFactHandle() {
> +               return scaleCheckFactHandle;
> +       }
> +
> +       public void setScaleCheckFactHandle(FactHandle
> scaleCheckFactHandle) {
> +               this.scaleCheckFactHandle = scaleCheckFactHandle;
> +       }
> +
> +       public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
> +               return minCheckKnowledgeSession;
> +       }
> +
> +       public void setMinCheckKnowledgeSession(
> +                       StatefulKnowledgeSession minCheckKnowledgeSession)
> {
> +               this.minCheckKnowledgeSession = minCheckKnowledgeSession;
> +       }
> +
> +       public StatefulKnowledgeSession getScaleCheckKnowledgeSession() {
> +               return scaleCheckKnowledgeSession;
> +       }
> +
> +       public void setScaleCheckKnowledgeSession(
> +                       StatefulKnowledgeSession
> scaleCheckKnowledgeSession) {
> +               this.scaleCheckKnowledgeSession =
> scaleCheckKnowledgeSession;
> +       }
> +
> +       public boolean isDestroyed() {
> +               return isDestroyed;
> +       }
> +
> +       public void setDestroyed(boolean isDestroyed) {
> +               this.isDestroyed = isDestroyed;
> +       }
> +
> +       public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() {
> +               return autoscalerRuleEvaluator;
> +       }
> +
> +       public void setAutoscalerRuleEvaluator(
> +                       AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
> +               this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
> +       }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
> deleted file mode 100644
> index c1441bb..0000000
> ---
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java
> +++ /dev/null
> @@ -1,203 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *  http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing,
> - * software distributed under the License is distributed on an
> - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> - * KIND, either express or implied.  See the License for the
> - * specific language governing permissions and limitations
> - * under the License.
> - */
> -package org.apache.stratos.autoscaler.monitor;
> -
> -import java.util.Map;
> -
> -import org.apache.commons.configuration.XMLConfiguration;
> -import org.apache.commons.logging.Log;
> -import org.apache.commons.logging.LogFactory;
> -import org.apache.stratos.autoscaler.NetworkPartitionContext;
> -import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
> -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
> -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
> -import org.apache.stratos.autoscaler.util.AutoScalerConstants;
> -import org.apache.stratos.autoscaler.util.ConfUtil;
> -import org.apache.stratos.messaging.domain.topology.Cluster;
> -import org.apache.stratos.messaging.domain.topology.Member;
> -import org.apache.stratos.messaging.domain.topology.Service;
> -import
> org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
> -import org.drools.runtime.StatefulKnowledgeSession;
> -import org.drools.runtime.rule.FactHandle;
> -
> -/**
> - * Is responsible for monitoring a service cluster. This runs periodically
> - * and perform minimum instance check and scaling check using the
> underlying
> - * rules engine.
> - *
> - */
> -   abstract public class AbstractMonitor implements Runnable{
> -
> -       private static final Log log =
> LogFactory.getLog(AbstractMonitor.class);
> -       // Map<NetworkpartitionId, Network Partition Context>
> -       protected Map<String, NetworkPartitionContext>
> networkPartitionCtxts;
> -       protected DeploymentPolicy deploymentPolicy;
> -       protected AutoscalePolicy autoscalePolicy;
> -
> -
> -       protected FactHandle minCheckFactHandle;
> -       protected FactHandle scaleCheckFactHandle;
> -
> -       protected StatefulKnowledgeSession minCheckKnowledgeSession;
> -       protected StatefulKnowledgeSession scaleCheckKnowledgeSession;
> -       protected boolean isDestroyed;
> -
> -       protected String clusterId;
> -       protected String serviceId;
> -
> -       protected AutoscalerRuleEvaluator autoscalerRuleEvaluator;
> -
> -    // time intereval between two runs of the Monitor. Default is 90000ms.
> -    protected int monitorInterval;
> -
> -    public AbstractMonitor() {
> -        readConfigurations();
> -    }
> -
> -    private void readConfigurations () {
> -
> -        XMLConfiguration conf =
> ConfUtil.getInstance(null).getConfiguration();
> -        monitorInterval =
> conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
> -        if (log.isDebugEnabled()) {
> -            log.debug("Cluster Monitor task interval: " +
> getMonitorInterval());
> -        }
> -    }
> -
> -       @Override
> -       public void run() {
> -               // TODO Auto-generated method stub
> -
> -       }
> -
> -
> -       public NetworkPartitionContext getNetworkPartitionCtxt(Member
> member) {
> -               log.info("***** getNetworkPartitionCtxt " +
> member.getNetworkPartitionId());
> -               String networkPartitionId = member.getNetworkPartitionId();
> -       if(networkPartitionCtxts.containsKey(networkPartitionId)) {
> -               log.info("returnnig network partition context " +
> networkPartitionCtxts.get(networkPartitionId));
> -               return networkPartitionCtxts.get(networkPartitionId);
> -       }
> -       log.info("returning null getNetworkPartitionCtxt");
> -           return null;
> -       }
> -
> -    public String getPartitionOfMember(String memberId){
> -        for(Service service: TopologyManager.getTopology().getServices()){
> -            for(Cluster cluster: service.getClusters()){
> -                if(cluster.memberExists(memberId)){
> -                    return cluster.getMember(memberId).getPartitionId();
> -                }
> -            }
> -        }
> -        return null;
> -       }
> -
> -    public void destroy() {
> -        minCheckKnowledgeSession.dispose();
> -        scaleCheckKnowledgeSession.dispose();
> -        setDestroyed(true);
> -        if(log.isDebugEnabled()) {
> -            log.debug("Cluster Monitor Drools session has been disposed.
> "+this.toString());
> -        }
> -    }
> -
> -    public boolean isDestroyed() {
> -        return isDestroyed;
> -    }
> -
> -    public void setDestroyed(boolean isDestroyed) {
> -        this.isDestroyed = isDestroyed;
> -    }
> -
> -    public String getServiceId() {
> -        return serviceId;
> -    }
> -
> -    public void setServiceId(String serviceId) {
> -        this.serviceId = serviceId;
> -    }
> -
> -    public DeploymentPolicy getDeploymentPolicy() {
> -        return deploymentPolicy;
> -    }
> -
> -    public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
> -        this.deploymentPolicy = deploymentPolicy;
> -    }
> -
> -    public AutoscalePolicy getAutoscalePolicy() {
> -        return autoscalePolicy;
> -    }
> -
> -    public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
> -        this.autoscalePolicy = autoscalePolicy;
> -    }
> -
> -    public String getClusterId() {
> -        return clusterId;
> -    }
> -
> -    public void setClusterId(String clusterId) {
> -        this.clusterId = clusterId;
> -    }
> -
> -    public Map<String, NetworkPartitionContext>
> getNetworkPartitionCtxts() {
> -        return networkPartitionCtxts;
> -    }
> -
> -    public NetworkPartitionContext getNetworkPartitionCtxt(String
> networkPartitionId) {
> -        return networkPartitionCtxts.get(networkPartitionId);
> -    }
> -
> -    public void setPartitionCtxt(Map<String, NetworkPartitionContext>
> partitionCtxt) {
> -        this.networkPartitionCtxts = partitionCtxt;
> -    }
> -
> -    public boolean partitionCtxtAvailable(String partitionId) {
> -        return networkPartitionCtxts.containsKey(partitionId);
> -    }
> -
> -    public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) {
> -        this.networkPartitionCtxts.put(ctxt.getId(), ctxt);
> -    }
> -
> -    public NetworkPartitionContext getPartitionCtxt(String id) {
> -        return this.networkPartitionCtxts.get(id);
> -    }
> -
> -    public StatefulKnowledgeSession getMinCheckKnowledgeSession() {
> -        return minCheckKnowledgeSession;
> -    }
> -
> -    public void setMinCheckKnowledgeSession(StatefulKnowledgeSession
> minCheckKnowledgeSession) {
> -        this.minCheckKnowledgeSession = minCheckKnowledgeSession;
> -    }
> -
> -    public FactHandle getMinCheckFactHandle() {
> -        return minCheckFactHandle;
> -    }
> -
> -    public void setMinCheckFactHandle(FactHandle minCheckFactHandle) {
> -        this.minCheckFactHandle = minCheckFactHandle;
> -    }
> -
> -    public int getMonitorInterval() {
> -        return monitorInterval;
> -    }
> -}
> \ No newline at end of file
>
>
> http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
> deleted file mode 100644
> index 5bb478e..0000000
> ---
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java
> +++ /dev/null
> @@ -1,223 +0,0 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership.  The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License.  You may obtain a copy of the License at
> - *
> - *  http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing,
> - * software distributed under the License is distributed on an
> - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> - * KIND, either express or implied.  See the License for the
> - * specific language governing permissions and limitations
> - * under the License.
> - */
> -package org.apache.stratos.autoscaler.monitor;
> -
> -import org.apache.commons.logging.Log;
> -import org.apache.commons.logging.LogFactory;
> -import org.apache.stratos.autoscaler.NetworkPartitionContext;
> -import org.apache.stratos.autoscaler.PartitionContext;
> -import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
> -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
> -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
> -import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
> -import org.apache.stratos.cloud.controller.stub.pojo.Properties;
> -import org.apache.stratos.cloud.controller.stub.pojo.Property;
> -import org.apache.stratos.messaging.domain.topology.ClusterStatus;
> -
> -import java.util.ArrayList;
> -import java.util.List;
> -import java.util.concurrent.ConcurrentHashMap;
> -
> -/**
> - * Is responsible for monitoring a service cluster. This runs periodically
> - * and perform minimum instance check and scaling check using the
> underlying
> - * rules engine.
> - *
> - */
> -public class ClusterMonitor extends AbstractMonitor {
> -
> -    private static final Log log =
> LogFactory.getLog(ClusterMonitor.class);
> -    private String lbReferenceType;
> -    private boolean hasPrimary;
> -    private ClusterStatus status;
> -
> -    public ClusterMonitor(String clusterId, String serviceId,
> DeploymentPolicy deploymentPolicy,
> -                          AutoscalePolicy autoscalePolicy) {
> -        this.clusterId = clusterId;
> -        this.serviceId = serviceId;
> -
> -        this.autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
> -        this.scaleCheckKnowledgeSession =
> autoscalerRuleEvaluator.getScaleCheckStatefulSession();
> -        this.minCheckKnowledgeSession =
> autoscalerRuleEvaluator.getMinCheckStatefulSession();
> -
> -        this.deploymentPolicy = deploymentPolicy;
> -        this.autoscalePolicy = autoscalePolicy;
> -        networkPartitionCtxts = new ConcurrentHashMap<String,
> NetworkPartitionContext>();
> -    }
> -
> -
> -
> -    @Override
> -    public void run() {
> -
> -        try {
> -            // TODO make this configurable,
> -            // this is the delay the min check of normal cluster monitor
> to wait until LB monitor is added
> -            Thread.sleep(60000);
> -        } catch (InterruptedException ignore) {
> -        }
> -
> -        while (!isDestroyed()) {
> -            if (log.isDebugEnabled()) {
> -                log.debug("Cluster monitor is running.. " +
> this.toString());
> -            }
> -            try {
> -                if(!ClusterStatus.In_Maintenance.equals(status)) {
> -                    monitor();
> -                } else {
> -                    if (log.isDebugEnabled()) {
> -                        log.debug("Cluster monitor is suspended as the
> cluster is in " +
> -                                    ClusterStatus.In_Maintenance + "
> mode......");
> -                    }
> -                }
> -            } catch (Exception e) {
> -                log.error("Cluster monitor: Monitor failed." +
> this.toString(), e);
> -            }
> -            try {
> -                Thread.sleep(monitorInterval);
> -            } catch (InterruptedException ignore) {
> -            }
> -        }
> -    }
> -
> -    private boolean isPrimaryMember(MemberContext memberContext){
> -        Properties props = memberContext.getProperties();
> -        if (log.isDebugEnabled()) {
> -            log.debug(" Properties [" + props + "] ");
> -        }
> -        if (props != null && props.getProperties() != null) {
> -            for (Property prop : props.getProperties()) {
> -                if (prop.getName().equals("PRIMARY")) {
> -                    if (Boolean.parseBoolean(prop.getValue())) {
> -                        log.debug("Adding member id [" +
> memberContext.getMemberId() + "] " +
> -                                "member instance id [" +
> memberContext.getInstanceId() + "] as a primary member");
> -                        return true;
> -                    }
> -                }
> -            }
> -        }
> -        return false;
> -    }
> -
> -    private void monitor() {
> -
> -        //TODO make this concurrent
> -        for (NetworkPartitionContext networkPartitionContext :
> networkPartitionCtxts.values()) {
> -            // store primary members in the network partition context
> -            List<String> primaryMemberListInNetworkPartition = new
> ArrayList<String>();
> -
> -            //minimum check per partition
> -            for (PartitionContext partitionContext :
> networkPartitionContext.getPartitionCtxts().values()) {
> -                // store primary members in the partition context
> -                List<String> primaryMemberListInPartition = new
> ArrayList<String>();
> -                // get active primary members in this partition context
> -                for (MemberContext memberContext :
> partitionContext.getActiveMembers()) {
> -                    if (isPrimaryMember(memberContext)){
> -
> primaryMemberListInPartition.add(memberContext.getMemberId());
> -                    }
> -                }
> -                // get pending primary members in this partition context
> -                for (MemberContext memberContext :
> partitionContext.getPendingMembers()) {
> -                    if (isPrimaryMember(memberContext)){
> -
> primaryMemberListInPartition.add(memberContext.getMemberId());
> -                    }
> -                }
> -
> primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
> -                minCheckKnowledgeSession.setGlobal("clusterId",
> clusterId);
> -                minCheckKnowledgeSession.setGlobal("lbRef",
> lbReferenceType);
> -                minCheckKnowledgeSession.setGlobal("isPrimary",
> hasPrimary);
> -                minCheckKnowledgeSession.setGlobal("primaryMemberCount",
> primaryMemberListInPartition.size());
> -
> -                if (log.isDebugEnabled()) {
> -                    log.debug(String.format("Running minimum check for
> partition %s ", partitionContext.getPartitionId()));
> -                }
> -
> -                minCheckFactHandle =
> AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession
> -                        , minCheckFactHandle, partitionContext);
> -
> -            }
> -
> -            boolean rifReset = networkPartitionContext.isRifReset();
> -            boolean memoryConsumptionReset =
> networkPartitionContext.isMemoryConsumptionReset();
> -            boolean loadAverageReset =
> networkPartitionContext.isLoadAverageReset();
> -            if (log.isDebugEnabled()) {
> -                log.debug("flag of rifReset: "  + rifReset + " flag of
> memoryConsumptionReset" + memoryConsumptionReset
> -                        + " flag of loadAverageReset" + loadAverageReset);
> -            }
> -            if (rifReset || memoryConsumptionReset || loadAverageReset) {
> -                scaleCheckKnowledgeSession.setGlobal("clusterId",
> clusterId);
> -
> //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy",
> deploymentPolicy);
> -                scaleCheckKnowledgeSession.setGlobal("autoscalePolicy",
> autoscalePolicy);
> -                scaleCheckKnowledgeSession.setGlobal("rifReset",
> rifReset);
> -                scaleCheckKnowledgeSession.setGlobal("mcReset",
> memoryConsumptionReset);
> -                scaleCheckKnowledgeSession.setGlobal("laReset",
> loadAverageReset);
> -                scaleCheckKnowledgeSession.setGlobal("lbRef",
> lbReferenceType);
> -                scaleCheckKnowledgeSession.setGlobal("isPrimary", false);
> -                scaleCheckKnowledgeSession.setGlobal("primaryMembers",
> primaryMemberListInNetworkPartition);
> -
> -                if (log.isDebugEnabled()) {
> -                    log.debug(String.format("Running scale check for
> network partition %s ", networkPartitionContext.getId()));
> -                    log.debug(" Primary members : " +
> primaryMemberListInNetworkPartition);
> -                }
> -
> -                scaleCheckFactHandle =
> AutoscalerRuleEvaluator.evaluateScaleCheck(scaleCheckKnowledgeSession
> -                        , scaleCheckFactHandle, networkPartitionContext);
> -
> -                networkPartitionContext.setRifReset(false);
> -                networkPartitionContext.setMemoryConsumptionReset(false);
> -                networkPartitionContext.setLoadAverageReset(false);
> -            } else if (log.isDebugEnabled()) {
> -                log.debug(String.format("Scale rule will not run since
> the LB statistics have not received before this " +
> -                        "cycle for network partition %s",
> networkPartitionContext.getId()));
> -            }
> -        }
> -    }
> -
> -    @Override
> -    public String toString() {
> -        return "ClusterMonitor [clusterId=" + clusterId + ", serviceId="
> + serviceId +
> -                ", deploymentPolicy=" + deploymentPolicy + ",
> autoscalePolicy=" + autoscalePolicy +
> -                ", lbReferenceType=" + lbReferenceType +
> -                ", hasPrimary=" + hasPrimary + " ]";
> -    }
> -
> -    public String getLbReferenceType() {
> -        return lbReferenceType;
> -    }
> -
> -    public void setLbReferenceType(String lbReferenceType) {
> -        this.lbReferenceType = lbReferenceType;
> -    }
> -
> -    public boolean isHasPrimary() {
> -        return hasPrimary;
> -    }
> -
> -    public void setHasPrimary(boolean hasPrimary) {
> -        this.hasPrimary = hasPrimary;
> -    }
> -
> -    public ClusterStatus getStatus() {
> -        return status;
> -    }
> -
> -    public void setStatus(ClusterStatus status) {
> -        this.status = status;
> -    }
> -}
>
>
> http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
> new file mode 100644
> index 0000000..489078e
> --- /dev/null
> +++
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java
> @@ -0,0 +1,336 @@
> +package org.apache.stratos.autoscaler.monitor;
> +
> +import java.util.Map;
> +import java.util.Random;
> +
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +import org.apache.stratos.autoscaler.KubernetesClusterContext;
> +import org.apache.stratos.autoscaler.MemberStatsContext;
> +import org.apache.stratos.autoscaler.NetworkPartitionContext;
> +import org.apache.stratos.autoscaler.NetworkPartitionLbHolder;
> +import org.apache.stratos.autoscaler.PartitionContext;
> +import
> org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
> +import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
> +import
> org.apache.stratos.autoscaler.exception.PartitionValidationException;
> +import org.apache.stratos.autoscaler.exception.PolicyValidationException;
> +import org.apache.stratos.autoscaler.partition.PartitionGroup;
> +import org.apache.stratos.autoscaler.partition.PartitionManager;
> +import org.apache.stratos.autoscaler.policy.PolicyManager;
> +import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
> +import
> org.apache.stratos.cloud.controller.stub.deployment.partition.Partition;
> +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
> +import org.apache.stratos.cloud.controller.stub.pojo.Properties;
> +import org.apache.stratos.cloud.controller.stub.pojo.Property;
> +import org.apache.stratos.common.constants.StratosConstants;
> +import org.apache.stratos.messaging.domain.topology.Cluster;
> +import org.apache.stratos.messaging.domain.topology.ClusterStatus;
> +import org.apache.stratos.messaging.domain.topology.Member;
> +import org.apache.stratos.messaging.domain.topology.MemberStatus;
> +import org.apache.stratos.messaging.util.Constants;
> +
> +public class ClusterMonitorFactory {
> +
> +       private static final Log log =
> LogFactory.getLog(ClusterMonitorFactory.class);
> +
> +       public static AbstractClusterMonitor getMonitor(Cluster cluster)
> throws PolicyValidationException, PartitionValidationException {
> +
> +               AbstractClusterMonitor clusterMonitor;
> +               if(cluster.isKubernetesCluster()){
> +                       clusterMonitor =
> getDockerServiceClusterMonitor(cluster);
> +               } else if (cluster.isLbCluster()){
> +                       clusterMonitor = getVMLbClusterMonitor(cluster);
> +               } else {
> +                       clusterMonitor =
> getVMServiceClusterMonitor(cluster);
> +               }
> +
> +               return clusterMonitor;
> +       }
> +
> +    private static VMServiceClusterMonitor
> getVMServiceClusterMonitor(Cluster cluster) throws
> PolicyValidationException, PartitionValidationException {
> +        // FIXME fix the following code to correctly update
> +        // AutoscalerContext context = AutoscalerContext.getInstance();
> +        if (null == cluster) {
> +            return null;
> +        }
> +
> +        String autoscalePolicyName = cluster.getAutoscalePolicyName();
> +        String deploymentPolicyName = cluster.getDeploymentPolicyName();
> +
> +        if (log.isDebugEnabled()) {
> +            log.debug("Deployment policy name: " + deploymentPolicyName);
> +            log.debug("Autoscaler policy name: " + autoscalePolicyName);
> +        }
> +
> +        AutoscalePolicy policy =
> +                                 PolicyManager.getInstance()
> +
> .getAutoscalePolicy(autoscalePolicyName);
> +        DeploymentPolicy deploymentPolicy =
> +                                            PolicyManager.getInstance()
> +
>  .getDeploymentPolicy(deploymentPolicyName);
> +
> +        if (deploymentPolicy == null) {
> +            String msg = "Deployment Policy is null. Policy name: " +
> deploymentPolicyName;
> +            log.error(msg);
> +            throw new PolicyValidationException(msg);
> +        }
> +
> +        Partition[] allPartitions = deploymentPolicy.getAllPartitions();
> +        if (allPartitions == null) {
> +            String msg =
> +                         "Deployment Policy's Partitions are null. Policy
> name: " +
> +                                 deploymentPolicyName;
> +            log.error(msg);
> +            throw new PolicyValidationException(msg);
> +        }
> +
> +
> CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(),
> deploymentPolicy);
> +
> +        VMServiceClusterMonitor clusterMonitor =
> +                                        new
> VMServiceClusterMonitor(cluster.getClusterId(),
> +
>  cluster.getServiceName(),
> +
>  deploymentPolicy, policy);
> +        clusterMonitor.setStatus(ClusterStatus.Created);
> +
> +        for (PartitionGroup partitionGroup:
> deploymentPolicy.getPartitionGroups()){
> +
> +            NetworkPartitionContext networkPartitionContext = new
> NetworkPartitionContext(partitionGroup.getId(),
> +                    partitionGroup.getPartitionAlgo(),
> partitionGroup.getPartitions());
> +
> +            for(Partition partition: partitionGroup.getPartitions()){
> +                PartitionContext partitionContext = new
> PartitionContext(partition);
> +                partitionContext.setServiceName(cluster.getServiceName());
> +                partitionContext.setProperties(cluster.getProperties());
> +
> partitionContext.setNetworkPartitionId(partitionGroup.getId());
> +
> +                for (Member member: cluster.getMembers()){
> +                    String memberId = member.getMemberId();
> +
> if(member.getPartitionId().equalsIgnoreCase(partition.getId())){
> +                        MemberContext memberContext = new MemberContext();
> +                        memberContext.setClusterId(member.getClusterId());
> +                        memberContext.setMemberId(memberId);
> +                        memberContext.setPartition(partition);
> +
> memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
> +
> +
> if(MemberStatus.Activated.equals(member.getStatus())){
> +
> partitionContext.addActiveMember(memberContext);
> +//
> networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
> 1);
> +//
> partitionContext.incrementCurrentActiveMemberCount(1);
> +
> +                        } else
> if(MemberStatus.Created.equals(member.getStatus()) ||
> MemberStatus.Starting.equals(member.getStatus())){
> +
> partitionContext.addPendingMember(memberContext);
> +
> +//
> networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
> 1);
> +                        } else
> if(MemberStatus.Suspended.equals(member.getStatus())){
> +//                            partitionContext.addFaultyMember(memberId);
> +                        }
> +                        partitionContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> +                        if(log.isInfoEnabled()){
> +                            log.info(String.format("Member stat context
> has been added: [member] %s", memberId));
> +                        }
> +                    }
> +
> +                }
> +
> networkPartitionContext.addPartitionContext(partitionContext);
> +                if(log.isInfoEnabled()){
> +                    log.info(String.format("Partition context has been
> added: [partition] %s",
> +                            partitionContext.getPartitionId()));
> +                }
> +            }
> +
> +
> clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
> +            if(log.isInfoEnabled()){
> +                log.info(String.format("Network partition context has
> been added: [network partition] %s",
> +                            networkPartitionContext.getId()));
> +            }
> +        }
> +
> +
> +        // find lb reference type
> +        java.util.Properties props = cluster.getProperties();
> +
> +        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
> +            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
> +            clusterMonitor.setLbReferenceType(value);
> +            if(log.isDebugEnabled()) {
> +                log.debug("Set the lb reference type: "+value);
> +            }
> +        }
> +
> +        // set hasPrimary property
> +        // hasPrimary is true if there are primary members available in
> that cluster
> +
> clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
> +
> +        log.info("Cluster monitor created: "+clusterMonitor.toString());
> +        return clusterMonitor;
> +    }
> +
> +    private static Properties convertMemberPropsToMemberContextProps(
> +                       java.util.Properties properties) {
> +       Properties props = new Properties();
> +       for (Map.Entry<Object, Object> e : properties.entrySet()        ) {
> +                       Property prop = new Property();
> +                       prop.setName((String)e.getKey());
> +                       prop.setValue((String)e.getValue());
> +                       props.addProperties(prop);
> +               }
> +               return props;
> +       }
> +
> +
> +       private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster
> cluster) throws PolicyValidationException, PartitionValidationException {
> +        // FIXME fix the following code to correctly update
> +        // AutoscalerContext context = AutoscalerContext.getInstance();
> +        if (null == cluster) {
> +            return null;
> +        }
> +
> +        String autoscalePolicyName = cluster.getAutoscalePolicyName();
> +        String deploymentPolicyName = cluster.getDeploymentPolicyName();
> +
> +        if (log.isDebugEnabled()) {
> +            log.debug("Deployment policy name: " + deploymentPolicyName);
> +            log.debug("Autoscaler policy name: " + autoscalePolicyName);
> +        }
> +
> +        AutoscalePolicy policy =
> +                                 PolicyManager.getInstance()
> +
> .getAutoscalePolicy(autoscalePolicyName);
> +        DeploymentPolicy deploymentPolicy =
> +                                            PolicyManager.getInstance()
> +
>  .getDeploymentPolicy(deploymentPolicyName);
> +
> +        if (deploymentPolicy == null) {
> +            String msg = "Deployment Policy is null. Policy name: " +
> deploymentPolicyName;
> +            log.error(msg);
> +            throw new PolicyValidationException(msg);
> +        }
> +
> +        String clusterId = cluster.getClusterId();
> +        VMLbClusterMonitor clusterMonitor =
> +                                        new VMLbClusterMonitor(clusterId,
> +
>  cluster.getServiceName(),
> +
>  deploymentPolicy, policy);
> +        clusterMonitor.setStatus(ClusterStatus.Created);
> +        // partition group = network partition context
> +        for (PartitionGroup partitionGroup :
> deploymentPolicy.getPartitionGroups()) {
> +
> +            NetworkPartitionLbHolder networkPartitionLbHolder =
> +
> PartitionManager.getInstance()
> +
>     .getNetworkPartitionLbHolder(partitionGroup.getId());
> +//
> PartitionManager.getInstance()
> +//
>       .getNetworkPartitionLbHolder(partitionGroup.getId());
> +            // FIXME pick a random partition
> +            Partition partition =
> +                                  partitionGroup.getPartitions()[new
> Random().nextInt(partitionGroup.getPartitions().length)];
> +            PartitionContext partitionContext = new
> PartitionContext(partition);
> +            partitionContext.setServiceName(cluster.getServiceName());
> +            partitionContext.setProperties(cluster.getProperties());
> +
> partitionContext.setNetworkPartitionId(partitionGroup.getId());
> +            partitionContext.setMinimumMemberCount(1);//Here it hard
> codes the minimum value as one for LB cartridge partitions
> +
> +            NetworkPartitionContext networkPartitionContext = new
> NetworkPartitionContext(partitionGroup.getId(),
> +                    partitionGroup.getPartitionAlgo(),
> partitionGroup.getPartitions()) ;
> +            for (Member member : cluster.getMembers()) {
> +                String memberId = member.getMemberId();
> +                if
> (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId()))
> {
> +                    MemberContext memberContext = new MemberContext();
> +                    memberContext.setClusterId(member.getClusterId());
> +                    memberContext.setMemberId(memberId);
> +                    memberContext.setPartition(partition);
> +
> +                    if
> (MemberStatus.Activated.equals(member.getStatus())) {
> +                        partitionContext.addActiveMember(memberContext);
> +//
> networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
> 1);
> +//
> partitionContext.incrementCurrentActiveMemberCount(1);
> +                    } else if
> (MemberStatus.Created.equals(member.getStatus()) ||
> +
>  MemberStatus.Starting.equals(member.getStatus())) {
> +                        partitionContext.addPendingMember(memberContext);
> +//
> networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(),
> 1);
> +                    } else if
> (MemberStatus.Suspended.equals(member.getStatus())) {
> +//                        partitionContext.addFaultyMember(memberId);
> +                    }
> +
> +                    partitionContext.addMemberStatsContext(new
> MemberStatsContext(memberId));
> +                    if(log.isInfoEnabled()){
> +                        log.info(String.format("Member stat context has
> been added: [member] %s", memberId));
> +                    }
> +                }
> +
> +            }
> +            networkPartitionContext.addPartitionContext(partitionContext);
> +
> +            // populate lb cluster id in network partition context.
> +            java.util.Properties props = cluster.getProperties();
> +
> +            // get service type of load balanced cluster
> +            String loadBalancedServiceType =
> props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
> +
> +            if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
> +                String value =
> props.getProperty(Constants.LOAD_BALANCER_REF);
> +
> +                if
> (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER))
> {
> +
> networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
> +
> +                } else if
> (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER))
> {
> +                    String serviceName = cluster.getServiceName();
> +                    // TODO: check if this is correct
> +                    networkPartitionLbHolder.addServiceLB(serviceName,
> clusterId);
> +
> +                    if (loadBalancedServiceType != null &&
> !loadBalancedServiceType.isEmpty()) {
> +
> networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
> +                        if (log.isDebugEnabled()) {
> +                            log.debug("Added cluster id " + clusterId + "
> as the LB cluster id for service type " + loadBalancedServiceType);
> +                        }
> +                    }
> +                }
> +            }
> +
> +
> clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
> +        }
> +
> +        log.info("LB Cluster monitor created:
> "+clusterMonitor.toString());
> +        return clusterMonitor;
> +    }
> +
> +    private static DockerServiceClusterMonitor
> getDockerServiceClusterMonitor(Cluster cluster) {
> +
> +       if (null == cluster) {
> +            return null;
> +        }
> +
> +        String autoscalePolicyName = cluster.getAutoscalePolicyName();
> +        if (log.isDebugEnabled()) {
> +            log.debug("Autoscaler policy name: " + autoscalePolicyName);
> +        }
> +
> +        AutoscalePolicy policy =
> PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
> +        java.util.Properties props = cluster.getProperties();
> +        String kubernetesHostClusterID =
> props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
> +               KubernetesClusterContext kubernetesClusterCtxt = new
> KubernetesClusterContext(kubernetesHostClusterID);
> +
> +        DockerServiceClusterMonitor dockerClusterMonitor = new
> DockerServiceClusterMonitor(
> +                       kubernetesClusterCtxt,
> +                       cluster.getClusterId(),
> +                       cluster.getServiceName(),
> +                       policy);
> +
> +        dockerClusterMonitor.setStatus(ClusterStatus.Created);
> +
> +        // find lb reference type
> +        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
> +            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
> +            dockerClusterMonitor.setLbReferenceType(value);
> +            if(log.isDebugEnabled()) {
> +                log.debug("Set the lb reference type: "+value);
> +            }
> +        }
> +
> +//        // set hasPrimary property
> +//        // hasPrimary is true if there are primary members available in
> that cluster
> +//
> dockerClusterMonitor.setHasPrimary(Boolean.parseBoolean(props.getProperty(Constants.IS_PRIMARY)));
> +
> +        log.info("Docker cluster monitor created: "+
> dockerClusterMonitor.toString());
> +        return dockerClusterMonitor;
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
> new file mode 100644
> index 0000000..f9b9047
> --- /dev/null
> +++
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
> @@ -0,0 +1,38 @@
> +package org.apache.stratos.autoscaler.monitor;
> +
> +import org.apache.stratos.autoscaler.KubernetesClusterContext;
> +import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
> +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
> +import org.apache.stratos.common.enums.ClusterType;
> +
> +public abstract class ContainerClusterMonitor extends
> AbstractClusterMonitor {
> +
> +       private KubernetesClusterContext kubernetesClusterCtxt;
> +       protected AutoscalePolicy autoscalePolicy;
> +
> +       protected ContainerClusterMonitor(String clusterId, String
> serviceId, ClusterType clusterType,
> +                       KubernetesClusterContext kubernetesClusterContext,
> +                       AutoscalerRuleEvaluator autoscalerRuleEvaluator,
> AutoscalePolicy autoscalePolicy){
> +
> +               super(clusterId, serviceId, clusterType,
> autoscalerRuleEvaluator);
> +               this.kubernetesClusterCtxt = kubernetesClusterContext;
> +               this.autoscalePolicy = autoscalePolicy;
> +       }
> +
> +       public KubernetesClusterContext getKubernetesClusterCtxt() {
> +               return kubernetesClusterCtxt;
> +       }
> +
> +       public void setKubernetesClusterCtxt(
> +                       KubernetesClusterContext kubernetesClusterCtxt) {
> +               this.kubernetesClusterCtxt = kubernetesClusterCtxt;
> +       }
> +
> +       public AutoscalePolicy getAutoscalePolicy() {
> +               return autoscalePolicy;
> +       }
> +
> +       public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
> +               this.autoscalePolicy = autoscalePolicy;
> +       }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
> new file mode 100644
> index 0000000..ca39b6a
> --- /dev/null
> +++
> b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
> @@ -0,0 +1,156 @@
> +package org.apache.stratos.autoscaler.monitor;
> +
> +import java.util.Properties;
> +
> +import org.apache.commons.configuration.XMLConfiguration;
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +import org.apache.stratos.autoscaler.KubernetesClusterContext;
> +import
> org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
> +import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
> +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
> +import org.apache.stratos.autoscaler.util.AutoScalerConstants;
> +import org.apache.stratos.autoscaler.util.ConfUtil;
> +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
> +import org.apache.stratos.common.constants.StratosConstants;
> +import org.apache.stratos.common.enums.ClusterType;
> +import org.apache.stratos.messaging.domain.topology.ClusterStatus;
> +import
> org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
> +
> +public final class DockerServiceClusterMonitor extends
> ContainerClusterMonitor{
> +
> +       private static final Log log =
> LogFactory.getLog(DockerServiceClusterMonitor.class);
> +
> +       private String lbReferenceType;
> +    private int numberOfReplicasInServiceCluster = 0;
> +       int retryInterval = 60000;
> +
> +    public DockerServiceClusterMonitor(KubernetesClusterContext
> kubernetesClusterCtxt,
> +               String serviceClusterID, String serviceId, AutoscalePolicy
> autoscalePolicy) {
> +       super(serviceClusterID, serviceId,
> ClusterType.DockerServiceCluster, kubernetesClusterCtxt,
> +                       new AutoscalerRuleEvaluator(), autoscalePolicy);
> +        readConfigurations();
> +    }
> +
> +       @Override
> +       public void run() {
> +               try {
> +                       // TODO make this configurable,
> +                       // this is the delay the min check of normal
> cluster monitor to wait
> +                       // until LB monitor is added
> +                       Thread.sleep(60000);
> +               } catch (InterruptedException ignore) {
> +               }
> +
> +               while (!isDestroyed()) {
> +                       if (log.isDebugEnabled()) {
> +                               log.debug("Kubernetes cluster monitor is
> running.. " + this.toString());
> +                       }
> +                       try {
> +                               if
> (!ClusterStatus.In_Maintenance.equals(getStatus())) {
> +                                       monitor();
> +                               } else {
> +                                       if (log.isDebugEnabled()) {
> +                                               log.debug("Kubernetes
> cluster monitor is suspended as the cluster is in "
> +                                                               +
> ClusterStatus.In_Maintenance + " mode......");
> +                                       }
> +                               }
> +                       } catch (Exception e) {
> +                               log.error("Kubernetes cluster monitor:
> Monitor failed." + this.toString(),
> +                                               e);
> +                       }
> +                       try {
> +                               Thread.sleep(getMonitorInterval());
> +                       } catch (InterruptedException ignore) {
> +                       }
> +               }
> +       }
> +
> +       @Override
> +       protected void monitor() {
> +
> +           // is container created successfully?
> +               boolean success = false;
> +               String kubernetesClusterId =
> getKubernetesClusterCtxt().getKubernetesClusterID();
> +
> +               try {
> +                       TopologyManager.acquireReadLock();
> +                       Properties props =
> TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties();
> +                       int minReplicas =
> Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS));
> +
> +                       int nonTerminatedMembers =
> getKubernetesClusterCtxt().getActiveMembers().size() +
> getKubernetesClusterCtxt().getPendingMembers().size();
> +
> +                       if (nonTerminatedMembers == 0) {
> +
> +                               while (success) {
> +                                       try {
> +
> +                                               MemberContext
> memberContext =
> CloudControllerClient.getInstance().createContainer(kubernetesClusterId,
> getClusterId());
> +                                               if(null != memberContext) {
> +
>  getKubernetesClusterCtxt().addPendingMember(memberContext);
> +                                                       success = true;
> +
>  numberOfReplicasInServiceCluster = minReplicas;
> +
>  if(log.isDebugEnabled()){
> +
>  log.debug(String.format("Pending member added, [member] %s [kub cluster]
> %s",
> +
>      memberContext.getMemberId(),
> getKubernetesClusterCtxt().getKubernetesClusterID()));
> +                                                       }
> +                                               } else {
> +                                                       if
> (log.isDebugEnabled()) {
> +
>  log.debug("Returned member context is null, did not add to pending
> members");
> +                                                       }
> +                                               }
> +                                       } catch (Throwable e) {
> +                                               if (log.isDebugEnabled()) {
> +                                                       String message =
> "Cannot create a container, will retry in "+(retryInterval/1000)+"s";
> +                                                       log.debug(message,
> e);
> +                                               }
> +                                       }
> +
> +                       try {
> +                           Thread.sleep(retryInterval);
> +                       } catch (InterruptedException e1) {
> +                       }
> +                               }
> +                       }
> +               } finally {
> +                       TopologyManager.releaseReadLock();
> +               }
> +       }
> +
> +       @Override
> +       public void destroy() {
> +        getMinCheckKnowledgeSession().dispose();
> +        getScaleCheckKnowledgeSession().dispose();
> +        setDestroyed(true);
> +        if(log.isDebugEnabled()) {
> +            log.debug("DockerClusterMonitor Drools session has been
> disposed. "+this.toString());
> +        }
> +       }
> +
> +    @Override
> +    protected void readConfigurations () {
> +       // same as VM cluster monitor interval
> +        XMLConfiguration conf =
> ConfUtil.getInstance(null).getConfiguration();
> +        int monitorInterval =
> conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000);
> +        setMonitorInterval(monitorInterval);
> +        if (log.isDebugEnabled()) {
> +            log.debug("Kubernetes Cluster Monitor task interval: " +
> getMonitorInterval());
> +        }
> +    }
> +
> +    @Override
> +    public String toString() {
> +        return "DockerClusterMonitor "
> +                       + "[ kubernetesHostClusterId=" +
> getKubernetesClusterCtxt().getKubernetesClusterID()
> +                       + ", clusterId=" + getClusterId()
> +                       + ", serviceId=" + getServiceId() + "]";
> +    }
> +
> +       public String getLbReferenceType() {
> +               return lbReferenceType;
> +       }
> +
> +       public void setLbReferenceType(String lbReferenceType) {
> +               this.lbReferenceType = lbReferenceType;
> +       }
> +}
> \ No newline at end of file
>
>


-- 
--
Lahiru Sandaruwan
Committer and PMC member, Apache Stratos,
Senior Software Engineer,
WSO2 Inc., http://wso2.com
lean.enterprise.middleware

email: lahirus@wso2.com cell: (+94) 773 325 954
blog: http://lahiruwrites.blogspot.com/
twitter: http://twitter.com/lahirus
linked-in: http://lk.linkedin.com/pub/lahiru-sandaruwan/16/153/146

Mime
View raw message