Commits

Mattias de Hollander committed f9d42dc

Adding support for OpenNebula

Files changed (5)

 from cm.util import paths
 
 from cm.clouds.ec2 import EC2Interface
+from cm.clouds.opennebula import ONInterface
+from cm.clouds.dummy import DummyInterface
+
 
 log = logging.getLogger( 'cloudman' )
 logging.getLogger('boto').setLevel(logging.INFO)
         else:
             self.TESTFLAG = False
             self.logger.setLevel(logging.INFO)
+
+        if self.ud.has_key("localflag"):
+            self.LOCALFLAG = bool(self.ud['localflag'])
+            self.logger.setLevel(logging.DEBUG)
+        else:
+            self.LOCALFLAG = False
+            self.logger.setLevel(logging.INFO)
+            
         log.addHandler(self.logger)
         # Read config file and check for errors
         self.config = config.Configuration( **kwargs )
         config.configure_logging( self.config )
         log.debug( "Initializing app" )
         self.manager = None
+        # DBTODO make this flexible for other cloud interfaces.
+        if self.ud['cloud_type'] == "ec2":
+            self.cloud_interface = EC2Interface(aws_access_key=self.ud['access_key'], aws_secret_key=self.ud['secret_key'], app=self)
+        elif self.ud['cloud_type'] == 'opennebula':
+            self.cloud_interface = ONInterface(aws_access_key=self.ud['access_key'], aws_secret_key=self.ud['secret_key'], app=self, on_username=self.ud['on_username'], on_password=self.ud['on_password'], on_host=self.ud['on_host'])
+        elif self.ud['cloud_type'] == 'dummy':
+            self.cloud_interface = DummyInterface(aws_access_key=self.ud['access_key'], aws_secret_key=self.ud['secret_key'], app=self, on_username=self.ud['on_username'], on_password=self.ud['on_password'], on_host=self.ud['on_host'])
+
         # Update user data to include persistent data stored in cluster's bucket, if it exists
         # This enables cluster configuration to be recovered on cluster re-instantiation
         if self.ud.has_key('bucket_cluster'):
                 
     def shutdown(self, delete_cluster=False):
         if self.manager:
-            self.manager.shutdown(delete_cluster=delete_cluster)
+            self.manager.shutdown(delete_cluster=delete_cluster)
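
The cloud_type dispatch added above keys entirely off the user data (self.ud). Below is a minimal sketch of the dictionary it expects for the OpenNebula case; the key names come from the diff, while the values and the Python-literal form are purely illustrative:

    # Illustrative only: key names taken from the dispatch above, values made up.
    ud = {
        'cloud_type': 'opennebula',    # 'ec2' -> EC2Interface, 'dummy' -> DummyInterface
        'access_key': 'unused-on-one', # still required by the interface constructors
        'secret_key': 'unused-on-one',
        'on_username': 'oneadmin',     # OpenNebula credentials, passed straight to oca.Client
        'on_password': 'onepass',
        'on_host': 'http://one-frontend:2633/RPC2',  # XML-RPC endpoint handed to oca.Client
        'localflag': True,             # read as bool(ud['localflag']); any non-empty string is also truthy
    }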

cm/clouds/opennebula.py

+import urllib, socket
+from cm.clouds import CloudInterface
+from cm.services.data.filesystem import Volume
+
+from oca import Client, VirtualMachine, VirtualMachinePool, CONNECTED
+from oca.exceptions import OpenNebulaException
+
+import new
+
+# Obtaining IP and MAC addresses
+import socket
+import fcntl
+import struct
+
+import subprocess
+
+import logging
+log = logging.getLogger( 'cloudman' )
+
+class ONInterface(CloudInterface):
+    
+    def __init__(self, aws_access_key, aws_secret_key, app=None, on_username=None, on_password=None, on_host=None):
+        super(ONInterface, self).__init__()
+        self.aws_access_key = aws_access_key
+        self.aws_secret_key = aws_secret_key
+        self.app = app
+        self.on_username = on_username
+        self.on_password = on_password
+        self.on_host = on_host
+        self.bridge = 72
+
+    def _getIpAddress(self, ifname):
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        try:
+            ip = socket.inet_ntoa(fcntl.ioctl(
+                                            s.fileno(),
+                                            0x8915,  # SIOCGIFADDR
+                                            struct.pack('256s', ifname[:15])
+                                            )[20:24])
+        except IOError:
+            return None
+        return ip
+
+    def _getMacAddress(self, ifname):
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        info = fcntl.ioctl(s.fileno(), 0x8927,  struct.pack('256s', ifname[:15]))
+        return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
+
+    def get_ami( self ):
+        pass
+    
+    def get_type( self ):
+        #return 'worker'
+        pass
+    
+    def get_instance_id( self ):
+        # OpenNebula does not (yet) provide a call to look up the ID of the
+        # running virtual machine, so the instance is identified by its MAC
+        # address: every connected VM is inspected and the ID of the one whose
+        # info contains the current MAC address is returned.
+        mac = self._getMacAddress('eth0')
+        log.debug(mac)
+        vmpool = VirtualMachinePool(self.s3_conn)
+        vmpool.info(CONNECTED)
+        for vm in list(vmpool):
+            vm_info = self.s3_conn.call("vm.info", vm.id)
+            if mac in vm_info:
+                return str(vm.id)
+        return None
+    
+    def get_zone( self ):
+        pass
+    
+    def get_security_groups( self ):
+        pass
+    
+    def get_key_pair_name( self ):
+        pass
+    
+    def get_self_private_ip( self ):
+        # TODO: Change this to the master's IP
+        log.debug("Asking for private IP")
+        return self._getIpAddress('eth0')
+    
+    def get_self_public_ip( self ):
+        # TODO: Change this to the master's IP
+        log.debug("Asking for public IP")
+        return self._getIpAddress('eth1')
+    
+    def get_fqdn(self):
+        # Return the hostname
+        return socket.getfqdn()
+    
+    def get_ec2_connection( self ):
+        log.debug( 'Getting OpenNebula connection' )
+        if self.ec2_conn == None:
+            log.debug("No OpenNebula Connection, creating a new one.")
+            try:
+                self.ec2_conn = Client("%s:%s" % (self.on_username, self.on_password), self.on_host)
+                self.ec2_conn.lookup = new.instancemethod(lookup, self.ec2_conn, self.ec2_conn.__class__)
+                self.ec2_conn.create_bucket = new.instancemethod(create_bucket, self.ec2_conn, self.ec2_conn.__class__)
+
+                try:
+                     vmpool = VirtualMachinePool(self.ec2_conn)
+                     vmpool.info(CONNECTED)
+                     list(vmpool)
+                     log.debug( 'Got OpenNebula connection.' )
+                except OpenNebulaException, e:
+                     log.error("Cannot validate provided credentials: %s" % e)
+                     self.ec2_conn = False
+            except Exception, e:
+                log.error(e)
+        # Note: returns the interface object itself, which provides the EC2-connection-style
+        # methods CloudMan calls on it (run_instances, get_all_instances, reboot_instances)
+        return self
+    
+    def get_s3_connection( self ):
+        # Note: S3 = bucket storage; the OpenNebula cloud has no such storage service
+        log.debug( 'Getting OpenNebula connection' )
+        if self.s3_conn == None:
+            log.debug("No OpenNebula Connection, creating a new one.")
+            try:
+                self.s3_conn = Client("%s:%s" % (self.on_username, self.on_password), self.on_host)
+                self.s3_conn.lookup = new.instancemethod(lookup, self.s3_conn, self.s3_conn.__class__)
+                self.s3_conn.create_bucket = new.instancemethod(create_bucket, self.s3_conn, self.s3_conn.__class__)
+ 
+                           
+                try:
+                     vmpool = VirtualMachinePool(self.s3_conn)
+                     vmpool.info(CONNECTED)
+                     list(vmpool)
+                     log.debug( 'Got OpenNebula connection.' )
+                except OpenNebulaException, e:
+                     log.error("Cannot validate provided credentials: %s" % e)
+                     self.s3_conn = False
+            except socket.error, e:
+                log.debug( "Socket error: %s:" % e )
+                log.debug("Trying to reboot the machine")
+                # Rebooting should be prevented when running on a local machine (KVM)
+                # local machine hwaddr = 00:22:64:ae:3d:fd
+                # TODO
+                if self._getMacAddress('eth0') != '00:22:64:ae:3d:fd':
+                #if int(self.get_self_private_ip().split('.')[2]) == int(self.bridge):
+                    ret_code = subprocess.call( 'sudo telinit 6', shell=True )
+
+                
+            except Exception, e:
+                log.error(e)
+        return self.s3_conn
+        #return None
+    
+    def run_instances(self, image_id, min_count, max_count, key_name, security_groups, user_data, instance_type, placement):
+
+        #TODO: Change this!!
+        username = 'mdhollander'
+        diskimage = 'vm-lucid-amd64-serial-galaxy-worker.img'
+        vmname = "Cloudman_Node"
+        
+        log.debug("Worker user data: %s" % user_data)
+        log.debug("Adding OpenNebula Worker nodes")
+        
+        # TODO: Remove public NIC? Save disk?
+        vmtemplatestring ="""
+NAME=\"%s\" MEMORY=1024 CPU=1 OS=[BOOT=\"hd\"] 
+GRAPHICS=[type=\"vnc\"] 
+DISK=[TYPE=\"disk\", SOURCE=\"/home/%s/images/%s\", TARGET=\"hda\", CLONE=\"yes\", SAVE=\"no\", READONLY=\"n\" ]
+NIC=[NETWORK=\"%s\", MODEL=\"virtio\"]
+NIC=[NETWORK=\"public\", MODEL=\"virtio\"]
+""" % (vmname, username, diskimage, username)
+
+        r = Reservations()
+        for i in range(min_count,max_count+1):
+            new_vm_id = VirtualMachine.allocate(self.s3_conn, vmtemplatestring)
+            
+            # Get the instance that was just created (assumed to be the last one in the pool)
+            # TODO: Is there another way to retrieve it? Using new_vm_id?
+            vmpool = VirtualMachinePool(self.s3_conn)
+            vmpool.info(CONNECTED)
+            vm_instance = list(vmpool)[-1]
+            
+            vm_instance.add_tag = new.instancemethod(add_tag, vm_instance, vm_instance.__class__)
+            vm_instance.update = new.instancemethod(update, vm_instance, vm_instance.__class__)
+
+            #vm_instance.id = str(vm_instance.id)
+            
+            # Add tags dictionary
+            vm_instance.tags = {}
+
+            r.instances.append(vm_instance)
+        
+        return r
+
+# Emulate EC2 objects
+
+    def get_all_instances(self, filters={}, *args, **kwargs):
+        client = Client("%s:%s" % (self.on_username, self.on_password), self.on_host)
+        vmpool = VirtualMachinePool(client)
+        vmpool.info(CONNECTED)
+        reservations = []
+        log.debug("Get all instances")
+        
+        for vm_instance in vmpool:
+            # Distinguish between worker nodes and the master node:
+            # workers are requested with a filter dictionary containing 'tag:role',
+            # while the master is requested with a (possibly empty) list of instance IDs
+            
+            # Instance ID needs to be a string
+            vm_instance.id = str(vm_instance.id)
+            
+            # Hold tags in a dictionary
+            vm_instance.tags = {}
+            try:
+                req_id = filters[0]
+                if int(req_id) == int(vm_instance.id):
+                    reservations = []
+                    r = Reservations()
+                    vm_instance.add_tag = new.instancemethod(add_tag, vm_instance, vm_instance.__class__)
+                    vm_instance.update = new.instancemethod(update, vm_instance, vm_instance.__class__)
+                    r.instances.append(vm_instance)
+                    reservations.append(r)
+                    return reservations
+            except:
+                pass
+            # TODO Add tags to instance? (unsure if this is the right way to do it)
+            try:
+                for key, value in filters.iteritems():
+                    tag = key.split(':')[1]
+                    vm_instance.tags[tag] = value
+            except AttributeError:
+                pass
+            
+            try:
+                role = filters['tag:role'] 
+                if role == "worker" and vm_instance.name.strip() == "Cloudman_Node":
+                    r = Reservations()
+                    vm_instance.add_tag = new.instancemethod(add_tag, vm_instance, vm_instance.__class__)
+                    vm_instance.update = new.instancemethod(update, vm_instance, vm_instance.__class__)
+                    #vm_instance.get_m_state = get_m_state
+                    
+                    r.instances.append(vm_instance)
+                    reservations.append(r)
+            except TypeError :
+                if vm_instance.name.strip() == "Galaxy_Main":
+                    r = Reservations()
+                    vm_instance.add_tag = new.instancemethod(add_tag, vm_instance, vm_instance.__class__)
+                    vm_instance.update = new.instancemethod(update, vm_instance, vm_instance.__class__)
+                    r.instances.append(vm_instance)
+                    reservations.append(r)
+                    
+        return reservations
+
+    def get_all_volumes(self, *args, **kwargs):
+        pass
+    
+    def terminate_instances(self, instances):
+        # Not implemented yet: only logs the request and reports success
+        log.debug("Terminate instances")
+        
+        return True
+
+    def reboot_instances(self, instances, *args, **kwargs):
+        print "Rebooting %s" % instances
+        from time import sleep
+        # Doing a 'slow' reboot: stop the VM and then resume it.
+        # A faster way would be to send 'sudo telinit 6' to the worker.
+        while 1:
+            vmpool = VirtualMachinePool(self.s3_conn)
+            vmpool.info(CONNECTED)
+            for vm_instance in vmpool:
+                if vm_instance.id == int(instances[0]):
+                    if vm_instance.lcm_state == 3: # LCM state 3 = RUNNING
+                        log.debug("stop now!")
+                        vm_instance.stop()
+                        sleep(40)
+                    if vm_instance.state == 4: # VM state 4 = STOPPED
+                        log.debug("rebooting")
+                        vm_instance.resume()
+                        return True
+                    
+    # EC2 can add volumes to an image; for OpenNebula we will create a
+    # filesystem instead (see Filesystem.add)
+    def create_volume(self, size, zone, snapshot):
+        # Does not actually create a cloud volume; returns a placeholder Volume with no id
+        v = Volume('dummyapp')
+        v.id = None
+        v.add_tag = new.instancemethod(add_tag, v, v.__class__)
+
+        return v
+    
+
+
+        
+class Reservations(object):
+    def __init__(self):
+        self.instances = []
+
+# An EC2 instance object has an add_tag method,
+# which the OpenNebula (oca) object lacks
+def add_tag(self, key, value, *args, **kwargs):
+    self.tags[key] = value
+
+def update(self):
+    """Stub for the EC2 instance update() method; currently it only logs the state."""
+    log.debug("Trying to update")
+    log.debug("Instance currently in %s state" % self.state)
+    log.debug("Instance id: %s" % self.id)
+
+
+
+#def get_m_state(self):
+#    print "GET M STATE"
+#    print self.state
+
+# TODO: Work on the bucket system!
+# An EC2 connection object has lookup and create_bucket methods,
+# which the oca Client object lacks
+def lookup(self, *args, **kwargs):
+    pass
+
+def create_bucket(self, *args, **kwargs):
+    pass
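
Throughout this file, EC2-style methods (add_tag, update, lookup, create_bucket) are grafted onto oca objects at runtime with new.instancemethod so the rest of CloudMan can keep treating them like boto objects. The new module is deprecated; below is a minimal sketch of the same pattern with types.MethodType, using a stand-in class rather than a real oca VirtualMachine:

    # Sketch of the monkey-patching pattern above; FakeVM is a stand-in for an
    # oca VirtualMachine, and only add_tag() mirrors a function from this file.
    import types

    class FakeVM(object):
        def __init__(self):
            self.tags = {}      # the code above attaches this dict to every vm_instance

    def add_tag(self, key, value, *args, **kwargs):
        self.tags[key] = value

    vm = FakeVM()
    # equivalent to: vm.add_tag = new.instancemethod(add_tag, vm, vm.__class__)
    vm.add_tag = types.MethodType(add_tag, vm)
    vm.add_tag('role', 'worker')
    print vm.tags               # {'role': 'worker'}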

cm/services/apps/galaxy.py

         self.status()
         self.start()
     
-    def manage_galaxy(self, to_be_started=True):
-        if self.app.TESTFLAG is True:
+    def manage_galaxy( self, to_be_started=True ):
+        if self.app.TESTFLAG is True and self.app.LOCALFLAG is False:
             log.debug( "Attempted to manage Galaxy, but TESTFLAG is set." )
             return
         os.putenv( "GALAXY_HOME", self.galaxy_home )
                     return False
                 # Retrieve config files from a persistent data repository (i.e., S3)
                 if not misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_cluster'], 'universe_wsgi.ini.cloud', self.galaxy_home + '/universe_wsgi.ini' ):
-                    log.debug("Did not get Galaxy configuration file from cluster bucket '%s'" % self.app.ud['bucket_cluster'])
-                    log.debug("Trying to retrieve latest one (universe_wsgi.ini.cloud) from '%s' bucket..." % self.app.ud['bucket_default'])
-                    misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_default'], 'universe_wsgi.ini.cloud', self.galaxy_home + '/universe_wsgi.ini' )
+                   log.debug("Did not get Galaxy configuration file from cluster bucket '%s'" % self.app.ud['bucket_cluster'])
+                   log.debug("Trying to retrieve latest one (universe_wsgi.ini.cloud) from '%s' bucket..." % self.app.ud['bucket_default'])
+                   misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_default'], 'universe_wsgi.ini.cloud', self.galaxy_home + '/universe_wsgi.ini' )
                 self.add_galaxy_admin_users()
                 os.chown( self.galaxy_home + '/universe_wsgi.ini', pwd.getpwnam( "galaxy" )[2], grp.getgrnam( "galaxy" )[2] )
                 if not misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_cluster'], 'tool_conf.xml.cloud', self.galaxy_home + '/tool_conf.xml' ):
-                    log.debug("Did not get Galaxy tool configuration file from cluster bucket '%s'" % self.app.ud['bucket_cluster'])
-                    log.debug("Trying to retrieve latest one (tool_conf.xml.cloud) from '%s' bucket..." % self.app.ud['bucket_default'])
-                    misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_default'], 'tool_conf.xml.cloud', self.galaxy_home + '/tool_conf.xml' )
+                   log.debug("Did not get Galaxy tool configuration file from cluster bucket '%s'" % self.app.ud['bucket_cluster'])
+                   log.debug("Trying to retrieve latest one (tool_conf.xml.cloud) from '%s' bucket..." % self.app.ud['bucket_default'])
+                   misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_default'], 'tool_conf.xml.cloud', self.galaxy_home + '/tool_conf.xml' )
                 os.chown( self.galaxy_home + '/tool_conf.xml', pwd.getpwnam( "galaxy" )[2], grp.getgrnam( "galaxy" )[2] )
                 if not misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_cluster'], 'tool_data_table_conf.xml.cloud', self.galaxy_home + '/tool_data_table_conf.xml.cloud' ):
-                    log.debug("Did not get Galaxy tool_data_table_conf.xml.cloud file from cluster bucket '%s'" % self.app.ud['bucket_cluster'])
-                    log.debug("Trying to retrieve latest one (tool_data_table_conf.xml.cloud) from '%s' bucket..." % self.app.ud['bucket_default'])
-                    misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_default'], 'tool_data_table_conf.xml.cloud', self.galaxy_home + '/tool_data_table_conf.xml.cloud' )
-                shutil.copy('%s/tool_data_table_conf.xml.cloud' % self.galaxy_home, '%s/tool_data_table_conf.xml' % self.galaxy_home)
-                os.chown( self.galaxy_home + '/tool_data_table_conf.xml', pwd.getpwnam( "galaxy" )[2], grp.getgrnam( "galaxy" )[2] )
+                   log.debug("Did not get Galaxy tool_data_table_conf.xml.cloud file from cluster bucket '%s'" % self.app.ud['bucket_cluster'])
+                   log.debug("Trying to retrieve latest one (tool_data_table_conf.xml.cloud) from '%s' bucket..." % self.app.ud['bucket_default'])
+                   misc.get_file_from_bucket( s3_conn, self.app.ud['bucket_default'], 'tool_data_table_conf.xml.cloud', self.galaxy_home + '/tool_data_table_conf.xml.cloud' )
+                try:
+                    shutil.copy('%s/tool_data_table_conf.xml.cloud' % self.galaxy_home, '%s/tool_data_table_conf.xml' % self.galaxy_home)
+                    os.chown( self.galaxy_home + '/tool_data_table_conf.xml', pwd.getpwnam( "galaxy" )[2], grp.getgrnam( "galaxy" )[2] )
+                except:
+                    pass
+                # 
+                #===============================================================
+                
                 # Make sure the temporary job_working_directory exists on user data volume (defined in universe_wsgi.ini.cloud)
                 if not os.path.exists('%s/tmp/job_working_directory' % paths.P_GALAXY_DATA):
                     os.makedirs('%s/tmp/job_working_directory/' % paths.P_GALAXY_DATA)
                 log.info( "Starting Galaxy..." )
                 # Make sure admin users get added
                 self.add_galaxy_admin_users()
+                log.debug('%s - galaxy -c "export SGE_ROOT=%s; sh $GALAXY_HOME/run.sh --daemon"' % (paths.P_SU, paths.P_SGE_ROOT))
                 if not misc.run('%s - galaxy -c "export SGE_ROOT=%s; sh $GALAXY_HOME/run.sh --daemon"' % (paths.P_SU, paths.P_SGE_ROOT), "Error invoking Galaxy", "Successfully initiated Galaxy start."):
                     self.state = service_states.ERROR
                     self.last_state_change_time = datetime.utcnow()
                 new_config_file.write(line)
             new_config_file.close()
             shutil.move(os.path.join(self.galaxy_home, 'universe_wsgi.ini.new'), os.path.join(self.galaxy_home, 'universe_wsgi.ini'))
-    
+    
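
The guard used here and in cm/util/master.py is "if self.app.TESTFLAG is True and self.app.LOCALFLAG is False", i.e. the new localflag user-data key re-enables code paths that TESTFLAG alone would skip. A small sketch spelling that out; the helper name is hypothetical, only the two flags come from the diff:

    # Hypothetical helper; mirrors the combined TESTFLAG/LOCALFLAG guard in the diff.
    def should_skip(testflag, localflag):
        """Return True when the 'TESTFLAG is set' early-return should fire."""
        return testflag is True and localflag is False

    assert should_skip(True,  False) is True    # pure test mode: skip, as before this commit
    assert should_skip(True,  True)  is False   # local run: LOCALFLAG overrides the skip
    assert should_skip(False, False) is False   # normal cloud run: never skipped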

cm/util/master.py

         and start available cluster services (as provided in the cluster's 
         configuration and persistent data )"""
         log.debug("ud at manager start: %s" % self.app.ud)
-        if self.app.TESTFLAG is True:
+        if self.app.TESTFLAG is True and self.app.LOCALFLAG is False:
             log.debug("Attempted to start the ConsoleManager. TESTFLAG is set; nothing to start, passing.")
             return None
         self.app.manager.services.append(SGEService(self.app))
+        if self.app.LOCALFLAG is True:
+            self.init_cluster(cluster_type='Galaxy')
+        
         if not self.add_preconfigured_services():
             return False
         self.manager_started = True
     def add_preconfigured_services(self):
         """ Inspect cluster configuration and persistent data and add 
         available/preconfigured services. """
-        if self.app.TESTFLAG is True:
+        if self.app.TESTFLAG is True and self.app.LOCALFLAG is False:
             log.debug("Attempted to add preconfigured cluster services but the TESTFLAG is set.")
             return None
         try:
     def get_worker_instances( self ):
         instances = []
         if self.app.TESTFLAG is True:
-            # for i in range(5):
+            #for i in range(5):
             #     instance = Instance( self.app, inst=None, m_state="Pending" )
             #     instance.id = "WorkerInstance"
             #     instances.append(instance)
         self.master_state = new_state
     
     def get_idle_instances( self ):
-        # log.debug( "Looking for idle instances" )
+        #log.debug( "Looking for idle instances" )
         idle_instances = [] # List of Instance objects corresponding to idle instances
         if os.path.exists('%s/default/common/settings.sh' % paths.P_SGE_ROOT):
             proc = subprocess.Popen( "export SGE_ROOT=%s; . $SGE_ROOT/default/common/settings.sh; %s/bin/lx24-amd64/qstat -f | grep all.q" % (paths.P_SGE_ROOT, paths.P_SGE_ROOT), shell=True, stdout=subprocess.PIPE )
         
             for idle_instance_dn in idle_instances_dn:
                  for w_instance in self.worker_instances:
-                     # log.debug( "Trying to match worker instance with private IP '%s' to idle instance '%s'" % ( w_instance.get_private_ip(), idle_instance_dn) )
+                     log.debug( "Trying to match worker instance with private IP '%s' to idle instance '%s'" % ( w_instance.get_private_ip(), idle_instance_dn) )
                      if w_instance.get_private_ip() is not None:
                          if w_instance.get_private_ip().lower().startswith( str(idle_instance_dn).lower() ) is True:
                             # log.debug( "Marking instance '%s' with FQDN '%s' as idle." % ( w_instance.id, idle_instance_dn ) )
         reservation = None
         if instance_type == '':
             instance_type = self.app.cloud_interface.get_type()
-        log.debug( "Using following command: ec2_conn.run_instances( image_id='%s', min_count=1, max_count='%s', key_name='%s', security_groups=['%s'], user_data=[%s], instance_type='%s', placement='%s' )"
-               % ( self.app.cloud_interface.get_ami(), num_nodes, self.app.cloud_interface.get_key_pair_name(), ", ".join( self.app.cloud_interface.get_security_groups() ), worker_ud_str, instance_type, self.app.cloud_interface.get_zone() ) )
+        #log.debug( "Using following command: ec2_conn.run_instances( image_id='%s', min_count=1, max_count='%s', key_name='%s', security_groups=['%s'], user_data=[%s], instance_type='%s', placement='%s' )"
+        #       % ( self.app.cloud_interface.get_ami(), num_nodes, self.app.cloud_interface.get_key_pair_name(), ", ".join( self.app.cloud_interface.get_security_groups() ), worker_ud_str, instance_type, self.app.cloud_interface.get_zone() ) )
         try:
-            # log.debug( "Would be starting worker instance(s)..." )
+            log.debug( "Would be starting worker instance(s)..." )
             reservation = ec2_conn.run_instances( image_id=self.app.cloud_interface.get_ami(),
                                                   min_count=1,
                                                   max_count=num_nodes,
                     self.app.cloud_interface.add_tag(instance, 'clusterName', self.app.ud['cluster_name'])
                     self.app.cloud_interface.add_tag(instance, 'role', worker_ud['role'])
                     i = Instance( self.app, inst=instance, m_state=instance.state )
+                    log.debug("Adding instance: %s" % instance)
                     self.worker_instances.append( i )
         except BotoServerError, e:
             log.error( "boto server error when starting an instance: %s" % str( e ) )
         :param pss: Persistent Storage Size associated with data volumes being 
             created for the given cluster
         """
-        if self.app.TESTFLAG is True:
+        if self.app.TESTFLAG is True and self.app.LOCALFLAG is False:
             log.debug("Attempted to initialize a new cluster of type '%s', but TESTFLAG is set." % cluster_type)
             return
         self.app.manager.initial_cluster_type = cluster_type
         log.info("Initializing a '%s' cluster." % cluster_type)
         if cluster_type == 'Galaxy':
             # Add required services:
+            
             # Static data - get snapshot IDs from the default bucket and add respective file systems
             s3_conn = self.app.cloud_interface.get_s3_connection()
             snaps_file = 'cm_snaps.yaml'
             snaps = None
             if self.app.ud.has_key('static_filesystems'):
-                snaps = self.app.ud['static_filesystems']                
+                snaps = self.app.ud['static_filesystems']
             elif misc.get_file_from_bucket(s3_conn, self.app.ud['bucket_default'], 'snaps.yaml', snaps_file):
                 snaps_file = misc.load_yaml_file(snaps_file)
                 snaps = snaps_file['static_filesystems']
                     # Check if an already attached volume maps to the current filesystem
                     att_vol = self.get_vol_if_fs(attached_volumes, snap['filesystem'])
                     if att_vol:
-                        fs.add_volume(vol_id=att_vol.id, size=att_vol.size, from_snapshot_id=att_vol.snapshot_id)
+                         fs.add_volume(vol_id=att_vol.id, size=att_vol.size, from_snapshot_id=att_vol.snapshot_id)
                     else:
-                        fs.add_volume(size=snap['size'], from_snapshot_id=snap['snap_id'])
+                         fs.add_volume(size=snap['size'], from_snapshot_id=snap['snap_id'])
                     log.debug("Adding static filesystem: '%s'" % snap['filesystem'])
                     self.services.append(fs)
-            # User data - add a new file system for user data of size 'pss'                    
-            fs_name = 'galaxyData'
-            log.debug("Creating a new data filesystem: '%s'" % fs_name)
-            fs = Filesystem(self.app, fs_name)
-            fs.add_volume(size=pss)
-            self.services.append(fs)
-            # PostgreSQL
+                    
+            #User data - add a new file system for user data of size 'pss'                    
+            if self.app.ud['cloud_type'] not in ['opennebula', 'dummy']:
+                fs_name = 'galaxyData'
+                log.debug("Creating a new data filesystem: '%s'" % fs_name)
+                fs = Filesystem(self.app, fs_name)
+                fs.add_volume(size=pss)
+                self.services.append(fs)
+            
+            #PostgreSQL
             self.services.append(PostgresService(self.app))
             # Galaxy
             self.services.append(GalaxyService(self.app))
             # service that would indicate the configuration of the service is
             # complete. This could probably be done by monitoring
             # the service state flag that is already maintained?
-            if added_srvcs:
+            if added_srvcs and self.app.ud['cloud_type'] != 'opennebula':
                 self.store_cluster_config()
             # Check and grow file system
             svcs = self.app.manager.get_services('Filesystem')
             for svc in svcs:
                 if svc.name == 'galaxyData' and svc.grow is not None:
                      self.expand_user_data_volume()
-                     self.store_cluster_config()
+                     # OpenNebula has no S3-like storage, so this does not work (yet)
+                     if self.app.ud['cloud_type'] != 'opennebula':
+                         self.store_cluster_config()
             # Check status of worker instances
             for w_instance in self.app.manager.worker_instances:
                 if w_instance.check_if_instance_alive() is False:
                     try:
                         ec2_conn = self.app.cloud_interface.get_ec2_connection()
                         log.debug("Instance '%s' reboot required. Rebooting now." % w_instance.id)
-                        ec2_conn.reboot_instances([w_instance.id])
+                        if self.app.ud['cloud_type'] == 'opennebula':
+                            self.app.manager.console_monitor.conn.send( 'REBOOT | %s' % w_instance.id, w_instance.id )
+                            log.info( "\tMT: Sent REBOOT message to worker '%s'" % w_instance.id )
+                        else:
+                            ec2_conn.reboot_instances([w_instance.id])
                         w_instance.reboot_required = False
                     except EC2ResponseError, e:
                         log.debug("Error rebooting instance '%s': %s" % (w_instance.id, e))
                 def do_match():
                     match = False
                     for inst in self.app.manager.worker_instances:
-                        if inst.id == m.properties['reply_to']:
+                        if str(inst.id) == str(m.properties['reply_to']):
                             match = True
                             inst.handle_message( m.body )
                     return match
         self.private_ip = None
         if inst:
             try:
-                self.id = inst.id
+                self.id = str(inst.id)
             except EC2ResponseError, e:
                 log.error( "Error retrieving instance id: %s" % e )
         self.m_state = m_state
     def get_m_state( self ):
         if self.app.TESTFLAG is True:
             return "running"
+        if self.app.ud['cloud_type'] == 'opennebula':
+            reservation = self.app.cloud_interface.get_all_instances([self.id])
+            if reservation and len(reservation[0].instances)==1:
+                instance = reservation[0].instances[0]
+                self.inst = instance
+            
         if self.inst:
             try:
                 self.inst.update()
-                state = self.inst.state
+                state = self.inst.lcm_state
                 if state != self.m_state:
                     self.m_state = state
                     self.last_m_state_change = dt.datetime.utcnow()
             except EC2ResponseError, e:
                 log.debug( "Error updating instance state: %s" % e )
+
         return self.m_state
+
     
     def send_status_check( self ):
-        # log.debug("\tMT: Sending STATUS_CHECK message" )
+        log.debug("\tMT: Sending STATUS_CHECK message" )
         if self.app.TESTFLAG is True:
             return
         self.app.manager.console_monitor.conn.send( 'STATUS_CHECK', self.id )
-        # log.debug( "\tMT: Message STATUS_CHECK sent; waiting on response" )
+        log.debug( "\tMT: Message STATUS_CHECK sent; waiting on response" )
     
     def send_worker_restart( self ):
         # log.info("\tMT: Sending restart message to worker %s" % self.id)
         log.info( "\tMT: Sent RESTART message to worker '%s'" % self.id )
     
     def check_if_instance_alive( self ):
-        # log.debug( "In '%s' state." % self.app.manager.master_state )
-        # log.debug("\tMT: Waiting on worker instance(s) to start up (wait time: %s sec)..." % (dt.datetime.utcnow() - self.last_state_change_time).seconds )
+        log.debug( "In '%s' state." % self.app.manager.master_state )
+        #log.debug("\tMT: Waiting on worker instance(s) to start up (wait time: %s sec)..." % (dt.datetime.utcnow() - self.last_state_change_time).seconds )
         state = self.get_m_state()
         
         # Sometimes, an instance is terminated by Amazon prematurely so try to catch it
         elif state == 'pending': # Display pending instances status to console log
             log.debug( "Worker instance '%s' status: '%s' (time in this state: %s sec)" % ( self.id, state, ( dt.datetime.utcnow() - self.last_m_state_change ).seconds ) )
         else:
-            # log.debug( "Worker instance '%s' status: '%s' (time in this state: %s sec)" % ( self.id, state, ( dt.datetime.utcnow() - self.last_m_state_change ).seconds ) )
+            log.debug( "Worker instance '%s' status: '%s' (time in this state: %s sec)" % ( self.id, state, ( dt.datetime.utcnow() - self.last_m_state_change ).seconds ) )
             pass
         if self.app.TESTFLAG is True:
             return True
         return True
     
     def get_private_ip( self ):
-        #log.debug("Getting instance '%s' private IP: '%s'" % ( self.id, self.private_ip ) )
+        log.debug("Getting instance '%s' private IP: '%s'" % ( self.id, self.private_ip ) )
         return self.private_ip
     
     def send_master_pubkey( self ):
                                                                                                       self.zone, 
                                                                                                       self.type, 
                                                                                                       self.ami))
+                
                 # Instance is alive and functional. Send master pubkey.
                 self.send_master_pubkey()
+                
+                # Add hostname to /etc/hosts (for SGE config)
+                if self.app.ud['cloud_type'] == 'opennebula':
+                    f = open( "/etc/hosts", 'a' )
+                    f.write( "%s\tworker-%s\n" %  (self.private_ip, self.id))
+                    f.close()
+        
+                
+                
             elif msg_type == "WORKER_H_CERT":
                 self.is_alive = True #This is for the case that an existing worker is added to a new master.
                 self.app.manager.save_host_cert( msg.split( " | " )[1] )
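
For OpenNebula the monitor above no longer calls ec2_conn.reboot_instances() on a dead worker; it sends a plain-text "REBOOT | <instance id>" message over the existing master/worker connection, and the worker (cm/util/worker.py, below) matches the prefix and reboots itself with telinit. A minimal sketch of that prefix-based dispatch; the stand-alone handler and the status stub are hypothetical, only the message format and the telinit call come from the diff:

    # Hypothetical stand-alone version of the worker-side dispatch; the real one
    # is the handle_message() method shown in cm/util/worker.py below.
    import subprocess

    def handle_message(message):
        if message.startswith("REBOOT"):
            # the master sends e.g. "REBOOT | 3"; the payload is not needed to reboot
            return subprocess.call("sudo telinit 6", shell=True)
        elif message.startswith("STATUS_CHECK"):
            send_node_status()          # stub standing in for the real status reply
        else:
            print "Unknown message '%s'" % message

    def send_node_status():
        pass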

cm/util/worker.py

             return "TEST_WORKERHOSTCERT"
         log.info( "Retrieving worker host certificate..." )
         w_cert_file = '/tmp/wCert.txt'
+        print '%s - sgeadmin -c "ssh-keyscan -t rsa %s > %s"' % (paths.P_SU, self.app.cloud_interface.get_fqdn(), w_cert_file)
         ret_code = subprocess.call( '%s - sgeadmin -c "ssh-keyscan -t rsa %s > %s"' % (paths.P_SU, self.app.cloud_interface.get_fqdn(), w_cert_file), shell=True )
         if ret_code == 0:
             f = open( w_cert_file, 'r' )
         log.info( "Created SGE install template as file '%s'." % SGE_config_file )
         
         log.info( "Setting up SGE..." )
+        print 'cd %s; ./inst_sge -x -noremote -auto %s' % (paths.P_SGE_ROOT, SGE_config_file)
         ret_code = subprocess.call( 'cd %s; ./inst_sge -x -noremote -auto %s' % (paths.P_SGE_ROOT, SGE_config_file), shell=True )
+
         if ret_code == 0:
             self.sge_started = 1
             log.debug( "Successfully configured SGE." )
                                                   self.app.cloud_interface.get_ami())
         self.conn.send(msg)
         log.debug( "Sending message '%s'" % msg )
-    
+
+        if self.app.ud['cloud_type'] == 'opennebula':
+            if not open('/etc/hostname').readline().startswith('worker'):
+
+                log.debug( "Configuring hostname..." )
+                f = open( "/etc/hostname", 'w' )
+                f.write( "worker-%s" % self.app.cloud_interface.get_instance_id() )
+                f.close()
+        
+                f = open( "/etc/hosts", 'w' )
+                f.write( "127.0.0.1\tlocalhost\n")
+                f.write( "%s\tubuntu\n" % (self.app.ud['master_ip']))
+                f.write( "%s\tworker-%s\n" % (self.app.cloud_interface.get_self_private_ip(), self.app.cloud_interface.get_instance_id()))
+                f.close()
+                
+                # Restart the complete node, or only the hostname service?
+                #ret_code = subprocess.call( "/etc/init.d/hostname restart", shell=True )
+                ret_code = subprocess.call( "sudo telinit 6", shell=True )
+
+
+
     def send_worker_hostcert(self):
         host_cert = self.app.manager.get_host_cert()
         if host_cert != None:
                 self.last_state_change_time = dt.datetime.utcnow()
         elif message.startswith("STATUS_CHECK"):
             self.send_node_status()
+        elif message.startswith("REBOOT"):
+            log.info("Received reboot command")
+            ret_code = subprocess.call( "sudo telinit 6", shell=True )
         else:
             log.debug("Unknown message '%s'" % message)
     
                 #         log.info( "Stuck in state '%s' too long, reseting and trying again..." % self.app.manager.worker_status )
                 #         self.app.manager.worker_status = worker_states.INITIAL_STARTUP
                 #         self.last_state_change_time = dt.datetime.utcnow()
-                m = self.conn.recv()
+                try:
+                    m = self.conn.recv()
+                except IOError, e:
+                    if self.app.ud['cloud_type'] == 'opennebula':
+                        log.debug("Failed connecting to the master: %s" % e)
+                        log.debug("Trying to reboot the system")
+                        ret_code = subprocess.call( 'sudo telinit 6', shell=True )
+                    else:
+                        pass
+                    
                 while m is not None:
                     self.handle_message(m.body)
                     m = self.conn.recv()