incubator-tashi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From strou...@apache.org
Subject svn commit: r1226188 - /incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py
Date Sat, 31 Dec 2011 21:09:12 GMT
Author: stroucki
Date: Sat Dec 31 21:09:11 2011
New Revision: 1226188

URL: http://svn.apache.org/viewvc?rev=1226188&view=rev
Log:
qemu.py: finish revising qemu.py

Modified:
    incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py

Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1226188&r1=1226187&r2=1226188&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py Sat Dec 31 21:09:11 2011
@@ -98,6 +98,8 @@ class Qemu(VmControlInterface):
 		self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
 		# XXXstroucki amount of reserved memory could be configurable
 		self.reservedMem = 512
+		# XXXstroucki perhaps make this configurable
+		self.ifPrefix = "tashi"
 		self.controlledVMs = {}
 		self.usedPorts = []
 		self.usedPortsLock = threading.Lock()
@@ -492,7 +494,7 @@ class Qemu(VmControlInterface):
 		nicString = ""
 		for i in range(0, len(instance.nics)):
 			nic = instance.nics[i]
-			nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=tashi%d.%d,vlan=%d,script=/etc/qemu-ifup.%d
" % (nic.mac, nicModel, nic.network, instance.id, i, nic.network, nic.network)
+			nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d
" % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network)
 
 		#  ACPI
 		if (boolean(instance.hints.get("noAcpi", False))):
@@ -574,32 +576,52 @@ class Qemu(VmControlInterface):
 		if (target):
 			retry = self.migrationRetries
 			while (retry > 0):
+				# migrate in foreground respecting cow backed
+				# images
+				# XXXstroucki if we're doing this in the fg
+				# then it may still be ongoing when the timeout
+				# happens, and no way of interrupting it
+				# trying to restart the migration by running
+				# the command again (when qemu is ready to
+				# listen again) is probably not helpful
 				res = self.__enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
 				retry = retry - 1
 				if (res.find("migration failed") == -1):
-					retry = -1
+					retry = 0
+					break
 				else:
 					log.error("Migration (transiently) failed: %s\n", res)
 			if (retry == 0):
 				log.error("Migration failed: %s\n", res)
 				child.errorBit = True
 				raise RuntimeError
+		# XXXstroucki what if migration is still ongoing, and
+		# qemu is not listening?
 		self.__enterCommand(child, "quit", expectPrompt=False)
 		return vmId
 
 	# extern	
 	def instantiateVm(self, instance):
-		(vmId, cmd) = self.__startVm(instance, None)
-		child = self.__getChildFromPid(vmId)
-		self.__getPtyInfo(child, False)
-		child.cmd = cmd
-		self.__saveChildInfo(child)
-		return vmId
+		try:
+			(vmId, cmd) = self.__startVm(instance, None)
+			child = self.__getChildFromPid(vmId)
+			self.__getPtyInfo(child, False)
+			child.cmd = cmd
+			# XXXstroucki Should make sure Running state is saved
+			# otherwise on restart it will appear as Activating
+			# until we update the state in __matchSystemPids
+			child.instance.state = InstanceState.Running
+			self.nm.vmStateChange(vmId, None, InstanceState.Running)
+			self.__saveChildInfo(child)
+			return vmId
+		except:
+			log.exception("instantiateVm failed")
+			return None
 	
 	# extern
 	def suspendVm(self, vmId, target):
 		child = self.__getChildFromPid(vmId)
-		tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId)
+		tmpTarget = "/%s/tashi_qemu_suspend_%d_%d" % (self.scratchDir, os.getpid(), vmId)
 		# XXX: Use fifo to improve performance
 		vmId = self.__stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
 		self.dfs.copyTo(tmpTarget, target)
@@ -618,6 +640,8 @@ class Qemu(VmControlInterface):
 		while ("running" not in status):
 			status = self.__enterCommand(child, "info status")
 			time.sleep(1)
+		child.instance.state = InstanceState.Running
+		self.__saveChildInfo(child)
 	
 	# extern
 	def resumeVm(self, instance, source):
@@ -626,7 +650,19 @@ class Qemu(VmControlInterface):
 		child = self.__getChildFromPid(vmId)
 		child.cmd = cmd
 		return vmId
