From tashi-commits-return-323-apmail-incubator-tashi-commits-archive=incubator.apache.org@incubator.apache.org Sat Dec 31 21:09:39 2011 Return-Path: X-Original-To: apmail-incubator-tashi-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-tashi-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 35E5CB001 for ; Sat, 31 Dec 2011 21:09:39 +0000 (UTC) Received: (qmail 98701 invoked by uid 500); 31 Dec 2011 21:09:39 -0000 Delivered-To: apmail-incubator-tashi-commits-archive@incubator.apache.org Received: (qmail 98677 invoked by uid 500); 31 Dec 2011 21:09:39 -0000 Mailing-List: contact tashi-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: tashi-dev@incubator.apache.org Delivered-To: mailing list tashi-commits@incubator.apache.org Received: (qmail 98669 invoked by uid 99); 31 Dec 2011 21:09:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 31 Dec 2011 21:09:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_STOCK2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 31 Dec 2011 21:09:32 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5AAA2238897D; Sat, 31 Dec 2011 21:09:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1226188 - /incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py Date: Sat, 31 Dec 2011 21:09:12 -0000 To: tashi-commits@incubator.apache.org From: stroucki@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111231210912.5AAA2238897D@eris.apache.org> Author: stroucki Date: Sat Dec 31 21:09:11 2011 New Revision: 1226188 URL: http://svn.apache.org/viewvc?rev=1226188&view=rev Log: qemu.py: finish revising qemu.py Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py Modified: incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1226188&r1=1226187&r2=1226188&view=diff ============================================================================== --- incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py (original) +++ incubator/tashi/branches/stroucki-accounting/src/tashi/nodemanager/vmcontrol/qemu.py Sat Dec 31 21:09:11 2011 @@ -98,6 +98,8 @@ class Qemu(VmControlInterface): self.statsInterval = float(self.config.get("Qemu", "statsInterval")) # XXXstroucki amount of reserved memory could be configurable self.reservedMem = 512 + # XXXstroucki perhaps make this configurable + self.ifPrefix = "tashi" self.controlledVMs = {} self.usedPorts = [] self.usedPortsLock = threading.Lock() @@ -492,7 +494,7 @@ class Qemu(VmControlInterface): nicString = "" for i in range(0, len(instance.nics)): nic = instance.nics[i] - nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=tashi%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, instance.id, i, nic.network, nic.network) + nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network) # ACPI if (boolean(instance.hints.get("noAcpi", False))): @@ -574,32 +576,52 @@ class Qemu(VmControlInterface): if (target): retry = self.migrationRetries while (retry > 0): + # migrate in foreground respecting cow backed + # images + # XXXstroucki if we're doing this in the fg + # then it may still be ongoing when the timeout + # happens, and no way of interrupting it + # trying to restart the migration by running + # the command again (when qemu is ready to + # listen again) is probably not helpful res = self.__enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout) retry = retry - 1 if (res.find("migration failed") == -1): - retry = -1 + retry = 0 + break else: log.error("Migration (transiently) failed: %s\n", res) if (retry == 0): log.error("Migration failed: %s\n", res) child.errorBit = True raise RuntimeError + # XXXstroucki what if migration is still ongoing, and + # qemu is not listening? self.__enterCommand(child, "quit", expectPrompt=False) return vmId # extern def instantiateVm(self, instance): - (vmId, cmd) = self.__startVm(instance, None) - child = self.__getChildFromPid(vmId) - self.__getPtyInfo(child, False) - child.cmd = cmd - self.__saveChildInfo(child) - return vmId + try: + (vmId, cmd) = self.__startVm(instance, None) + child = self.__getChildFromPid(vmId) + self.__getPtyInfo(child, False) + child.cmd = cmd + # XXXstroucki Should make sure Running state is saved + # otherwise on restart it will appear as Activating + # until we update the state in __matchSystemPids + child.instance.state = InstanceState.Running + self.nm.vmStateChange(vmId, None, InstanceState.Running) + self.__saveChildInfo(child) + return vmId + except: + log.exception("instantiateVm failed") + return None # extern def suspendVm(self, vmId, target): child = self.__getChildFromPid(vmId) - tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId) + tmpTarget = "/%s/tashi_qemu_suspend_%d_%d" % (self.scratchDir, os.getpid(), vmId) # XXX: Use fifo to improve performance vmId = self.__stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True) self.dfs.copyTo(tmpTarget, target) @@ -618,6 +640,8 @@ class Qemu(VmControlInterface): while ("running" not in status): status = self.__enterCommand(child, "info status") time.sleep(1) + child.instance.state = InstanceState.Running + self.__saveChildInfo(child) # extern def resumeVm(self, instance, source): @@ -626,7 +650,19 @@ class Qemu(VmControlInterface): child = self.__getChildFromPid(vmId) child.cmd = cmd return vmId - + + def __checkPortListening(self, port): + lc = 0 + # XXXpipe: find whether something is listening yet on the port + (stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port)) + stdin.close() + r = stdout.read() + lc = int(r.strip()) + if (lc < 1): + return False + else: + return True + # extern def prepReceiveVm(self, instance, source): self.usedPortsLock.acquire() @@ -643,16 +679,10 @@ class Qemu(VmControlInterface): child.cmd = cmd child.transportCookie = transportCookie self.__saveChildInfo(child) - # XXX: Cleanly wait until the port is open - lc = 0 - while (lc < 1): -# XXXpipe: find whether something is listening yet on the port - (stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port)) - stdin.close() - r = stdout.read() - lc = int(r.strip()) - if (lc < 1): - time.sleep(1.0) + # XXX: Cleanly wait until the port is listening + while self.__checkPortListening(port) is not True: + time.sleep(1) + return transportCookie # extern @@ -662,9 +692,11 @@ class Qemu(VmControlInterface): (port, _vmId, _hostname) = cPickle.loads(transportCookie) child = self.__getChildFromPid(vmId) child.migratingOut = True + # tell the VM to live-migrate out res = self.__stopVm(vmId, "tcp:%s:%d" % (target, port), False) # XXX: Some sort of feedback would be nice # XXX: Should we block? + # XXXstroucki: isn't this what __waitForExit does? self.__waitForExit(vmId) finally: self.migrationSemaphore.release() @@ -693,15 +725,22 @@ class Qemu(VmControlInterface): def pauseVm(self, vmId): child = self.__getChildFromPid(vmId) self.__enterCommand(child, "stop") + # XXXstroucki we have no Stopped state, so consider + # the VM still Running? # extern def unpauseVm(self, vmId): child = self.__getChildFromPid(vmId) self.__enterCommand(child, "c") + # XXXstroucki as above, should this be a state change + # or not? # extern def shutdownVm(self, vmId): """'system_powerdown' doesn't seem to actually shutdown the VM on some versions of KVM with some versions of Linux""" + # If clean shutdown is desired, should try on VM first, + # shutdownVm second and if that doesn't work use + # destroyVm child = self.__getChildFromPid(vmId) self.__enterCommand(child, "system_powerdown") @@ -712,53 +751,80 @@ class Qemu(VmControlInterface): # XXX: the child could have exited between these two points, but I don't know how to fix that since it might not be our child process os.kill(child.pid, signal.SIGKILL) + def __specificStartVnc(self, vmId): + child = self.__getChildFromPid(vmId) + hostname = socket.gethostname() + if (child.vncPort == -1): + self.vncPortLock.acquire() + port = 0 + while (port in self.vncPorts): + port += 1 + + self.vncPorts.append(port) + self.vncPortLock.release() + self.__enterCommand(child, "change vnc :%d" % (port)) + child.vncPort = port + self.__saveChildInfo(child) + port = child.vncPort + return "VNC running on %s:%d" % (hostname, port + 5900) + + def __specificStopVnc(self, vmId): + child = self.__getChildFromPid(vmId) + self.__enterCommand(child, "change vnc none") + if (child.vncPort != -1): + self.vncPortLock.acquire() + self.vncPorts.remove(child.vncPort) + self.vncPortLock.release() + child.vncPort = -1 + self.__saveChildInfo(child) + return "VNC halted" + + def __specificChangeCdRom(self, vmId, iso): + child = self.__getChildFromPid(vmId) + imageLocal = self.dfs.getLocalHandle("images/" + iso) + self.__enterCommand(child, "change ide1-cd0 %s" % (imageLocal)) + return "Changed ide1-cd0 to %s" % (iso) + + def __specificStartConsole(self, vmId): + child = self.__getChildFromPid(vmId) + hostname = socket.gethostname() + self.consolePortLock.acquire() + # XXXstroucki why not use the existing ports scheme? + consolePort = self.consolePort + self.consolePort += 1 + self.consolePortLock.release() + threading.Thread(target=controlConsole, args=(child,consolePort)).start() + return "Control console listening on %s:%d" % (hostname, consolePort) + # extern def vmmSpecificCall(self, vmId, arg): arg = arg.lower() + changeCdText = "changecdrom:" + if (arg == "startvnc"): - child = self.__getChildFromPid(vmId) - hostname = socket.gethostname() - if (child.vncPort == -1): - self.vncPortLock.acquire() - port = 0 - while (port in self.vncPorts): - port = port + 1 - self.vncPorts.append(port) - self.vncPortLock.release() - self.__enterCommand(child, "change vnc :%d" % (port)) - child.vncPort = port - self.__saveChildInfo(child) - port = child.vncPort - return "VNC started on %s:%d" % (hostname, port+5900) + return self.__specificStartVnc(vmId) + elif (arg == "stopvnc"): - child = self.__getChildFromPid(vmId) - self.__enterCommand(child, "change vnc none") - if (child.vncPort != -1): - self.vncPortLock.acquire() - self.vncPorts.remove(child.vncPort) - self.vncPortLock.release() - child.vncPort = -1 - self.__saveChildInfo(child) - return "VNC halted" - elif (arg.startswith("changecdrom:")): - child = self.__getChildFromPid(vmId) - iso = scrubString(arg[12:]) - imageLocal = self.dfs.getLocalHandle("images/" + iso) - self.__enterCommand(child, "change ide1-cd0 %s" % (imageLocal)) - return "Changed ide1-cd0 to %s" % (iso) + return self.__specificStopVnc(vmId) + + elif (arg.startswith(changeCdText)): + iso = scrubString(arg[len(changeCdText):]) + return self.__specificChangeCdRom(vmId, iso) + elif (arg == "startconsole"): - child = self.__getChildFromPid(vmId) - hostname = socket.gethostname() - self.consolePortLock.acquire() - consolePort = self.consolePort - self.consolePort = self.consolePort+1 - self.consolePortLock.release() - threading.Thread(target=controlConsole, args=(child,consolePort)).start() - return "Control console listenting on %s:%d" % (hostname, consolePort) + return self.__specificStartConsole(vmId) + elif (arg == "list"): - return "startVnc\nstopVnc\nchangeCdrom:\nstartConsole" + commands = [ + "startVnc", + "stopVnc", + "changeCdrom:", + "startConsole", + ] + return "\n".join(commands) + else: - return "Unknown arg %s" % (arg) + return "Unknown command %s" % (arg) # extern def listVms(self): @@ -769,6 +835,7 @@ class Qemu(VmControlInterface): ticksPerSecond = float(os.sysconf('SC_CLK_TCK')) netStats = {} cpuStats = {} + # XXXstroucki be more exact here? last = time.time() - self.statsInterval while True: now = time.time() @@ -777,7 +844,7 @@ class Qemu(VmControlInterface): netData = f.readlines() f.close() for l in netData: - if (l.find("tashi") != -1): + if (l.find(self.ifPrefix) != -1): (dev, sep, ld) = stringPartition(l, ":") dev = dev.strip() ws = ld.split() @@ -785,6 +852,9 @@ class Qemu(VmControlInterface): sendBytes = float(ws[8]) (recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes)) if (recvBytes < lastRecvBytes): + # We seem to have overflowed + # XXXstroucki How likely is this to happen? + if (lastRecvBytes > 2**32): lastRecvBytes = lastRecvBytes - 2**64 else: @@ -814,7 +884,7 @@ class Qemu(VmControlInterface): child = self.controlledVMs[vmId] (recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0) for i in range(0, len(child.instance.nics)): - netDev = "tashi%d.%d" % (child.instance.id, i) + netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i) (tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0)) (recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes) self.stats[vmId] = self.stats.get(vmId, {})