1. Denis Bilenko
  2. gevent


gevent / util / virtualbox.py

Denis Bilenko 71bd6cf 

Denis Bilenko 6e6e7a3 

Denis Bilenko 71bd6cf 

Denis Bilenko 260bc49 

Denis Bilenko 71bd6cf 

Denis Bilenko 0df167e 
Denis Bilenko 71bd6cf 

Denis Bilenko 0df167e 

Denis Bilenko 71bd6cf 

import sys
import os
import re
import time
import datetime
from functools import wraps
from subprocess import Popen

def remote_wrapper(function):
    """Decorate *function* so every call is bracketed by timestamped lines.

    A ``<timestamp>: STARTED.`` line is printed before *function* is called
    and a ``<timestamp>: DONE.`` line after it returns; the return value is
    passed through unchanged. If *function* raises, the exception propagates
    and no DONE line is printed.
    """
    # functools.wraps keeps the wrapped function's __name__ and __doc__.
    @wraps(function)
    def wrapper(*args, **kwargs):
        # Single-argument print(...) behaves identically as a Python 2 print
        # statement and as a Python 3 function call.
        print('%s: STARTED.' % datetime.datetime.now())
        result = function(*args, **kwargs)
        print('%s: DONE.' % datetime.datetime.now())
        return result
    return wrapper

def get_output(command):
    """Run *command* through the shell and return everything it wrote to stdout.

    Raises SystemExit with a descriptive message when closing the pipe
    reports a non-zero status.
    """
    # XXX use Popen
    pipe = os.popen(command)
    text = pipe.read()
    # close() yields a false value on success and the exit status otherwise.
    status = pipe.close()
    if not status:
        return text
    raise SystemExit('Command %r failed with code %r' % (command, status))

# Pattern for the head of one VM record: a ``Name:`` line optionally followed
# directly by a ``Guest OS:`` line. Raw strings are used so ``\s`` and ``\n``
# reach the regex engine as written.
# NOTE(review): the closing parenthesis of this literal was missing, and the
# original may have listed further optional fields before it -- confirm
# against the upstream file.
info_re = (r'(^|\n)Name:\s*(?P<name>[^\n]+)'
           r'(\nGuest OS:\s*(?P<os>[^\n]+))?')

info_re = re.compile(info_re, re.DOTALL)
# Captures the rest of the ``Description:`` line plus the line after it.
description_re = re.compile(r'^Description:(?P<desc>.*?\n.*?\n)', re.M)
# Captures the text following ``State:`` up to the end of that line.
state_re = re.compile(r'^State:\s*(.*?)$', re.M)

def get_machines(filter_inaccessible=True):
    """Return a list of info dicts parsed from ``VBoxManage list -l vms``.

    Each dict holds the named groups of ``info_re`` (``name`` and ``os``;
    a group that did not match is None) plus ``desc`` when a description was
    found in that machine's section of the output.

    When *filter_inaccessible* is true, entries without a name and entries
    named ``<inaccessible!>`` are dropped.
    """
    output = get_output('VBoxManage list -l vms 2> /dev/null')
    results = []
    for m in info_re.finditer(output):
        info = m.groupdict()
        # A machine's section runs from the end of its own header match...
        info['start'] = m.end(0)
        if results:
            # ...to the start of the next machine's header.
            results[-1]['end'] = m.start(0)
        # Without this append the list stayed empty and no machine was
        # ever returned.
        results.append(info)

    for result in results:
        # The last machine has no 'end', so its section extends to the end
        # of the output (slice bound None).
        text = output[result.pop('start', 0):result.pop('end', None)]
        d = description_re.findall(text)
        if d:
            assert len(d) == 1, (result, d)
            result['desc'] = d[0].strip()

    if filter_inaccessible:
        results = [m for m in results if m.get('name') and m.get('name') != '<inaccessible!>']

    return results

def get_machine(name):
    """Return the info dict of the first listed machine called *name*.

    Returns None when ``get_machines()`` yields no machine with that name.
    """
    found = None
    for info in get_machines():
        if info['name'] == name:
            found = info
            break
    return found

