Return-Path: X-Original-To: apmail-cloudstack-commits-archive@www.apache.org Delivered-To: apmail-cloudstack-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5695A10B8A for ; Mon, 8 Dec 2014 10:08:58 +0000 (UTC) Received: (qmail 86545 invoked by uid 500); 8 Dec 2014 10:08:50 -0000 Delivered-To: apmail-cloudstack-commits-archive@cloudstack.apache.org Received: (qmail 86495 invoked by uid 500); 8 Dec 2014 10:08:50 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 85830 invoked by uid 99); 8 Dec 2014 10:08:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Dec 2014 10:08:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0B66E9C0AB5; Mon, 8 Dec 2014 10:08:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rajani@apache.org To: commits@cloudstack.apache.org Date: Mon, 08 Dec 2014 10:09:15 -0000 Message-Id: In-Reply-To: <688e4767e27c455d8a8ee5bf2c0c53f2@git.apache.org> References: <688e4767e27c455d8a8ee5bf2c0c53f2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/50] [abbrv] git commit: updated refs/heads/master to b61733d http://git-wip-us.apache.org/repos/asf/cloudstack/blob/98d75b02/test/integration/component/test_security_groups.py ---------------------------------------------------------------------- diff --git a/test/integration/component/test_security_groups.py b/test/integration/component/test_security_groups.py index eb242c7..a644a1c 100644 --- a/test/integration/component/test_security_groups.py +++ b/test/integration/component/test_security_groups.py @@ -5,9 +5,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,94 +17,27 @@ """ P1 for Security groups """ -#Import Local Modules +# Import Local Modules from nose.plugins.attrib import attr from marvin.cloudstackTestCase import cloudstackTestCase -#from marvin.cloudstackAPI import * from marvin.lib.utils import cleanup_resources from marvin.lib.base import (Account, ServiceOffering, VirtualMachine, SecurityGroup, Router, - Host, - Configurations) + Host) from marvin.lib.common import (get_domain, get_zone, get_template, get_process_status) from marvin.sshClient import SshClient -#Import System modules +# Import System modules import time import subprocess -class Services: - """Test Security groups Services - """ - - def __init__(self): - self.services = { - "disk_offering": { - "displaytext": "Small", - "name": "Small", - "disksize": 1 - }, - "account": { - "email": "test@test.com", - "firstname": "Test", - "lastname": "User", - "username": "test", - # Random characters are appended in create account to - # ensure unique username generated each time - "password": "password", - }, - "virtual_machine": { - # Create a small virtual machine instance with disk offering - "displayname": "Test VM", - "username": "root", # VM creds for SSH - "password": "password", - "ssh_port": 22, - "hypervisor": 'XenServer', - "privateport": 22, - "publicport": 22, - "protocol": 'TCP', - "userdata": 'This is sample data', - }, - "host": { - "publicport": 22, - "username": "root", # Host creds for SSH - "password": "password", - }, - "service_offering": { - "name": "Tiny Instance", - "displaytext": "Tiny Instance", - "cpunumber": 1, - "cpuspeed": 100, # in MHz - "memory": 128, # In MBs - }, - "security_group": { - "name": 'SSH', - "protocol": 'TCP', - "startport": 22, - "endport": 22, - "cidrlist": '0.0.0.0/0', - }, - "security_group_2": { - "name": 'ICMP', - "protocol": 'ICMP', - "startport": -1, - "endport": -1, - "cidrlist": '0.0.0.0/0', - }, - "ostype": 'CentOS 5.3 (64-bit)', - # CentOS 5.3 (64-bit) - "sleep": 60, - "timeout": 10, - } - - class TestDefaultSecurityGroup(cloudstackTestCase): def setUp(self): @@ -116,7 +49,7 @@ class TestDefaultSecurityGroup(cloudstackTestCase): def tearDown(self): try: - #Clean up, terminate the created templates + # Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: @@ -125,47 +58,51 @@ class TestDefaultSecurityGroup(cloudstackTestCase): @classmethod def setUpClass(cls): - cls.testClient = super(TestDefaultSecurityGroup, cls).getClsTestClient() + cls.testClient = super( + TestDefaultSecurityGroup, + cls).getClsTestClient() cls.api_client = cls.testClient.getApiClient() - cls.services = Services().services + # Fill testdata from the external config file + cls.testdata = cls.testClient.getParsedTestDataConfig() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client) cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests()) - cls.services['mode'] = cls.zone.networktype + cls.testdata['mode'] = cls.zone.networktype template = get_template( - cls.api_client, - cls.zone.id, - cls.services["ostype"] - ) - cls.services["domainid"] = cls.domain.id - cls.services["virtual_machine"]["zoneid"] = cls.zone.id - cls.services["virtual_machine"]["template"] = template.id + cls.api_client, + cls.zone.id, + cls.testdata["ostype"] + ) + cls.testdata["domainid"] = cls.domain.id + cls.testdata["virtual_machine"]["zoneid"] = cls.zone.id + cls.testdata["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( - cls.api_client, - cls.services["service_offering"] - ) + cls.api_client, + cls.testdata["service_offering"] + ) cls.account = Account.create( - cls.api_client, - cls.services["account"], - admin=True, - domainid=cls.domain.id - ) - cls.services["account"] = cls.account.name + cls.api_client, + cls.testdata["account"], + admin=True, + domainid=cls.domain.id + ) cls._cleanup = [ - cls.account, - cls.service_offering - ] + cls.account, + cls.service_offering + ] return @classmethod def tearDownClass(cls): try: - cls.api_client = super(TestDefaultSecurityGroup, cls).getClsTestClient().getApiClient() - #Cleanup resources used + cls.api_client = super( + TestDefaultSecurityGroup, + cls).getClsTestClient().getApiClient() + # Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: @@ -173,209 +110,207 @@ class TestDefaultSecurityGroup(cloudstackTestCase): return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_01_deployVM_InDefaultSecurityGroup(self): """Test deploy VM in default security group """ - # Validate the following: # 1. deploy Virtual machine using admin user # 2. listVM should show a VM in Running state # 3. listRouters should show one router running self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id + ) self.debug("Deployed VM with ID: %s" % self.virtual_machine.id) self.cleanup.append(self.virtual_machine) list_vm_response = VirtualMachine.list( - self.apiclient, - id=self.virtual_machine.id - ) + self.apiclient, + id=self.virtual_machine.id + ) self.debug( - "Verify listVirtualMachines response for virtual machine: %s" \ - % self.virtual_machine.id - ) + "Verify listVirtualMachines response for virtual machine: %s" + % self.virtual_machine.id + ) self.assertEqual( - isinstance(list_vm_response, list), - True, - "Check for list VM response" - ) + isinstance(list_vm_response, list), + True, + "Check for list VM response" + ) vm_response = list_vm_response[0] self.assertNotEqual( - len(list_vm_response), - 0, - "Check VM available in List Virtual Machines" - ) + len(list_vm_response), + 0, + "Check VM available in List Virtual Machines" + ) self.assertEqual( - vm_response.id, - self.virtual_machine.id, - "Check virtual machine id in listVirtualMachines" - ) + vm_response.id, + self.virtual_machine.id, + "Check virtual machine id in listVirtualMachines" + ) self.assertEqual( - vm_response.displayname, - self.virtual_machine.displayname, - "Check virtual machine displayname in listVirtualMachines" - ) + vm_response.displayname, + self.virtual_machine.displayname, + "Check virtual machine displayname in listVirtualMachines" + ) # Verify List Routers response for account self.debug( - "Verify list routers response for account: %s" \ - % self.account.name - ) + "Verify list routers response for account: %s" + % self.account.name + ) routers = Router.list( - self.apiclient, - zoneid=self.zone.id, - listall=True - ) + self.apiclient, + zoneid=self.zone.id, + listall=True + ) self.assertEqual( - isinstance(routers, list), - True, - "Check for list Routers response" - ) + isinstance(routers, list), + True, + "Check for list Routers response" + ) self.debug("Router Response: %s" % routers) self.assertEqual( - len(routers), - 1, - "Check virtual router is created for account or not" - ) + len(routers), + 1, + "Check virtual router is created for account or not" + ) return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_02_listSecurityGroups(self): """Test list security groups for admin account """ - # Validate the following: # 1. listSecurityGroups in admin account - # 2. There should be one security group (default) listed for the admin account + # 2. There should be one security group (default) listed + # for the admin account # 3. No Ingress Rules should be part of the default security group sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertNotEqual( - len(sercurity_groups), - 0, - "Check List Security groups response" - ) + len(sercurity_groups), + 0, + "Check List Security groups response" + ) self.debug("List Security groups response: %s" % - str(sercurity_groups)) + str(sercurity_groups)) self.assertEqual( - hasattr(sercurity_groups, 'ingressrule'), - False, - "Check ingress rule attribute for default security group" - ) + hasattr(sercurity_groups, 'ingressrule'), + False, + "Check ingress rule attribute for default security group" + ) return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_03_accessInDefaultSecurityGroup(self): """Test access in default security group """ - # Validate the following: # 1. deploy Virtual machine using admin user # 2. listVM should show a VM in Running state # 3. listRouters should show one router running self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id + ) self.debug("Deployed VM with ID: %s" % self.virtual_machine.id) self.cleanup.append(self.virtual_machine) list_vm_response = VirtualMachine.list( - self.apiclient, - id=self.virtual_machine.id - ) + self.apiclient, + id=self.virtual_machine.id + ) self.assertEqual( - isinstance(list_vm_response, list), - True, - "Check for list VM response" - ) + isinstance(list_vm_response, list), + True, + "Check for list VM response" + ) self.debug( - "Verify listVirtualMachines response for virtual machine: %s" \ - % self.virtual_machine.id - ) + "Verify listVirtualMachines response for virtual machine: %s" + % self.virtual_machine.id + ) vm_response = list_vm_response[0] self.assertNotEqual( - len(list_vm_response), - 0, - "Check VM available in List Virtual Machines" - ) + len(list_vm_response), + 0, + "Check VM available in List Virtual Machines" + ) self.assertEqual( - vm_response.id, - self.virtual_machine.id, - "Check virtual machine id in listVirtualMachines" - ) + vm_response.id, + self.virtual_machine.id, + "Check virtual machine id in listVirtualMachines" + ) self.assertEqual( - vm_response.displayname, - self.virtual_machine.displayname, - "Check virtual machine displayname in listVirtualMachines" - ) + vm_response.displayname, + self.virtual_machine.displayname, + "Check virtual machine displayname in listVirtualMachines" + ) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.debug("List Security groups response: %s" % - str(sercurity_groups)) + str(sercurity_groups)) self.assertNotEqual( - len(sercurity_groups), - 0, - "Check List Security groups response" - ) + len(sercurity_groups), + 0, + "Check List Security groups response" + ) self.assertEqual( - hasattr(sercurity_groups, 'ingressrule'), - False, - "Check ingress rule attribute for default security group" - ) + hasattr(sercurity_groups, 'ingressrule'), + False, + "Check ingress rule attribute for default security group" + ) # SSH Attempt to VM should fail with self.assertRaises(Exception): self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) SshClient( - self.virtual_machine.ssh_ip, - self.virtual_machine.ssh_port, - self.virtual_machine.username, - self.virtual_machine.password - ) + self.virtual_machine.ssh_ip, + self.virtual_machine.ssh_port, + self.virtual_machine.username, + self.virtual_machine.password + ) return @@ -390,7 +325,7 @@ class TestAuthorizeIngressRule(cloudstackTestCase): def tearDown(self): try: - #Clean up, terminate the created templates + # Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: @@ -399,45 +334,49 @@ class TestAuthorizeIngressRule(cloudstackTestCase): @classmethod def setUpClass(cls): - cls.testClient = super(TestAuthorizeIngressRule, cls).getClsTestClient() + cls.testClient = super( + TestAuthorizeIngressRule, + cls).getClsTestClient() cls.api_client = cls.testClient.getApiClient() - cls.services = Services().services + # Fill testdata from the external config file + cls.testdata = cls.testClient.getParsedTestDataConfig() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client) cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests()) - cls.services['mode'] = cls.zone.networktype + cls.testdata['mode'] = cls.zone.networktype template = get_template( - cls.api_client, - cls.zone.id, - cls.services["ostype"] - ) - cls.services["domainid"] = cls.domain.id - cls.services["virtual_machine"]["zoneid"] = cls.zone.id - cls.services["virtual_machine"]["template"] = template.id + cls.api_client, + cls.zone.id, + cls.testdata["ostype"] + ) + cls.testdata["domainid"] = cls.domain.id + cls.testdata["virtual_machine"]["zoneid"] = cls.zone.id + cls.testdata["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( - cls.api_client, - cls.services["service_offering"] - ) + cls.api_client, + cls.testdata["service_offering"] + ) cls.account = Account.create( - cls.api_client, - cls.services["account"], - domainid=cls.domain.id - ) - cls.services["account"] = cls.account.name + cls.api_client, + cls.testdata["account"], + domainid=cls.domain.id + ) cls._cleanup = [ - cls.account, - cls.service_offering - ] + cls.account, + cls.service_offering + ] return @classmethod def tearDownClass(cls): try: - cls.api_client = super(TestAuthorizeIngressRule, cls).getClsTestClient().getApiClient() - #Cleanup resources used + cls.api_client = super( + TestAuthorizeIngressRule, + cls).getClsTestClient().getApiClient() + # Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: @@ -445,73 +384,73 @@ class TestAuthorizeIngressRule(cloudstackTestCase): return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_01_authorizeIngressRule(self): """Test authorize ingress rule """ - # Validate the following: - #1. Create Security group for the account. - #2. Createsecuritygroup (ssh-incoming) for this account - #3. authorizeSecurityGroupIngress to allow ssh access to the VM - #4. deployVirtualMachine into this security group (ssh-incoming) + # 1. Create Security group for the account. + # 2. Createsecuritygroup (ssh-incoming) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. deployVirtualMachine into this security group (ssh-incoming) security_group = SecurityGroup.create( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["security_group"], + account=self.account.name, + domainid=self.account.domainid + ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertEqual( - len(sercurity_groups), - 2, - "Check List Security groups response" - ) + len(sercurity_groups), + 2, + "Check List Security groups response" + ) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) - self.debug("Authorizing ingress rule for sec group ID: %s for ssh access" - % security_group.id) + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" % + security_group.id) self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id, - securitygroupids=[security_group.id], - mode=self.services['mode'] - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id], + mode=self.testdata['mode'] + ) self.debug("Deploying VM in account: %s" % self.account.name) # Should be able to SSH VM try: self.debug("SSH into VM: %s" % self.virtual_machine.id) self.virtual_machine.get_ssh_client() except Exception as e: - self.fail("SSH Access failed for %s: %s" % \ + self.fail("SSH Access failed for %s: %s" % (self.virtual_machine.ipaddress, e) ) return @@ -528,7 +467,7 @@ class TestRevokeIngressRule(cloudstackTestCase): def tearDown(self): try: - #Clean up, terminate the created templates + # Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: @@ -540,42 +479,44 @@ class TestRevokeIngressRule(cloudstackTestCase): cls.testClient = super(TestRevokeIngressRule, cls).getClsTestClient() cls.api_client = cls.testClient.getApiClient() - cls.services = Services().services + # Fill testdata from the external config file + cls.testdata = cls.testClient.getParsedTestDataConfig() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client) cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests()) - cls.services['mode'] = cls.zone.networktype + cls.testdata['mode'] = cls.zone.networktype template = get_template( - cls.api_client, - cls.zone.id, - cls.services["ostype"] - ) - cls.services["domainid"] = cls.domain.id - cls.services["virtual_machine"]["zoneid"] = cls.zone.id - cls.services["virtual_machine"]["template"] = template.id + cls.api_client, + cls.zone.id, + cls.testdata["ostype"] + ) + cls.testdata["domainid"] = cls.domain.id + cls.testdata["virtual_machine"]["zoneid"] = cls.zone.id + cls.testdata["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( - cls.api_client, - cls.services["service_offering"] - ) + cls.api_client, + cls.testdata["service_offering"] + ) cls.account = Account.create( - cls.api_client, - cls.services["account"], - domainid=cls.domain.id - ) - cls.services["account"] = cls.account.name + cls.api_client, + cls.testdata["account"], + domainid=cls.domain.id + ) cls._cleanup = [ - cls.account, - cls.service_offering - ] + cls.account, + cls.service_offering + ] return @classmethod def tearDownClass(cls): try: - cls.api_client = super(TestRevokeIngressRule, cls).getClsTestClient().getApiClient() - #Cleanup resources used + cls.api_client = super( + TestRevokeIngressRule, + cls).getClsTestClient().getApiClient() + # Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: @@ -583,70 +524,70 @@ class TestRevokeIngressRule(cloudstackTestCase): return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_01_revokeIngressRule(self): """Test revoke ingress rule """ - # Validate the following: - #1. Create Security group for the account. - #2. Createsecuritygroup (ssh-incoming) for this account - #3. authorizeSecurityGroupIngress to allow ssh access to the VM - #4. deployVirtualMachine into this security group (ssh-incoming) - #5. Revoke the ingress rule, SSH access should fail + # 1. Create Security group for the account. + # 2. Createsecuritygroup (ssh-incoming) for this account + # 3. authorizeSecurityGroupIngress to allow ssh access to the VM + # 4. deployVirtualMachine into this security group (ssh-incoming) + # 5. Revoke the ingress rule, SSH access should fail security_group = SecurityGroup.create( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["security_group"], + account=self.account.name, + domainid=self.account.domainid + ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertEqual( - len(sercurity_groups), - 2, - "Check List Security groups response" - ) + len(sercurity_groups), + 2, + "Check List Security groups response" + ) # Authorize Security group to SSH to VM - self.debug("Authorizing ingress rule for sec group ID: %s for ssh access" - % security_group.id) + self.debug( + "Authorizing ingress rule for sec group ID: %s for ssh access" % + security_group.id) ingress_rule = security_group.authorize( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id, - securitygroupids=[security_group.id], - mode=self.services['mode'] - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id], + mode=self.testdata['mode'] + ) self.debug("Deploying VM in account: %s" % self.account.name) # Should be able to SSH VM @@ -654,17 +595,17 @@ class TestRevokeIngressRule(cloudstackTestCase): self.debug("SSH into VM: %s" % self.virtual_machine.id) self.virtual_machine.get_ssh_client() except Exception as e: - self.fail("SSH Access failed for %s: %s" % \ + self.fail("SSH Access failed for %s: %s" % (self.virtual_machine.ipaddress, e) ) self.debug("Revoking ingress rule for sec group ID: %s for ssh access" - % security_group.id) + % security_group.id) # Revoke Security group to SSH to VM security_group.revoke( - self.apiclient, - id=ssh_rule["ruleid"] - ) + self.apiclient, + id=ssh_rule["ruleid"] + ) # SSH Attempt to VM should fail with self.assertRaises(Exception): @@ -673,7 +614,7 @@ class TestRevokeIngressRule(cloudstackTestCase): self.virtual_machine.ssh_port, self.virtual_machine.username, self.virtual_machine.password - ) + ) return @@ -688,7 +629,7 @@ class TestDhcpOnlyRouter(cloudstackTestCase): def tearDown(self): try: - #Clean up, terminate the created templates + # Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: @@ -700,51 +641,53 @@ class TestDhcpOnlyRouter(cloudstackTestCase): cls.testClient = super(TestDhcpOnlyRouter, cls).getClsTestClient() cls.api_client = cls.testClient.getApiClient() - cls.services = Services().services + # Fill testdata from the external config file + cls.testdata = cls.testClient.getParsedTestDataConfig() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client) cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests()) - cls.services['mode'] = cls.zone.networktype + cls.testdata['mode'] = cls.zone.networktype template = get_template( - cls.api_client, - cls.zone.id, - cls.services["ostype"] - ) + cls.api_client, + cls.zone.id, + cls.testdata["ostype"] + ) - cls.services["domainid"] = cls.domain.id - cls.services["virtual_machine"]["zoneid"] = cls.zone.id - cls.services["virtual_machine"]["template"] = template.id + cls.testdata["domainid"] = cls.domain.id + cls.testdata["virtual_machine"]["zoneid"] = cls.zone.id + cls.testdata["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( - cls.api_client, - cls.services["service_offering"] - ) + cls.api_client, + cls.testdata["service_offering"] + ) cls.account = Account.create( - cls.api_client, - cls.services["account"], - domainid=cls.domain.id - ) - cls.services["account"] = cls.account.name + cls.api_client, + cls.testdata["account"], + domainid=cls.domain.id + ) cls.virtual_machine = VirtualMachine.create( - cls.api_client, - cls.services["virtual_machine"], - accountid=cls.account.name, - domainid=cls.account.domainid, - serviceofferingid=cls.service_offering.id, - mode=cls.services['mode'] - ) + cls.api_client, + cls.testdata["virtual_machine"], + accountid=cls.account.name, + domainid=cls.account.domainid, + serviceofferingid=cls.service_offering.id, + mode=cls.testdata['mode'] + ) cls._cleanup = [ - cls.account, - cls.service_offering - ] + cls.account, + cls.service_offering + ] return @classmethod def tearDownClass(cls): try: - cls.api_client = super(TestDhcpOnlyRouter, cls).getClsTestClient().getApiClient() - #Cleanup resources used + cls.api_client = super( + TestDhcpOnlyRouter, + cls).getClsTestClient().getApiClient() + # Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: @@ -752,67 +695,66 @@ class TestDhcpOnlyRouter(cloudstackTestCase): return - @attr(tags = ["sg", "eip", "basic"], required_hardware="true") + @attr(tags=["sg", "eip", "basic"], required_hardware="true") def test_01_dhcpOnlyRouter(self): """Test router services for user account """ - # Validate the following - #1. List routers for any user account - #2. The only service supported by this router should be dhcp + # 1. List routers for any user account + # 2. The only service supported by this router should be dhcp # Find router associated with user account list_router_response = Router.list( - self.apiclient, - zoneid=self.zone.id, - listall=True - ) + self.apiclient, + zoneid=self.zone.id, + listall=True + ) self.assertEqual( - isinstance(list_router_response, list), - True, - "Check list response returns a valid list" - ) + isinstance(list_router_response, list), + True, + "Check list response returns a valid list" + ) router = list_router_response[0] hosts = Host.list( - self.apiclient, - zoneid=router.zoneid, - type='Routing', - state='Up', - id=router.hostid - ) + self.apiclient, + zoneid=router.zoneid, + type='Routing', + state='Up', + id=router.hostid + ) self.assertEqual( - isinstance(hosts, list), - True, - "Check list host returns a valid list" - ) + isinstance(hosts, list), + True, + "Check list host returns a valid list" + ) host = hosts[0] self.debug("Router ID: %s, state: %s" % (router.id, router.state)) self.assertEqual( - router.state, - 'Running', - "Check list router response for router state" - ) + router.state, + 'Running', + "Check list router response for router state" + ) result = get_process_status( - host.ipaddress, - self.services['host']["publicport"], - self.services['host']["username"], - self.services['host']["password"], - router.linklocalip, - "service dnsmasq status" - ) + host.ipaddress, + self.testdata['configurableData']['host']["publicport"], + self.testdata['configurableData']['host']["username"], + self.testdata['configurableData']['host']["password"], + router.linklocalip, + "service dnsmasq status" + ) res = str(result) self.debug("Dnsmasq process status: %s" % res) self.assertEqual( - res.count("running"), - 1, - "Check dnsmasq service is running or not" - ) + res.count("running"), + 1, + "Check dnsmasq service is running or not" + ) return @@ -827,7 +769,7 @@ class TestdeployVMWithUserData(cloudstackTestCase): def tearDown(self): try: - #Clean up, terminate the created templates + # Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: @@ -836,46 +778,50 @@ class TestdeployVMWithUserData(cloudstackTestCase): @classmethod def setUpClass(cls): - cls.testClient = super(TestdeployVMWithUserData, cls).getClsTestClient() + cls.testClient = super( + TestdeployVMWithUserData, + cls).getClsTestClient() cls.api_client = cls.testClient.getApiClient() - cls.services = Services().services + # Fill testdata from the external config file + cls.testdata = cls.testClient.getParsedTestDataConfig() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client) cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests()) - cls.services['mode'] = cls.zone.networktype + cls.testdata['mode'] = cls.zone.networktype template = get_template( - cls.api_client, - cls.zone.id, - cls.services["ostype"] - ) + cls.api_client, + cls.zone.id, + cls.testdata["ostype"] + ) - cls.services["domainid"] = cls.domain.id - cls.services["virtual_machine"]["zoneid"] = cls.zone.id - cls.services["virtual_machine"]["template"] = template.id + cls.testdata["domainid"] = cls.domain.id + cls.testdata["virtual_machine"]["zoneid"] = cls.zone.id + cls.testdata["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( - cls.api_client, - cls.services["service_offering"] - ) + cls.api_client, + cls.testdata["service_offering"] + ) cls.account = Account.create( - cls.api_client, - cls.services["account"], - domainid=cls.domain.id - ) - cls.services["account"] = cls.account.name + cls.api_client, + cls.testdata["account"], + domainid=cls.domain.id + ) cls._cleanup = [ - cls.account, - cls.service_offering - ] + cls.account, + cls.service_offering + ] return @classmethod def tearDownClass(cls): try: - cls.api_client = super(TestdeployVMWithUserData, cls).getClsTestClient().getApiClient() - #Cleanup resources used + cls.api_client = super( + TestdeployVMWithUserData, + cls).getClsTestClient().getApiClient() + # Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: @@ -883,112 +829,112 @@ class TestdeployVMWithUserData(cloudstackTestCase): return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_01_deployVMWithUserData(self): """Test Deploy VM with User data""" - # Validate the following # 1. CreateAccount of type user # 2. CreateSecurityGroup ssh-incoming # 3. authorizeIngressRule to allow ssh-access - # 4. deployVirtualMachine into this group with some base64 encoded user-data - # 5. wget http://10.1.1.1/latest/user-data to get the latest userdata from the - # router for this VM + # 4. deployVirtualMachine into this group with some + # base64 encoded user-data + # 5. wget http://10.1.1.1/latest/user-data to get the + # latest userdata from the router for this VM # Find router associated with user account list_router_response = Router.list( - self.apiclient, - zoneid=self.zone.id, - listall=True - ) + self.apiclient, + zoneid=self.zone.id, + listall=True + ) self.assertEqual( - isinstance(list_router_response, list), - True, - "Check list response returns a valid list" - ) + isinstance(list_router_response, list), + True, + "Check list response returns a valid list" + ) router = list_router_response[0] security_group = SecurityGroup.create( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["security_group"], + account=self.account.name, + domainid=self.account.domainid + ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertEqual( - len(sercurity_groups), - 2, - "Check List Security groups response" - ) + len(sercurity_groups), + 2, + "Check List Security groups response" + ) self.debug( - "Authorize Ingress Rule for Security Group %s for account: %s" \ - % ( - security_group.id, - self.account.name - )) + "Authorize Ingress Rule for Security Group %s for account: %s" + % ( + security_group.id, + self.account.name + )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id, - securitygroupids=[security_group.id], - mode=self.services['mode'] - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id], + mode=self.testdata['mode'] + ) self.debug("Deploying VM in account: %s" % self.account.name) # Should be able to SSH VM try: self.debug( - "SSH to VM with IP Address: %s"\ - % self.virtual_machine.ssh_ip - ) + "SSH to VM with IP Address: %s" + % self.virtual_machine.ssh_ip + ) ssh = self.virtual_machine.get_ssh_client() except Exception as e: - self.fail("SSH Access failed for %s: %s" % \ + self.fail("SSH Access failed for %s: %s" % (self.virtual_machine.ipaddress, e) ) cmds = [ - "wget http://%s/latest/user-data" % router.guestipaddress, - "cat user-data", - ] + "wget http://%s/latest/user-data" % router.guestipaddress, + "cat user-data", + ] for c in cmds: result = ssh.execute(c) self.debug("%s: %s" % (c, result)) res = str(result) self.assertEqual( - res.count(self.services["virtual_machine"]["userdata"]), - 1, - "Verify user data" - ) + res.count(self.testdata["virtual_machine"]["userdata"]), + 1, + "Verify user data" + ) return @@ -999,42 +945,39 @@ class TestDeleteSecurityGroup(cloudstackTestCase): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() - self.services = Services().services - # Get Zone, Domain and templates self.domain = get_domain(self.apiclient) self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests()) - self.services['mode'] = self.zone.networktype + self.testdata['mode'] = self.zone.networktype template = get_template( - self.apiclient, - self.zone.id, - self.services["ostype"] - ) + self.apiclient, + self.zone.id, + self.testdata["ostype"] + ) - self.services["domainid"] = self.domain.id - self.services["virtual_machine"]["zoneid"] = self.zone.id - self.services["virtual_machine"]["template"] = template.id + self.testdata["domainid"] = self.domain.id + self.testdata["virtual_machine"]["zoneid"] = self.zone.id + self.testdata["virtual_machine"]["template"] = template.id self.service_offering = ServiceOffering.create( - self.apiclient, - self.services["service_offering"] - ) + self.apiclient, + self.testdata["service_offering"] + ) self.account = Account.create( - self.apiclient, - self.services["account"], - domainid=self.domain.id - ) - self.services["account"] = self.account.name + self.apiclient, + self.testdata["account"], + domainid=self.domain.id + ) self.cleanup = [ - self.account, - self.service_offering - ] + self.account, + self.service_offering + ] return def tearDown(self): try: - #Clean up, terminate the created templates + # Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: @@ -1043,16 +986,20 @@ class TestDeleteSecurityGroup(cloudstackTestCase): @classmethod def setUpClass(cls): - cls.services = Services().services - cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient() + cls.testClient = super(TestDeleteSecurityGroup, cls).getClsTestClient() + # Fill testdata from the external config file + cls.testdata = cls.testClient.getParsedTestDataConfig() + cls.api_client = cls.testClient.getApiClient() cls._cleanup = [] return @classmethod def tearDownClass(cls): try: - cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient() - #Cleanup resources used + cls.api_client = super( + TestDeleteSecurityGroup, + cls).getClsTestClient().getApiClient() + # Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: @@ -1060,11 +1007,10 @@ class TestDeleteSecurityGroup(cloudstackTestCase): return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_01_delete_security_grp_running_vm(self): """Test delete security group with running VM""" - # Validate the following # 1. createsecuritygroup (ssh-incoming) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM @@ -1073,83 +1019,82 @@ class TestDeleteSecurityGroup(cloudstackTestCase): # complaining there are running VMs in this group security_group = SecurityGroup.create( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["security_group"], + account=self.account.name, + domainid=self.account.domainid + ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertEqual( - len(sercurity_groups), - 2, - "Check List Security groups response" - ) + len(sercurity_groups), + 2, + "Check List Security groups response" + ) self.debug( - "Authorize Ingress Rule for Security Group %s for account: %s" \ - % ( - security_group.id, - self.account.name - )) + "Authorize Ingress Rule for Security Group %s for account: %s" + % ( + security_group.id, + self.account.name + )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id, - securitygroupids=[security_group.id] - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) self.debug("Deploying VM in account: %s" % self.account.name) # Deleting Security group should raise exception with self.assertRaises(Exception): security_group.delete(self.apiclient) - #sleep to ensure that Security group is deleted properly - time.sleep(self.services["sleep"]) + # sleep to ensure that Security group is deleted properly + time.sleep(self.testdata["sleep"]) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - id=security_group.id - ) + self.apiclient, + id=security_group.id + ) self.assertNotEqual( - sercurity_groups, - None, - "Check List Security groups response" - ) + sercurity_groups, + None, + "Check List Security groups response" + ) return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_02_delete_security_grp_withoout_running_vm(self): """Test delete security group without running VM""" - # Validate the following # 1. createsecuritygroup (ssh-incoming) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM @@ -1158,56 +1103,56 @@ class TestDeleteSecurityGroup(cloudstackTestCase): # complaining there are running VMs in this group security_group = SecurityGroup.create( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["security_group"], + account=self.account.name, + domainid=self.account.domainid + ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertEqual( - len(sercurity_groups), - 2, - "Check List Security groups response" - ) + len(sercurity_groups), + 2, + "Check List Security groups response" + ) self.debug( - "Authorize Ingress Rule for Security Group %s for account: %s" \ - % ( - security_group.id, - self.account.name - )) + "Authorize Ingress Rule for Security Group %s for account: %s" + % ( + security_group.id, + self.account.name + )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id, - securitygroupids=[security_group.id] - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id] + ) self.debug("Deploying VM in account: %s" % self.account.name) # Destroy the VM @@ -1217,7 +1162,7 @@ class TestDeleteSecurityGroup(cloudstackTestCase): self.debug("Deleting Security Group: %s" % security_group.id) security_group.delete(self.apiclient) except Exception as e: - self.fail("Failed to delete security group - ID: %s: %s" \ + self.fail("Failed to delete security group - ID: %s: %s" % (security_group.id, e) ) return @@ -1231,42 +1176,39 @@ class TestIngressRule(cloudstackTestCase): self.dbclient = self.testClient.getDbConnection() self.cleanup = [] - self.services = Services().services - # Get Zone, Domain and templates self.domain = get_domain(self.apiclient) self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests()) - self.services['mode'] = self.zone.networktype + self.testdata['mode'] = self.zone.networktype template = get_template( - self.apiclient, - self.zone.id, - self.services["ostype"] - ) + self.apiclient, + self.zone.id, + self.testdata["ostype"] + ) - self.services["domainid"] = self.domain.id - self.services["virtual_machine"]["zoneid"] = self.zone.id - self.services["virtual_machine"]["template"] = template.id + self.testdata["domainid"] = self.domain.id + self.testdata["virtual_machine"]["zoneid"] = self.zone.id + self.testdata["virtual_machine"]["template"] = template.id self.service_offering = ServiceOffering.create( - self.apiclient, - self.services["service_offering"] - ) + self.apiclient, + self.testdata["service_offering"] + ) self.account = Account.create( - self.apiclient, - self.services["account"], - domainid=self.domain.id - ) - self.services["account"] = self.account.name + self.apiclient, + self.testdata["account"], + domainid=self.domain.id + ) self.cleanup = [ - self.account, - self.service_offering - ] + self.account, + self.service_offering + ] return def tearDown(self): try: - #Clean up, terminate the created templates + # Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: @@ -1275,16 +1217,20 @@ class TestIngressRule(cloudstackTestCase): @classmethod def setUpClass(cls): - cls.services = Services().services - cls.api_client = super(TestIngressRule, cls).getClsTestClient().getApiClient() + cls.testClient = super(TestIngressRule, cls).getClsTestClient() + # Fill testdata from the external config file + cls.testdata = cls.testClient.getParsedTestDataConfig() + cls.api_client = cls.testClient.getApiClient() cls._cleanup = [] return @classmethod def tearDownClass(cls): try: - cls.api_client = super(TestIngressRule, cls).getClsTestClient().getApiClient() - #Cleanup resources used + cls.api_client = super( + TestIngressRule, + cls).getClsTestClient().getApiClient() + # Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: @@ -1292,124 +1238,123 @@ class TestIngressRule(cloudstackTestCase): return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_01_authorizeIngressRule_AfterDeployVM(self): """Test delete security group with running VM""" - # Validate the following # 1. createsecuritygroup (ssh-incoming, 22via22) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM # 3. deployVirtualMachine into this security group (ssh-incoming) - # 4. authorizeSecurityGroupIngress to allow ssh access (startport:222 to - # endport:22) to the VM + # 4. authorizeSecurityGroupIngress to allow ssh access (startport:222 + # to endport:22) to the VM security_group = SecurityGroup.create( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["security_group"], + account=self.account.name, + domainid=self.account.domainid + ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertEqual( - len(sercurity_groups), - 2, - "Check List Security groups response" - ) + len(sercurity_groups), + 2, + "Check List Security groups response" + ) self.debug( - "Authorize Ingress Rule for Security Group %s for account: %s" \ - % ( - security_group.id, - self.account.name - )) + "Authorize Ingress Rule for Security Group %s for account: %s" + % ( + security_group.id, + self.account.name + )) # Authorize Security group to SSH to VM ingress_rule_1 = security_group.authorize( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule_1, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule_1, dict), + True, + "Check ingress rule created properly" + ) self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id, - securitygroupids=[security_group.id], - mode=self.services['mode'] - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id], + mode=self.testdata['mode'] + ) self.debug("Deploying VM in account: %s" % self.account.name) self.debug( - "Authorize Ingress Rule for Security Group %s for account: %s" \ - % ( - security_group.id, - self.account.name - )) + "Authorize Ingress Rule for Security Group %s for account: %s" + % ( + security_group.id, + self.account.name + )) # Authorize Security group to SSH to VM ingress_rule_2 = security_group.authorize( - self.apiclient, - self.services["security_group_2"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule_ICMP"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule_2, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule_2, dict), + True, + "Check ingress rule created properly" + ) # SSH should be allowed on 22 & 2222 ports try: self.debug("Trying to SSH into VM %s on port %s" % ( - self.virtual_machine.ssh_ip, - self.services["security_group"]["endport"] - )) + self.virtual_machine.ssh_ip, + self.testdata["ingress_rule"]["endport"] + )) self.virtual_machine.get_ssh_client() except Exception as e: - self.fail("SSH access failed for ingress rule ID: %s, %s" \ + self.fail("SSH access failed for ingress rule ID: %s, %s" % (ingress_rule_1["id"], e)) # User should be able to ping VM try: self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip) - result = subprocess.call(['ping', '-c 1', self.virtual_machine.ssh_ip]) + result = subprocess.call( + ['ping', '-c 1', self.virtual_machine.ssh_ip]) self.debug("Ping result: %s" % result) # if ping successful, then result should be 0 self.assertEqual( - result, - 0, - "Check if ping is successful or not" - ) + result, + 0, + "Check if ping is successful or not" + ) except Exception as e: - self.fail("Ping failed for ingress rule ID: %s, %s" \ + self.fail("Ping failed for ingress rule ID: %s, %s" % (ingress_rule_2["id"], e)) return - @attr(tags = ["sg", "eip", "advancedsg"]) + @attr(tags=["sg", "eip", "advancedsg"]) def test_02_revokeIngressRule_AfterDeployVM(self): """Test Revoke ingress rule after deploy VM""" - # Validate the following # 1. createsecuritygroup (ssh-incoming, 22via22) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM @@ -1422,79 +1367,79 @@ class TestIngressRule(cloudstackTestCase): # but allowed through port 22 security_group = SecurityGroup.create( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["security_group"], + account=self.account.name, + domainid=self.account.domainid + ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( - self.apiclient, - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(sercurity_groups, list), - True, - "Check for list security groups response" - ) + isinstance(sercurity_groups, list), + True, + "Check for list security groups response" + ) self.assertEqual( - len(sercurity_groups), - 2, - "Check List Security groups response" - ) + len(sercurity_groups), + 2, + "Check List Security groups response" + ) self.debug( - "Authorize Ingress Rule for Security Group %s for account: %s" \ - % ( - security_group.id, - self.account.name - )) + "Authorize Ingress Rule for Security Group %s for account: %s" + % ( + security_group.id, + self.account.name + )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( - self.apiclient, - self.services["security_group"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule, dict), + True, + "Check ingress rule created properly" + ) self.virtual_machine = VirtualMachine.create( - self.apiclient, - self.services["virtual_machine"], - accountid=self.account.name, - domainid=self.account.domainid, - serviceofferingid=self.service_offering.id, - securitygroupids=[security_group.id], - mode=self.services['mode'] - ) + self.apiclient, + self.testdata["virtual_machine"], + accountid=self.account.name, + domainid=self.account.domainid, + serviceofferingid=self.service_offering.id, + securitygroupids=[security_group.id], + mode=self.testdata['mode'] + ) self.debug("Deploying VM in account: %s" % self.account.name) self.debug( - "Authorize Ingress Rule for Security Group %s for account: %s" \ - % ( - security_group.id, - self.account.name - )) + "Authorize Ingress Rule for Security Group %s for account: %s" + % ( + security_group.id, + self.account.name + )) # Authorize Security group to SSH to VM ingress_rule_2 = security_group.authorize( - self.apiclient, - self.services["security_group_2"], - account=self.account.name, - domainid=self.account.domainid - ) + self.apiclient, + self.testdata["ingress_rule_ICMP"], + account=self.account.name, + domainid=self.account.domainid + ) self.assertEqual( - isinstance(ingress_rule_2, dict), - True, - "Check ingress rule created properly" - ) + isinstance(ingress_rule_2, dict), + True, + "Check ingress rule created properly" + ) ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ icmp_rule = (ingress_rule_2["ingressrule"][0]).__dict__ @@ -1502,69 +1447,70 @@ class TestIngressRule(cloudstackTestCase): # SSH should be allowed on 22 try: self.debug("Trying to SSH into VM %s on port %s" % ( - self.virtual_machine.ssh_ip, - self.services["security_group"]["endport"] - )) + self.virtual_machine.ssh_ip, + self.testdata["ingress_rule"]["endport"] + )) self.virtual_machine.get_ssh_client() except Exception as e: - self.fail("SSH access failed for ingress rule ID: %s, %s" \ + self.fail("SSH access failed for ingress rule ID: %s, %s" % (ssh_rule["ruleid"], e)) - # User should be able to ping VM + # User should be able to ping VM try: self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip) - result = subprocess.call(['ping', '-c 1', self.virtual_machine.ssh_ip]) + result = subprocess.call( + ['ping', '-c 1', self.virtual_machine.ssh_ip]) self.debug("Ping result: %s" % result) # if ping successful, then result should be 0 self.assertEqual( - result, - 0, - "Check if ping is successful or not" - ) + result, + 0, + "Check if ping is successful or not" + ) except Exception as e: - self.fail("Ping failed for ingress rule ID: %s, %s" \ + self.fail("Ping failed for ingress rule ID: %s, %s" % (icmp_rule["ruleid"], e)) self.debug( - "Revoke Ingress Rule for Security Group %s for account: %s" \ -