-	
+
+	def __checkPortListening(self, port):
+		lc = 0
+		# XXXpipe: find whether something is listening yet on the port
+		(stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
+		stdin.close()
+		r = stdout.read()
+		lc = int(r.strip())
+		if (lc < 1):
+			return False
+		else:
+			return True
+
 	# extern
 	def prepReceiveVm(self, instance, source):
 		self.usedPortsLock.acquire()
@@ -643,16 +679,10 @@ class Qemu(VmControlInterface):
 		child.cmd = cmd
 		child.transportCookie = transportCookie
 		self.__saveChildInfo(child)
-		# XXX: Cleanly wait until the port is open
-		lc = 0
-		while (lc < 1):
-# XXXpipe: find whether something is listening yet on the port
-			(stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
-			stdin.close()
-			r = stdout.read()
-			lc = int(r.strip())
-			if (lc < 1):
-				time.sleep(1.0)
+		# XXX: Cleanly wait until the port is listening
+		while self.__checkPortListening(port) is not True:
+			time.sleep(1)
+
 		return transportCookie
 	
 	# extern
@@ -662,9 +692,11 @@ class Qemu(VmControlInterface):
 			(port, _vmId, _hostname) = cPickle.loads(transportCookie)
 			child = self.__getChildFromPid(vmId)
 			child.migratingOut = True
+			# tell the VM to live-migrate out
 			res = self.__stopVm(vmId, "tcp:%s:%d" % (target, port), False)
 			# XXX: Some sort of feedback would be nice
 			# XXX: Should we block?
+			# XXXstroucki: isn't this what __waitForExit does?
 			self.__waitForExit(vmId)
 		finally:
 			self.migrationSemaphore.release()
@@ -693,15 +725,22 @@ class Qemu(VmControlInterface):
 	def pauseVm(self, vmId):
 		child = self.__getChildFromPid(vmId)
 		self.__enterCommand(child, "stop")
+		# XXXstroucki we have no Stopped state, so consider
+		# the VM still Running?
 	
 	# extern
 	def unpauseVm(self, vmId):
 		child = self.__getChildFromPid(vmId)
 		self.__enterCommand(child, "c")
+		# XXXstroucki as above, should this be a state change
+		# or not?
 	
 	# extern
 	def shutdownVm(self, vmId):
 		"""'system_powerdown' doesn't seem to actually shutdown the VM on some versions of KVM
with some versions of Linux"""
+		# If clean shutdown is desired, should try on VM first,
+		# shutdownVm second and if that doesn't work use
+		# destroyVm
 		child = self.__getChildFromPid(vmId)
 		self.__enterCommand(child, "system_powerdown")
 	
@@ -712,53 +751,80 @@ class Qemu(VmControlInterface):
 		# XXX: the child could have exited between these two points, but I don't know how to fix
that since it might not be our child process
 		os.kill(child.pid, signal.SIGKILL)
 	
+	def __specificStartVnc(self, vmId):
+		child = self.__getChildFromPid(vmId)
+		hostname = socket.gethostname()
+		if (child.vncPort == -1):
+			self.vncPortLock.acquire()
+			port = 0
+			while (port in self.vncPorts):
+				port += 1
+
+			self.vncPorts.append(port)
+			self.vncPortLock.release()
+			self.__enterCommand(child, "change vnc :%d" % (port))
+			child.vncPort = port
+			self.__saveChildInfo(child)
+		port = child.vncPort
+		return "VNC running on %s:%d" % (hostname, port + 5900)
+
+	def __specificStopVnc(self, vmId):
+		child = self.__getChildFromPid(vmId)
+		self.__enterCommand(child, "change vnc none")
+		if (child.vncPort != -1):
+			self.vncPortLock.acquire()
+			self.vncPorts.remove(child.vncPort)
+			self.vncPortLock.release()
+			child.vncPort = -1
+			self.__saveChildInfo(child)
+		return "VNC halted"
+
+	def __specificChangeCdRom(self, vmId, iso):
+		child = self.__getChildFromPid(vmId)
+		imageLocal = self.dfs.getLocalHandle("images/" + iso)
+		self.__enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
+		return "Changed ide1-cd0 to %s" % (iso)
+
+	def __specificStartConsole(self, vmId):
+		child = self.__getChildFromPid(vmId)
+		hostname = socket.gethostname()
+		self.consolePortLock.acquire()
+		# XXXstroucki why not use the existing ports scheme?
+		consolePort = self.consolePort
+		self.consolePort += 1
+		self.consolePortLock.release()
+		threading.Thread(target=controlConsole, args=(child,consolePort)).start()
+		return "Control console listening on %s:%d" % (hostname, consolePort)
+
 	# extern
 	def vmmSpecificCall(self, vmId, arg):
 		arg = arg.lower()
+		changeCdText = "changecdrom:"
+
 		if (arg == "startvnc"):
-			child = self.__getChildFromPid(vmId)
-			hostname = socket.gethostname()
-			if (child.vncPort == -1):
-				self.vncPortLock.acquire()
-				port = 0
-				while (port in self.vncPorts):
-					port = port + 1
-				self.vncPorts.append(port)
-				self.vncPortLock.release()
-				self.__enterCommand(child, "change vnc :%d" % (port))
-				child.vncPort = port
-				self.__saveChildInfo(child)
-			port = child.vncPort
-			return "VNC started on %s:%d" % (hostname, port+5900)
+			return self.__specificStartVnc(vmId)
+
 		elif (arg == "stopvnc"):
-			child = self.__getChildFromPid(vmId)
-			self.__enterCommand(child, "change vnc none")
-			if (child.vncPort != -1):
-				self.vncPortLock.acquire()
-				self.vncPorts.remove(child.vncPort)
-				self.vncPortLock.release()
-				child.vncPort = -1
-				self.__saveChildInfo(child)
-			return "VNC halted"
-		elif (arg.startswith("changecdrom:")):
-			child = self.__getChildFromPid(vmId)
-			iso = scrubString(arg[12:])
-			imageLocal = self.dfs.getLocalHandle("images/" + iso)
-			self.__enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
-			return "Changed ide1-cd0 to %s" % (iso)
+			return self.__specificStopVnc(vmId)
+
+		elif (arg.startswith(changeCdText)):
+			iso = scrubString(arg[len(changeCdText):])
+			return self.__specificChangeCdRom(vmId, iso)
+
 		elif (arg == "startconsole"):
-			child = self.__getChildFromPid(vmId)
-			hostname = socket.gethostname()
-			self.consolePortLock.acquire()
-			consolePort = self.consolePort
-			self.consolePort = self.consolePort+1
-			self.consolePortLock.release()
-			threading.Thread(target=controlConsole, args=(child,consolePort)).start()
-			return "Control console listenting on %s:%d" % (hostname, consolePort)
+			return self.__specificStartConsole(vmId)
+
 		elif (arg == "list"):
-			return "startVnc\nstopVnc\nchangeCdrom:<image.iso>\nstartConsole"
+			commands = [
+				"startVnc",
+				"stopVnc",
+				"changeCdrom:<image.iso>",
+				"startConsole",
+				]
+			return "\n".join(commands)
+				
 		else:
-			return "Unknown arg %s" % (arg)
+			return "Unknown command %s" % (arg)
 	
 	# extern
 	def listVms(self):
@@ -769,6 +835,7 @@ class Qemu(VmControlInterface):
 		ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
 		netStats = {}
 		cpuStats = {}
+		# XXXstroucki be more exact here?
 		last = time.time() - self.statsInterval
 		while True:
 			now = time.time()
@@ -777,7 +844,7 @@ class Qemu(VmControlInterface):
 				netData = f.readlines()
 				f.close()
 				for l in netData:
-					if (l.find("tashi") != -1):
+					if (l.find(self.ifPrefix) != -1):
 						(dev, sep, ld) = stringPartition(l, ":")
 						dev = dev.strip()
 						ws = ld.split()
@@ -785,6 +852,9 @@ class Qemu(VmControlInterface):
 						sendBytes = float(ws[8])
 						(recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes,
sendBytes))
 						if (recvBytes < lastRecvBytes):
+							# We seem to have overflowed
+							# XXXstroucki How likely is this to happen?
+
 							if (lastRecvBytes > 2**32):
 								lastRecvBytes = lastRecvBytes - 2**64
 							else:
@@ -814,7 +884,7 @@ class Qemu(VmControlInterface):
 					child = self.controlledVMs[vmId]
 					(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
 					for i in range(0, len(child.instance.nics)):
-						netDev = "tashi%d.%d" % (child.instance.id, i)
+						netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
 						(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0,
0.0, 0.0))
 						(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs,
recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
 					self.stats[vmId] = self.stats.get(vmId, {})



Mime
View raw message