def vbox_get_state(name):
    """Return the state of VM *name* as reported by ``VBoxManage showvminfo``.

    The text after ``State:`` is cut at the first ``(`` and has its spaces
    removed, so ``powered off (since ...)`` becomes ``'poweredoff'``.

    Raises TypeError if *name* is not a string and SystemExit (from
    ``get_output``) if the command fails.
    """
    try:
        from shlex import quote  # Python 3.3+
    except ImportError:
        from pipes import quote  # Python 2
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3 has no basestring
    if not isinstance(name, string_types):
        raise TypeError('Expected string: %r' % (name, ))
    # get_output() runs the command through the shell, so the name must be
    # quoted to survive spaces and shell metacharacters.
    output = get_output('VBoxManage showvminfo %s' % quote(name))
    state = state_re.findall(output)
    assert len(state) == 1, state
    return state[0].split('(')[0].replace(' ', '')

def get_default_machine(desc=None, os='windows'):
    """Pick the single VM matching the *os* and *desc* filters.

    *os* is matched as a substring of the lower-cased guest OS, *desc* as a
    substring of the lower-cased description; a falsy filter is skipped.
    The machine's info dict is returned as soon as exactly one candidate
    remains. Otherwise the process exits with a message asking for an
    explicit ``--machine NAME``.
    """
    machines = get_machines()

    if os:
        # The 'os' entry is None when the optional regex group did not
        # match, so fall back to '' before lower-casing.
        machines = [m for m in machines if os in (m.get('os') or '').lower()]
        if len(machines) == 1:
            return machines[0]

    if desc:
        machines = [m for m in machines if desc in (m.get('desc') or '').lower()]
        if len(machines) == 1:
            return machines[0]

    if not machines:
        sys.exit('Could not find an appropriate VirtualBox VM. Pass --machine NAME.')

    if len(machines) == 1:
        # Only reachable when no filter was applied: a lone machine is
        # unambiguous and must not be reported as "more than one".
        return machines[0]

    sys.exit('More than one machine matches. Pass --machine NAME.')

def system(command, fail=True):
    """Run *command* without a shell and return its exit code.

    *command* is either an argument sequence, used as is, or a string that
    is tokenised with ``shlex.split``.

    On a non-zero exit code the process exits with a descriptive message
    when *fail* is true; otherwise the code is returned (and the message is
    written to stderr when ``system.noisy`` is set).
    """
    import shlex

    noisy = system.noisy
    if noisy:
        # The one-element tuple keeps %-formatting working when command is
        # itself a tuple.
        print('Running %r' % (command, ))
    try:
        string_types = basestring  # Python 2: str and unicode
    except NameError:
        string_types = str  # Python 3 has no basestring
    if isinstance(command, string_types):
        # Callers shell-quote arguments (pipes/shlex quote); shlex.split
        # undoes that quoting, whereas str.split would break quoted values
        # containing spaces and pass the quote characters through.
        args = shlex.split(command)
    else:
        args = command
    result = Popen(args).wait()
    if result:
        msg = 'Command %r failed with code %r' % (command, result)
        if fail:
            sys.exit(msg)
        elif noisy:
            sys.stderr.write(msg + '\n')
            return result
    if noisy:
        msg = 'Command %r succeeded' % (command, )
        print(msg)
        print('-' * min(78, len(msg)))
    return result

# Set to True to have system() report every command it runs.
system.noisy = False

def unlink(path):
    """Remove the file at *path*, ignoring the case where it does not exist.

    Any other OSError is re-raised.
    """
    # NOTE(review): the ``try`` line and the handler body were missing from
    # this block; restored as "ignore ENOENT, re-raise the rest" -- confirm
    # against the upstream file.
    import errno
    try:
        os.unlink(path)
    except OSError as ex:
        if ex.errno != errno.ENOENT:  # anything but "No such file or directory"
            raise

class VirtualBox(object):
    """Object wrapper around the module-level ``vbox_*`` helpers for one VM.

    Stores the VM name, the guest credentials and the optional start type,
    and forwards each method to the matching module-level function.
    """

    def __init__(self, name, username, password='', type=None):
        self.name = name
        self.username = username
        self.password = password
        # Passed to start() as the VM start type.
        self.type = type

    def start(self):
        """Start the VM and remember (and return) the state it was in before."""
        self.initial_state = start(self.name, self.type)
        return self.initial_state

    def stop(self):
        """Put the VM back according to the state recorded by ``start()``.

        Must be called after ``start()``, which sets ``initial_state``.
        """
        # NOTE(review): the three branch bodies were missing from this
        # block; they are restored from the methods defined on this class
        # (pause / restore / poweroff) -- confirm against the upstream file.
        if self.initial_state == 'paused':
            self.pause()
        elif self.initial_state == 'saved':
            self.restore()
        elif self.initial_state != 'running':
            self.poweroff()

    def pause(self):
        """Pause the VM."""
        return vbox_pause(self.name)

    def poweroff(self):
        """Power the VM off."""
        return vbox_poweroff(self.name)

    def restore(self):
        """Delegate to ``vbox_restore`` for this VM."""
        return vbox_restore(self.name)

    def mkdir(self, path, **kwargs):
        """Create *path* in the guest; extra keywords go to ``vbox_mkdir``."""
        return vbox_mkdir(self.name, path, username=self.username, password=self.password, **kwargs)

    def copyto(self, source, dest):
        """Copy host file *source* to guest path *dest*."""
        return vbox_copyto(self.name, source, dest, username=self.username, password=self.password)

    def copyfrom(self, source, dest):
        """Copy guest file *source* to host path *dest*."""
        return vbox_copyfrom(self.name, source, dest, username=self.username, password=self.password)

    def execute(self, exe, arguments):
        """Run *exe* with *arguments* inside the guest."""
        return vbox_execute(self.name, exe, arguments, username=self.username, password=self.password)

def start(name, type=None):
    """Bring VM *name* to the running state and return its previous state.

    A running VM is left alone, a paused one is resumed, and a saved or
    powered-off one is started with the optional start *type*. Any other
    state is reported, then the VM is powered off (failures ignored) and
    started again.
    """
    # NOTE(review): the 'running' and 'paused' branch bodies and the final
    # ``else:`` were missing from this block; restored from the helpers
    # defined in this module -- confirm against the upstream file.
    state = vbox_get_state(name)
    if state == 'running':
        pass  # nothing to do
    elif state == 'paused':
        vbox_resume(name)
    elif state in ('saved', 'poweredoff'):
        vbox_startvm(name, type)
    else:
        print('Weird state: %r' % state)
        vbox_poweroff(name, fail=False)
        vbox_startvm(name, type)
    return state

def vbox_startvm(name, type=None):
    """Start VM *name* with ``VBoxManage startvm``.

    When *type* is given it is passed on as ``--type <type>``.
    """
    if type:
        options = ' --type ' + type
    else:
        # Without this ``else`` the option was always overwritten with ''
        # and a falsy *type* left ``options`` undefined.
        options = ''
    system('VBoxManage startvm %s%s' % (name, options))

def vbox_resume(name):
    """Issue ``VBoxManage controlvm <name> resume``."""
    command = 'VBoxManage controlvm %s resume' % name
    system(command)

def vbox_pause(name):
    """Issue ``VBoxManage controlvm <name> pause``."""
    command = 'VBoxManage controlvm %s pause' % name
    system(command)

def vbox_poweroff(name, **kwargs):
    """Issue ``VBoxManage controlvm <name> poweroff``.

    Keyword arguments (e.g. ``fail=False``) are forwarded to ``system``.
    """
    command = 'VBoxManage controlvm %s poweroff' % name
    system(command, **kwargs)

def vbox_restorecurrent(name):
    """Issue ``VBoxManage snapshot <name> restorecurrent``."""
    command = 'VBoxManage snapshot %s restorecurrent' % name
    system(command)

def vbox_restore(name):
    # Reads the current state of VM *name* and branches on 'saved' and
    # 'running'.
    # NOTE(review): the bodies of both ``if`` statements are missing here,
    # so this block does not parse as it stands; what each branch (and any
    # code between or after them) should do cannot be determined from this
    # file -- restore from the upstream source.
    state = vbox_get_state(name)
    if state == 'saved':
    if state == 'running':

def _get_options(username=None, password=None, image=None):
    from pipes import quote
    options = ''
    if username:
        options += ' --username %s' % quote(username)
    if password:
        options += ' --password %s' % quote(password)
    if image:
        options += ' --image %s' % quote(image)
    return options

def _vbox_mkdir(name, path, username=None, password=None):
    """Make one attempt to create directory *path* inside guest *name*.

    Runs ``VBoxManage guestcontrol <name> mkdir`` with the given guest
    credentials; failure handling is that of ``system``.
    """
    # pipes is removed in Python 3.13; prefer shlex.quote when available.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote
    system('VBoxManage guestcontrol %s mkdir %s%s' % (name, quote(path), _get_options(username, password)))

def vbox_mkdir(name, path, username=None, password=None, timeout=90):
    """Create directory *path* inside guest *name*, retrying on failure.

    ``_vbox_mkdir`` is retried until it succeeds. Once *timeout* seconds
    have passed since the first attempt, the SystemExit raised by the
    failing attempt is propagated.
    """
    # NOTE(review): the ``try:`` line and the ``raise`` after the deadline
    # check were missing from this block and are restored here. The pause
    # between attempts is an addition to avoid spawning VBoxManage in a
    # tight loop; its length is a choice, not taken from the original.
    end = time.time() + timeout
    while True:
        try:
            return _vbox_mkdir(name, path, username=username, password=password)
        except SystemExit:
            if time.time() > end:
                raise
        time.sleep(1)

def vbox_copyto(name, source, dest, username=None, password=None):
    """Copy host file *source* to path *dest* inside guest *name*.

    *source* is made absolute on the host before it is passed to
    ``VBoxManage guestcontrol <name> copyto``.
    """
    # pipes is removed in Python 3.13; prefer shlex.quote when available.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote
    args = (name, quote(os.path.abspath(source)), quote(dest), _get_options(username, password))
    system('VBoxManage guestcontrol %s copyto %s %s%s' % args)

def vbox_copyfrom(name, source, dest, username=None, password=None):
    """Copy file *source* from inside guest *name* to host path *dest*.

    *dest* is made absolute on the host before it is passed to
    ``VBoxManage guestcontrol <name> copyfrom``.
    """
    # pipes is removed in Python 3.13; prefer shlex.quote when available.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote
    args = (name, quote(source), quote(os.path.abspath(dest)), _get_options(username, password))
    system('VBoxManage guestcontrol %s copyfrom %s %s%s' % args)

def vbox_execute(name, image, arguments=None, username=None, password=None):
    """Run program *image* inside guest *name* and wait for its output.

    *arguments*, when given, are quoted and appended after ``--``. If the
    command fails, the SystemExit message raised by ``system`` is written
    to stderr instead of terminating the process.
    """
    # NOTE(review): the ``try:`` line and the ``system(command)`` call were
    # missing from this block and are restored here; whether the original
    # handler did more than write the message is not visible -- confirm
    # against the upstream file.
    # pipes is removed in Python 3.13; prefer shlex.quote when available.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote
    options = _get_options(username, password, image)
    options += ' --wait-stdout --wait-stderr'
    try:
        command = 'VBoxManage guestcontrol %s execute %s' % (name, options)
        if arguments:
            command += ' -- %s' % ' '.join(quote(x) for x in arguments)
        system(command)
    except SystemExit as ex:
        sys.stderr.write(str(ex) + '\n')

if __name__ == '__main__':
    # Command-line entry point: the first argument selects a module-level
    # function by name, e.g. ``pause`` looks up ``vbox_pause``.
    command = sys.argv[1]
    # An unknown command name raises KeyError here.
    command = globals()['vbox_' + command]
    assert callable(command), command
    # NOTE(review): no call to ``command`` is visible after this point; the
    # invocation may have been lost from this view -- confirm against the
    # upstream file.