Commits

Enis Afgan committed e6251af

Various fixes for getting the Eucalyptus support to also work with other clouds

Comments (0)

Files changed (12)

         # self.formatter = logging.Formatter("[%(levelname)s] %(module)s:%(lineno)d %(asctime)s: %(message)s")
         self.setFormatter(self.formatter)
         self.logmessages = []
-    
+
     def emit(self, record):
         self.logmessages.append(self.formatter.format(record))
-    
+
 
 class UniverseApplication( object ):
     """Encapsulates the state of a Universe application"""
         else:
             self.TESTFLAG = False
             self.logger.setLevel(logging.INFO)
-        
+
         if self.ud.has_key("localflag"):
             self.LOCALFLAG = bool(self.ud['localflag'])
             self.logger.setLevel(logging.DEBUG)
         config.configure_logging(self.config)
         log.debug( "Initializing app" )
         log.debug("Running on '{0}' type of cloud.".format(self.cloud_type))
-        
+
         # App-wide object to store messages that need to travel between the back-end
-        # and the UI. 
+        # and the UI.
         # TODO: Ideally, this should be stored some form of more persistent
         # medium (eg, database, file, session) and used as a simple module (vs. object)
         # but that's hopefully still forthcoming.
         self.msgs = messages.Messages()
-        
+
         # Check that we actually got user creds in user data and inform user
         if not ('access_key' in self.ud or 'secret_key' in self.ud):
             self.msgs.error("No access credentials provided in user data. "
             self.manager.console_monitor.start()
         else:
             log.error("************ No ROLE in %s - this is a fatal error. ************" % paths.USER_DATA_FILE)
-    
+
     def shutdown(self, delete_cluster=False):
         if self.manager:
             self.manager.shutdown(delete_cluster=delete_cluster)
-    
-    
+
+

cm/clouds/__init__.py

     # Global fields
     ec2_conn = None
     s3_conn = None
+    tags_supported = False
+    tags = {}
     # Instance details
     ami = None
     instance_type = None
     user_data = None
     aws_access_key = None
     aws_secret_key = None
-    
+
     def get_user_data(self, force=False):
-        """ Override this method in a cloud-specific interface if the 
+        """ Override this method in a cloud-specific interface if the
             default approach does not apply.
             NOTE that this method should call the set_configuration method!
-            
+
             :type force: boolean
             :param force: If set to True, reload the user data regardless if
                           the data is already stored in the class field
-                          
+
             :rtype: dict
             :return: A key-value dictionary containing the user data.
         """
             self.user_data = misc.load_yaml_file(paths.USER_DATA_FILE)
             self.set_configuration()
         return self.user_data
-    
+
     def set_configuration(self):
         """ Set the configuration fields for the given cloud interface.
             This should primarily be used to set credentials and such.
             It is expected that this method is called in the process of loading
             the user data.
-            
+
            This method should be overridden for any cloud interface that requires
             more credentials than the access credentials included in this default
             implementation.
         self.aws_access_key = self.user_data.get('access_key', None)
         self.aws_secret_key = self.user_data.get('secret_key', None)
         self.tags = {}
-    
+
     def get_configuration(self):
         """ Return a dict with all the class variables.
         """
         return vars(self)
-    
+
     # Non-implemented methods
-    
+
     def get_local_hostname(self):
         log.warning("Unimplemented")
         pass
-    
+
     def run_instances(self, num, instance_type, **kwargs):
         """ Run an image.
         """
         log.warning("Unimplemented")
         pass
-    
+
 
 
 class EC2Interface(CloudInterface):
-    
+
     def __init__(self, app=None):
         super(EC2Interface, self).__init__()
         self.app = app
+        self.tags_supported = True
         self.set_configuration()
-    
+
     def get_ami( self ):
         if self.ami is None:
             if self.app.TESTFLAG is True:
                 except IOError:
                     pass
         return self.ami
-    
+
     @TestFlag('something.good')
     def get_type( self ):
         if self.instance_type is None:
                 except IOError:
                     pass
         return self.instance_type
-    
+
     def get_instance_id( self ):
         if self.instance_id is None:
             if self.app.TESTFLAG is True:
                 except IOError:
                     pass
         return self.instance_id
-    
+
     def get_instance_object(self):
         log.debug("Getting instance object: %s" % self.instance)
         if self.instance is None:
             except Exception, e:
                 log.debug("Error retrieving instance object: {0}".format(e))
         return self.instance
-    
+
     def get_zone( self ):
         if self.zone is None:
             if self.app.TESTFLAG is True:
                 except IOError:
                     pass
         return self.zone
-    
+
     def get_security_groups( self ):
         if self.security_groups is None:
             if self.app.TESTFLAG is True:
                 except IOError:
                     pass
         return self.security_groups
-    
+
     def get_key_pair_name( self ):
         if self.key_pair_name is None:
             if self.app.TESTFLAG is True:
                 except IOError:
                     pass
         return self.key_pair_name
-    
+
     def get_private_ip( self ):
         if self.self_private_ip is None:
             if self.app.TESTFLAG is True:
                 except IOError:
                     pass
         return self.self_private_ip
-    
+
     def get_local_hostname(self):
         if self.local_hostname is None:
             if self.app.TESTFLAG is True:
                 except IOError:
                     pass
         return self.local_hostname
-    
+
     def get_public_hostname( self ):
         if self.self_public_ip is None:
             if self.app.TESTFLAG is True:
                         break
                 except Exception, e:
                     log.error ( "Error retrieving FQDN: %s" % e )
-                                
+
         return self.self_public_ip
-    
+
     def get_public_ip( self ):
         if self.self_public_ip is None:
             if self.app.TESTFLAG is True:
                         break
                 except Exception, e:
                     log.error ( "Error retrieving FQDN: %s" % e )
-                                
+
         return self.self_public_ip
-    
+
     def get_fqdn(self):
         log.debug( "Retrieving FQDN" )
         if self.fqdn == None:
             except IOError:
                 pass
         return self.fqdn
-    
+
     def get_ec2_connection( self ):
         if self.ec2_conn == None:
             try:
             except Exception, e:
                 log.error(e)
         return self.ec2_conn
-    
+
     def get_s3_connection( self ):
         # log.debug( 'Getting boto S3 connection' )
         if self.s3_conn == None:
             except Exception, e:
                 log.error(e)
         return self.s3_conn
-    
+
     def add_tag(self, resource, key, value):
         """ Add tag as key value pair to the `resource` object. The `resource`
         object must be an instance of a cloud object and support tagging.
         """
-        if not self.tags_not_supported:
+        if self.tags_supported:
             try:
                 log.debug("Adding tag '%s:%s' to resource '%s'" % (key, value, resource.id if resource.id else resource))
                 resource.add_tag(key, value)
             except EC2ResponseError, e:
                 log.error("Exception adding tag '%s:%s' to resource '%s': %s" % (key, value, resource, e))
-                self.tags_not_supported = True
+                self.tags_supported = False
         resource_tags = self.tags.get(resource.id, {})
         resource_tags[key] = value
         self.tags[resource.id] = resource_tags
-    
+
     def get_tag(self, resource, key):
         """ Get tag on `resource` cloud object. Return None if tag does not exist.
         """
         value = None
-        if not self.tags_not_supported:
+        if self.tags_supported:
             try:
                 log.debug("Getting tag '%s' on resource '%s'" % (key, resource.id))
                 value = resource.tags.get(key, None)
             except EC2ResponseError, e:
                 log.error("Exception getting tag '%s' on resource '%s': %s" % (key, resource, e))
-                self.tags_not_supported = True
+                self.tags_supported = False
         if not value:
             resource_tags = self.tags.get(resource.id,{})
             value = resource_tags.get(key)
-        return value    
-    
+        return value
+
     def run_instances(self, num, instance_type, spot_price=None, **kwargs):
         use_spot = False
         if spot_price is not None:
             self._make_spot_request(num, instance_type, spot_price, worker_ud)
         else:
             self._run_ondemand_instances(num, instance_type, spot_price, worker_ud)
-        
+
     def _run_ondemand_instances(self, num, instance_type, spot_price, worker_ud, min_num=1):
         worker_ud_str = "\n".join(['%s: %s' % (key, value) for key, value in worker_ud.iteritems()])
         log.debug("Starting instance(s) with the following command : ec2_conn.run_instances( "
-              "image_id='{iid}', min_count='{min_num}, max_count='{num}', key_name='{key}', "
+              "image_id='{iid}', min_count='{min_num}', max_count='{num}', key_name='{key}', "
               "security_groups=['{sgs}'], user_data=[{ud}], instance_type='{type}', placement='{zone}')"
               .format(iid=self.get_ami(), min_num=min_num, num=num, key=self.get_key_pair_name(), \
               sgs=", ".join(self.get_security_groups()), ud=worker_ud_str, type=instance_type, \
             log.error( err )
             return False
         log.debug( "Started %s instance(s)" % num )
-    
+
     def _make_spot_request(self, num, instance_type, price, worker_ud):
         worker_ud_str = "\n".join(['%s: %s' % (key, value) for key, value in worker_ud.iteritems()])
         log.debug("Making a Spot request with the following command: "
         except Exception, e:
             log.error("An error when making a spot request: {0}".format(e))
             return False
-    
+
     def terminate_instance(self, instance_id, spot_request_id=None):
         inst_terminated = request_canceled = True
         if instance_id is not None:
         if spot_request_id is not None:
             request_canceled = self._cancel_spot_request(spot_request_id)
         return (inst_terminated and request_canceled)
-        
+
     def _terminate_instance(self, instance_id):
         ec2_conn = self.get_ec2_connection()
         try:
                 return True
             else:
                 log.error("EC2 exception terminating instance '%s': %s" % (instance_id, e))
-        except Exception, e:
-            log.error("Exception terminating instance %s: %s" % (instance_id, e))
+        except Exception, ex:
+            log.error("Exception terminating instance %s: %s" % (instance_id, ex))
         return False
-    
+
     def _cancel_spot_request(self, request_id):
         ec2_conn = self.get_ec2_connection()
         try:
         except EC2ResponseError, e:
             log.error("Trouble cancelling spot request {0}: {1}".format(request_id, e))
             return False
-    
+
     def _compose_worker_user_data(self):
-        """ Compose worker instance user data.
+        """
+        Compose worker instance user data.
         """
         worker_ud = {}
         worker_ud['role'] = 'worker'
         worker_ud = dict(self.app.ud.items() + worker_ud.items())
         return worker_ud
 
-    def get_all_volumes(self,volume_ids=None, filters=None):
+    def get_all_volumes(self, volume_ids=None, filters=None):
+        """
+        Get all Volumes associated with the current credentials.
+
+        :type volume_ids: list
+        :param volume_ids: Optional list of volume IDs.  If this list
+                           is present, only the volumes associated with
+                           these volume IDs will be returned.
+
+        :type filters: dict
+        :param filters: Optional filters that can be used to limit
+                        the results returned.  Filters are provided
+                        in the form of a dictionary consisting of
+                        filter names as the key and filter values
+                        as the value. The set of allowable filter
+                        names/values is dependent on the request
+                        being performed. Check the EC2 API guide
+                        for details.
+
+        :rtype: list of :class:`boto.ec2.volume.Volume`
+        :return: The requested Volume objects
+        """
+        if not isinstance(volume_ids, list):
+            volume_ids = [volume_ids]
         return self.get_ec2_connection().get_all_volumes(volume_ids=volume_ids, filters=filters)
-    
-    def get_all_instances(self,instance_ids=None,filters=None):
-        return self.get_ec2_connection().get_all_instances(instance_ids=instance_ids,filters=filters)
+
+    def get_all_instances(self, instance_ids=None, filters=None):
+        """
+        Retrieve all the instances associated with current credentials.
+
+        :type instance_ids: list
+        :param instance_ids: Optional list of strings of instance IDs.
+                             If this list is present, only the instances
+                             associated with these instance IDs will be returned.
+
+        :type filters: dict
+        :param filters: Optional filters that can be used to limit the
+                        results returned. Filters are provided in the form of a
+                        dictionary consisting of filter names as the key and
+                        filter values as the value. The set of allowable filter
+                        names/values is dependent on the request being performed.
+                        Check the EC2 API guide for details.
+
+        :rtype: list
+        :return: A list of  :class:`boto.ec2.instance.Reservation`
+        """
+        if not isinstance(instance_ids, list):
+            instance_ids = [instance_ids]
+        return self.get_ec2_connection().get_all_instances(instance_ids=instance_ids, filters=filters)

cm/clouds/eucalyptus.py

     pass
 
 def stacktrace_wrapper(old_method):
-            
+
     def _w(self,*args,**kwargs):
         try:
             raise DebugException()
             log.debug('get_all_instances() called. Trace follows...')
             traceback.print_stack()
         return old_method(self,*args,**kwargs)
-    
+
     return _w
 
 # EC2Connection.get_all_instances = stacktrace_wrapper(EC2Connection.get_all_instances)
-            
+
 
 
 import logging
     """
         A specialization of cm.clouds.ec2 allowing use of private Eucalyptus clouds.
         User data should also include keys s3_url and ec2_url to declare where the connection
-        should be made to.    
+        should be made to.
     """
     def __init__(self, app=None):
         if not app:
             app = _DummyApp()
         super(EucaInterface, self).__init__()
         self.app = app
-        self.tags_not_supported = True
-        self.tags = {}
+        self.tags_supported = False
         self.local_hostname = None
         self.public_hostname = None
         self._min_boto_delay = 2
                     zone = self.get_zone()
                     region = RegionInfo(name=zone,endpoint=host)
                     self.ec2_conn = EC2Connection(
-                                             aws_access_key_id = self.aws_access_key, 
+                                             aws_access_key_id = self.aws_access_key,
                                              aws_secret_access_key = self.aws_secret_key,
                                              is_secure = is_secure,
                                              host = host,
                         self.ec2_conn = False
                 except Exception, e:
                     log.error(e) # to match interface for Ec2Interface
-                    
+
             else:
                 super(EucaInterface,self).get_ec2_connection() # default to ec2 connection
         return self.ec2_conn
 
-    
+
     def get_s3_connection(self):
         log.debug( 'Getting boto S3 connection' )
         if self.s3_conn == None:
                 if len(r) == 4:
                     log.debug('local hostname ({0}) appears to be an IP address.'.format(self.local_hostname))
                     self.local_hostname = None
-                    
+
             if not self.local_hostname:
                 log.debug('Fetching hostname from local system hostname()')
                 self.local_hostname = socket.gethostname()
         return self.local_hostname
-        
+
     def get_public_hostname(self):
         # Eucalyptus meta-data/public-hostname return the IP address. Fake it, assuming that it will be ip-NUM-NUM-NUM-NUM
         super(EucaInterface,self).get_public_hostname()
                 r = filter(lambda x: int(x) < 256 and x > 0, toks)
                 if len(r) == 4:
                     self.public_hostname = None
-                    
+
             if not self.public_hostname:
                 log.debug('Faking local hostname based on IP address {0}'.format('.'.join(toks)))
                 self.public_hostname = 'ip-%s' % '-'.join(toks)
         self._run_ondemand_instances(num, instance_type, spot_price, worker_ud, min_num=num) # eucalyptus only starts min_num instances
 
     def get_all_instances(self,instance_ids=None, filters=None):
-            
+
         if isinstance(instance_ids,basestring):
             instance_ids=(instance_ids,)
             cache_key = instance_ids
             cache_key = ','.join(instance_ids)
         else:
             cache_key = ''
-            
+
         # eucalyptus stops responding if you check the same thing too often
         if self._last_instance_check and cache_key in self._instances and time.time() <= self._last_instance_check + self._min_boto_delay:
             log.debug('Using cached instance information for {0}'.format(str(instance_ids)))
                 else:
                     log.error('Could not filter instance on unknown filter key {0}'.format(key))
         res = [ i for i in reservations if i not in excluded]
-        
+
         return res
-    
+
     def get_all_volumes(self,volume_ids=None, filters=None):
         # eucalyptus does not allow filters in get_all_volumes
         if isinstance(volume_ids,basestring):
         vols = [v for v in volumes if v not in excluded_vols]
         # log.debug("(get_all_volumes) Returning volumes: {0}".format(vols))
         return vols
-            
+

cm/clouds/openstack.py

 
 
 class OSInterface(EC2Interface):
-    
+
     def __init__(self, app=None):
         super(OSInterface, self).__init__()
         self.app = app
+        self.tags_supported = False
         self.set_configuration()
-    
+
     def set_configuration(self):
         super(OSInterface, self).set_configuration()
         # Cloud details gotten from the user data
         self.s3_port = self.user_data.get('s3_port', None)
         self.s3_conn_path = self.user_data.get('s3_conn_path', None)
         self.calling_format = OrdinaryCallingFormat()
-    
+
     def get_ec2_connection( self ):
         if self.ec2_conn == None:
             try:
             except Exception, e:
                 log.error(e)
         return self.ec2_conn
-    
+
     def _get_default_ec2_conn(self, region=None):
         ec2_conn = None
         try:
         except EC2ResponseError, e:
             log.error("Trouble creating a Nova connection: {0}".format(e))
         return ec2_conn
-    
+
     def get_s3_connection(self):
         # TODO: Port this use_object_store logic to other clouds as well
         if self.app and not self.app.use_object_store:
             except Exception, e:
                 log.error("Trouble creating a Swift connection: {0}".format(e))
         return self.s3_conn
-    
+
     def get_public_ip( self ):
         """ NeCTAR's public & private IPs are the same and also local-ipv4 metadata filed
             returns empty so do some monkey patching.
                     log.error ( "Error retrieving FQDN: %s" % e )
 
         return self.self_public_ip
-    
-    
+
+
     def add_tag(self, resource, key, value):
         log.debug("Would add tag {key}:{value} to resource {resource} but OpenStack does not " \
             "support tags".format(key=key, value=value, resource=resource))
         pass
-    
+
     def get_tag(self, resource, key):
         log.debug("Would get tag {key} from resource {resource} but OpenStack does not support tags" \
             .format(key=key, resource=resource))
         pass
-    
+

cm/services/apps/postgres.py

 
     def remove(self):
         log.info("Removing '%s' service" % self.svc_type)
-        self.state = service_states.SHUTTING_DOWN
+        # self.state = service_states.SHUTTING_DOWN
         self.manage_postgres(False)
 
     def manage_postgres( self, to_be_started=True ):

cm/services/apps/sge.py

                 '%s/bin/lx24-amd64/qconf -Mhgrp %s' \
                 % (paths.P_SGE_ROOT, paths.P_SGE_ROOT, ah_file), \
                 "Problems updating @allhosts aimed at adding '%s'" % inst_id, \
-                "Successfully updated @allhosts to add '%s' with IP '%s'" % (inst_id, inst_private_ip)):
+                "Successfully updated @allhosts to add '%s' with address '%s'" % (inst_id, inst_private_ip)):
                 error = True
         else:
             log.info("Instance '%s' IP is already in SGE's @allhosts" % inst_id)

cm/services/data/filesystem.py

         return None
 
     def check_and_update_volume(self, device):
-        f = {'attachment.device': device, 'attachment.instance-id': self.app.cloud_interface.get_instance_id()}
-        vols = self.app.cloud_interface.get_all_volumes(filters=f)
+        # TODO: Abstract filtering into the cloud interface classes
+        if self.app.cloud_type == "ec2":
+            # filtering w/ boto is supported only with ec2
+            f = {'attachment.device':device, 'attachment.instance-id':self.app.cloud_interface.get_instance_id()}
+            vols = self.app.cloud_interface.get_ec2_connection().get_all_volumes(filters=f)
+        else:
+            vols = []
+            all_vols = self.app.cloud_interface.get_ec2_connection().get_all_volumes()
+            for vol in all_vols:
+                if vol.attach_data.instance_id == self.app.cloud_interface.get_instance_id() and \
+                   vol.attach_data.device == device:
+                       vols.append(vol)
         if len(vols) == 1:
             att_vol = vols[0]
             for vol in self.volumes: # Currently, bc. only 1 vol can be assoc w/ FS, we'll only deal w/ 1 vol

cm/services/data/volume.py

             return "No volume ID yet; {0} ({1})".format(self.from_snapshot_id, self.fs.get_full_name())
 
     def get_full_name(self):
+        """
+        Returns a string specifying the volume ID and name of the file system
+        this volume belongs to.
+        """
         return "{vol} ({fs})".format(vol=self.volume_id, fs=self.fs.get_full_name())
 
     def _get_details(self, details):
         return details
 
     def update(self, vol_id):
-        """ switch to a different boto.ec2.volume.Volume """
+        """
+        Switch to a different boto.ec2.volume.Volume
+        """
         log.debug('vol_id = {0} ({1})'.format(vol_id, type(vol_id)))
-        if isinstance(vol_id,basestring):
-            vols = self.app.cloud_interface.get_all_volumes( volume_ids=(vol_id, ))
+        if isinstance(vol_id, basestring):
+            vols = self.app.cloud_interface.get_all_volumes(volume_ids=(vol_id, ))
             if not vols:
                 log.error('Attempting to connect to a non-existent volume {0}'.format(vol_id))
             vol = vols[0]
 
     @property
     def volume_id(self):
+        """
+        Returns the cloud ID for this volume.
+        """
         if self.volume:
             return self.volume.id
         else:
             return None
-    
+
     @property
     def attach_device(self):
+        """
+        Returns the device this volume is attached as, as reported by the cloud middleware.
+        """
         if self.volume:
+            self.volume.update()
             return self.volume.attach_data.device
         else:
             return None
-    
+
     @property
     def status(self):
+        """
+        Returns the current status of this volume, as reported by the cloud middleware.
+        """
         if not self.volume:
             # no volume active
             status = volume_status.NONE
                 status = volume_status.NONE
         return status
 
-    def wait_for_status(self,status,timeout=120):
-        """Wait for timeout seconds, or until the volume reaches a desired status
-        Returns false if it never hit the request status before timeout"""
+    def wait_for_status(self, status, timeout=-1):
+        """
+        Wait for ``timeout`` seconds, or until the volume reaches a desired status
+        Returns ``False`` if it never hit the request status before timeout.
+        If ``timeout`` is set to ``-1``, wait until the desired status is reached.
+        Note that this may potentially be forever.
+        """
         if self.status == volume_status.NONE:
             log.debug('Attempted to wait for a status ({0} ) on a non-existent volume'.format(status))
             return False # no volume means not worth waiting
         else:
-            start_time=time.time()
-            checks = 10
+            start_time = time.time()
             end_time = start_time + timeout
-            wait_time = float(timeout)/checks
-            while time.time() <= end_time:
+            if timeout == -1:
+                checks = "infinite"
+                wait_time = 5
+                wait_forever = True
+            else:
+                checks = 10
+                wait_time = float(timeout)/checks
+                wait_forever = False
+            while wait_forever or time.time() <= end_time:
                 if self.status == status:
-                    log.debug('Volume {0} has reached status {1}'.format(self.volume_id,status))
+                    log.debug('Volume {0} ({1}) has reached status {2}'.format(self.volume_id,
+                        self.fs.get_full_name(), status))
                     return True
                 else:
-                    log.debug('Waiting for volume {0} (status {1}) to reach status {2}. Remaining checks: {3}'.format(self.volume_id,self.status,status,checks))
-                    checks -= 1
+                    log.debug('Waiting for volume {0} (status "{1}"; {2}) to reach status "{3}". '
+                        'Remaining checks: {4}'.format(self.volume_id, self.status,
+                        self.fs.get_full_name(), status, checks))
+                    if timeout != -1:
+                        checks -= 1
                     time.sleep(wait_time)
-            log.debug('Wait for volume {0} to reach status {1} timed out. Current status {2}'.format(self.volume_id,status,self.status))
+            log.debug('Wait for volume {0} ({1}) to reach status {2} timed out. Current status {3}.'\
+                .format(self.volume_id, self.fs.get_full_name(), status, self.status))
             return False
 
-
     def create(self, filesystem=None):
+        """
+        Create a new volume.
+        """
         if not self.size and not self.from_snapshot_id:
             log.error('Cannot add a volume without a size or snapshot ID')
             return None
-        
+
         if self.status == volume_status.NONE:
             try:
                 log.debug("Creating a new volume of size '%s' in zone '%s' from snapshot '%s'" % (self.size, self.app.cloud_interface.get_zone(), self.from_snapshot_id))
             log.error("Error adding tags to volume: %s" % e)
 
     def delete(self):
+        """
+        Delete this volume.
+        """
         try:
             volume_id = self.volume_id
             self.volume.delete()
     # attachment helper methods
 
     def _get_device_list(self):
+        """
+        Get a list of system devices as an iterable list of strings.
+        """
         return frozenset(glob('/dev/*d[a-z]'))
 
-    def _increment_device_id(self,device_id):
-        new_id = device_id[0:-1] + chr(ord(device_id[-1])+1)
+    def _increment_device_id(self, device_id):
+        """
+        Increment the system ``device_id`` to the next letter in alphabet.
+        For example, providing ``/dev/vdc`` as the ``device_id``,
+        returns ``/dev/vdd``.
+
+        There is an AWS-specific customization in this method; namely, AWS
+        allows devices to be attached as /dev/sdf through /dev/sdp
+        Subsequently, those devices are visible under /dev/xvd[f-p]. So, if
+        ``/dev/xvd?`` is provided as the ``device_id``, replace the device
+        base with ``/dev/sd`` and ensure the last letter of the device is
+        never smaller than ``f``. For example, given ``/dev/xvdb`` as the
+        ``device_id``, return ``/dev/sdf``; given ``/dev/xvdg``, return
+        ``/dev/sdh``.
+        """
+        base = device_id[0:-1]
+        letter = device_id[-1]
+        # AWS-specific munging
+        if base == '/dev/xvd':
+            base = '/dev/sd'
+        if letter < 'f':
+            letter = 'e'
+        # Get the next device in line
+        new_id = base + chr(ord(letter)+1)
         return new_id
 
-    def _get_likely_next_devices(self,devices=None):
-        """Returns a list of possible devices to attempt to attempt to connect to.
-        If using virtio, then the devices get attached as /dev/vd?.
-        Newer ubuntu kernels might get devices attached as /dev/xvd?.
-        Otherwise, it'll probably get attached as /dev/sd?
-        If either /dev/vd? or /dev/xvd? devices exist, then we know to use the next of those.
-        Otherwise, test /dev/sd?, /dev/xvd?, then /dev/vd?
-        This is so totally not thread-safe. If other devices get attached externally, the device id may already be in use when we get there."""
+    def _get_likely_next_devices(self, devices=None):
+        """
+        Returns a list of possible devices to attempt to attach to.
+
+        If using virtio, then the devices get attached as ``/dev/vd?``.
+        Newer Ubuntu kernels might get devices attached as ``/dev/xvd?``.
+        Otherwise, it'll probably get attached as ``/dev/sd?``
+        If either ``/dev/vd?`` or ``/dev/xvd?`` devices exist, then we know to
+        use the next of those. Otherwise, test ``/dev/sd?``, ``/dev/xvd?``, then ``/dev/vd?``.
+
+        **This is so totally not thread-safe.** If other devices get attached externally,
+        the device id may already be in use when we get there.
+        """
         if not devices:
             devices = self._get_device_list()
         device_map = map(lambda x:(x.split('/')[-1], x), devices)  # create a dict of id:/dev/id from devices
         elif xvds:
             return ( self._increment_device_id(xvds[-1] ), )
         elif sds:
-            return ( self._increment_device_id( sds[-1] ), '/dev/vda', '/dev/xvda' ) 
+            return ( self._increment_device_id( sds[-1] ), '/dev/vda', '/dev/xvda' )
         else:
             log.error("Could not determine next available device from {0}".format(devices))
             return None
 
     def _do_attach(self, attach_device):
+        """
+        Do the actual process of attaching this volume to the current instance.
+        Returns the current status of the volume.
+        """
         try:
             if attach_device is not None:
-                log.debug("Attaching volume '%s' to instance '%s' as device '%s'" % (self.volume_id,  self.app.cloud_interface.get_instance_id(), attach_device))
-                self.volume.attach(self.app.cloud_interface.get_instance_id(),attach_device)
+                log.debug("Attaching volume '%s' to instance '%s' as device '%s'" %
+                    (self.volume_id, self.app.cloud_interface.get_instance_id(), attach_device))
+                self.volume.attach(self.app.cloud_interface.get_instance_id(), attach_device)
             else:
                 log.error("Attaching volume '%s' to instance '%s' failed because could not determine device." % (self.volume_id,  self.app.cloud_interface.get_instance_id()))
                 return False
         Try it for some time.
         Returns the attached device path, or None if it can't attach
         """
-        log.debug('Starting Volume.attach. volume {0}'.format(self.volume_id,self.app.cloud_interface.get_instance_id()))
-        # bail if the volume is doesn't exist, or is already attached
+        log.debug('Starting volume.attach for volume {0} ({1})'.format(self.volume_id,
+            self.fs.get_full_name()))
+        # Bail if the volume doesn't exist, or is already attached
         if self.status == volume_status.NONE or self.status == volume_status.DELETING:
             log.error('Attempt to attach non-existent volume {0}'.format(self.volume_id))
             return None
             log.debug('Volume {0} already attached')
             return self.device
 
-        # wait for the volume to become available
+        # Wait for the volume to become available
         if self.from_snapshot_id and  self.status == volume_status.CREATING:
             # Eucalyptus can take an inordinate amount of time to create a volume from a snapshot
-            log.warning("Waiting for volume to be created from a snapshot...") 
-            if not self.wait_for_status(volume_status.AVAILABLE,timeout=600): 
+            log.warning("Waiting for volume to be created from a snapshot...")
+            if not self.wait_for_status(volume_status.AVAILABLE,timeout=600):
                 log.error('Volume never reached available from creating status. Status is {0}'.format(self.status))
         elif self.status != volume_status.AVAILABLE:
-            if not self.wait_for_status(volume_status.AVAILABLE, timeout=60):
+            if not self.wait_for_status(volume_status.AVAILABLE):
                 log.error('Volume never became available to attach. Status is {0}'.format(self.status))
                 return None
 
             log.debug('Before attach, devices = {0}'.format(' '.join(pre_devices)))
             if self._do_attach(attempted_device):
                 if self.wait_for_status(volume_status.ATTACHED):
-                    time.sleep(30) # give a few seconds for the device to show up in the OS
+                    time.sleep(10) # give a few seconds for the device to show up in the OS
                     post_devices = self._get_device_list()
                     log.debug('After attach, devices = {0}'.format(' '.join(post_devices)))
                     new_devices = post_devices - pre_devices
         return True
 
     def snapshot(self, snap_description=None):
+        """
+        Create a point-in-time snapshot of the current volume, optionally specifying
+        a description for the snapshot.
+        """
         log.info("Initiating creation of a snapshot for the volume '%s'" % self.volume_id)
         try:
             snapshot = self.volume.create_snapshot(description=snap_description)
             return None
 
     def get_from_snap_id(self):
+        """
+        Returns the ID of the snapshot this volume was created from, or ``None``
+        if the volume was not created from a snapshot.
+        """
         return self.from_snapshot_id
 
     def add(self):

cm/util/master.py

                     if fs['kind'] == 'volume':
                         if 'ids' not in fs and 'size' in fs:
                             filesystem.add_volume(size=fs['size'])
-                        else:    
+                        else:
                             for vol_id in fs['ids']:
                                 filesystem.add_volume(vol_id=vol_id)
                     elif fs['kind'] == 'snapshot':
         """
         log.debug("Trying to discover any volumes attached to this instance...")
         attached_volumes = []
+        # TODO: Abstract filtering into the cloud interface classes
         try:
-            f = {'attachment.instance-id': self.app.cloud_interface.get_instance_id()}
-            volumes = self.app.cloud_interface.get_all_volumes(filters=f)
+            if self.app.cloud_type == 'ec2':
+                # filtering w/ boto is supported only with ec2
+                f = {'attachment.instance-id': self.app.cloud_interface.get_instance_id()}
+                attached_volumes = self.app.cloud_interface.get_ec2_connection().get_all_volumes(filters=f)
+            else:
+                volumes = self.app.cloud_interface.get_ec2_connection().get_all_volumes()
+                for vol in volumes:
+                    if vol.attach_data.instance_id == self.app.cloud_interface.get_instance_id():
+                        attached_volumes.append(vol)
         except EC2ResponseError, e:
             log.debug( "Error checking for attached volumes: %s" % e )
-        log.debug("Attached volumes: %s" % volumes)
-        return volumes
+        log.debug("Attached volumes: %s" % attached_volumes)
+        return attached_volumes
 
     def shutdown(self, sd_galaxy=True, sd_sge=True, sd_postgres=True, sd_filesystems=True,
                 sd_instances=True, sd_autoscaling=True, delete_cluster=False, sd_spot_requests=True,
     def delete_cluster(self):
         log.info("All services shut down; deleting this cluster.")
         # Delete any remaining volume(s) assoc. w/ given cluster
-        filters = {'tag:clusterName': self.app.ud['cluster_name']}
         try:
-            vols = self.app.cloud_interface.get_all_volumes(filters=filters)
-            for vol in vols:
-                log.debug("As part of cluster deletion, deleting volume '%s'" % vol.id)
-                vol.delete()
+            # TODO: Fix filtering for non-ec2 clouds
+            if self.app.cloud_type == 'ec2':
+                filters = {'tag:clusterName': self.app.ud['cluster_name']}
+                vols = self.app.cloud_interface.get_all_volumes(filters=filters)
+                for vol in vols:
+                    log.debug("As part of cluster deletion, deleting volume '%s'" % vol.id)
+                    vol.delete()
         except EC2ResponseError, e:
-            log.error("Error deleting volume %s: %s" % (vol.id, e))
+            log.error("Error deleting a volume: %s" % e)
         # Delete cluster bucket on S3
         s3_conn = self.app.cloud_interface.get_s3_connection()
         if s3_conn:
                         # (print all lines except the one w/ instance IP back to the file)
                         if not inst.private_ip in line:
                             print line
-                inst.terminate()
+                try:
+                    inst.terminate()
+                except EC2ResponseError, e:
+                    log.error("Trouble terminating instance '{0}': {1}".format(instance_id, e))
         log.info("Initiated requested termination of instance. Terminating '%s'." % instance_id)
 
     def reboot_instance(self, instance_id=''):
         self.monitor_thread = threading.Thread( target=self.__monitor )
 
     def start( self ):
+        """
+        Start the monitor thread, which monitors and manages all the services
+        visible to CloudMan.
+        """
         self.last_state_change_time = dt.datetime.utcnow()
         if not self.app.TESTFLAG:
             # Set 'role' and 'clusterName' tags for the master instance
                 ir = self.app.cloud_interface.get_all_instances(i_id)
                 self.app.cloud_interface.add_tag(ir[0].instances[0], 'clusterName', self.app.ud['cluster_name'])
                 self.app.cloud_interface.add_tag(ir[0].instances[0], 'role', self.app.ud['role'])
-            except EC2ResponseError, e:
-                log.debug("Error setting 'role' tag: %s" % e)
+            except Exception, e:
+                log.debug("Error setting tags on the master instance: %s" % e)
         self.monitor_thread.start()
 
     def shutdown( self ):
                                'tool_data_table_conf.xml',
                                'shed_tool_conf.xml',
                                'datatypes_conf.xml']:
-                    if (not misc.file_exists_in_bucket(s3_conn, self.app.ud['bucket_cluster'], '%s.cloud' % f_name) and os.path.exists(os.path.join(paths.P_GALAXY_HOME, f_name))) or \
+                    if (os.path.exists(os.path.join(paths.P_GALAXY_HOME, f_name))) or \
                        (misc.file_in_bucket_older_than_local(s3_conn, self.app.ud['bucket_cluster'], '%s.cloud' % f_name, os.path.join(paths.P_GALAXY_HOME, f_name))):
                         log.debug("Saving current Galaxy configuration file '%s' to cluster bucket '%s' as '%s.cloud'" % (f_name, self.app.ud['bucket_cluster'], f_name))
                         misc.save_file_to_bucket(s3_conn, self.app.ud['bucket_cluster'], '%s.cloud' % f_name, os.path.join(paths.P_GALAXY_HOME, f_name))
 
     @TestFlag(None)
     def send_alive_request(self):
-        self.app.manager.console_monitor.conn.send( 'ALIVE_REQUEST', self.id 
+        self.app.manager.console_monitor.conn.send( 'ALIVE_REQUEST', self.id
                                                     )
     def send_status_check( self ):
         # log.debug("\tMT: Sending STATUS_CHECK message" )
     set ``galaxy_home: /my/custom/path/to/galaxy`` in the user data. If the custom
     path is not provided, just return the ``default_path``.
     """
+    path = None
     try:
         path = misc.load_yaml_file(USER_DATA_FILE).get(name, None)
         if path is None:
 #!/usr/bin/env python
 """
-Requires: 
+Requires:
     PyYAML http://pyyaml.org/wiki/PyYAMLDocumentation (easy_install pyyaml)
     boto http://code.google.com/p/boto/ (easy_install boto)
 """
     try:
         b = s3_conn.get_bucket(bucket_name)
         k = Key(b, remote_filename)
-    
+
         log.debug("Attempting to retrieve file '%s' from bucket '%s'" % (remote_filename, bucket_name))
         if k.exists():
             k.get_contents_to_filename(local_filename)
     log.info("<< Starting nginx >>")
     # Because nginx needs the upload directory to start properly, create it now.
     # However, because user data will be mounted after boot and because given
-    # directory already exists on the user's data disk, must remove it after 
+    # directory already exists on the user's data disk, must remove it after
     # nginx starts
-    # In case an nginx configuration file different than the one on the image needs to be included 
+    # In case an nginx configuration file different than the one on the image needs to be included
     # local_nginx_conf_file = '/opt/galaxy/conf/nginx.conf'
     # url = 'http://userwww.service.emory.edu/~eafgan/content/nginx.conf'
     # log.info("Getting nginx conf file (using wget) from '%s' and saving it to '%s'" % (url, local_nginx_conf_file))
         _run('/etc/init.d/apache2 stop')
         _run('/etc/init.d/tntnet stop') # On Ubuntu 12.04, this server also starts?
         _run('/opt/galaxy/sbin/nginx')
-    if rmdir: 
+    if rmdir:
         _run('rm -rf /mnt/galaxyData')
 
 def _write_conf_file(contents_descriptor, path):
     else:
         log.info("Writing out configuration file encoded in user-data:")
         with open(path, "w") as output:
-            output.write(base64.b64decode(contents_descriptor))    
+            output.write(base64.b64decode(contents_descriptor))
 
 def _configure_nginx(ud):
     # User specified nginx.conf file, can be specified as
     secret_key = ud['secret_key']
 
     s3_url = ud.get('s3_url', AMAZON_S3_URL)
-    if not 's3_host' in ud: # assuming url implies workalikes
+    cloud_type = ud.get('cloud_type', 'ec2')
+    if cloud_type in ['ec2', 'eucalyptus']:
         if s3_url == AMAZON_S3_URL:
             log.info('connecting to Amazon S3 at {0}'.format(s3_url))
         else:
         log.debug('Got boto S3 connection')
     except BotoServerError as e:
         log.error("Exception getting S3 connection; {0}".format(e))
-        
+
     return s3_conn
 
 def _get_cm(ud):
         url = os.path.join(ud['s3_url'], default_bucket_name, CM_REMOTE_FILENAME)
         if _run('wget --output-document=%s %s' % (local_cm_file, url)):
             log.info("Retrieved Cloudman by url from '%s' and saving it to '%s'" % (url, local_cm_file))
-            return True 
+            return True
     # ELSE Default to public Amazon repo and wget via URL
     url = os.path.join(AMAZON_S3_URL, default_bucket_name, CM_REMOTE_FILENAME)
     log.info("Attempting to retrieve from Amazon S3 from %s" % (url))
     return _run('wget --output-document=%s %s' % (local_cm_file, url))
 
 def _write_cm_revision_to_file(s3_conn, bucket_name):
-    """ Get the revision number associated with the CM_REMOTE_FILENAME and save 
+    """ Get the revision number associated with the CM_REMOTE_FILENAME and save
     it locally to CM_REV_FILENAME """
     with open(os.path.join(CM_HOME, CM_REV_FILENAME), 'w') as rev_file:
         rev = _get_file_metadata(s3_conn, bucket_name, CM_REMOTE_FILENAME, 'revision')
             rev_file.write('9999999')
 
 def _get_file_metadata(conn, bucket_name, remote_filename, metadata_key):
-    """ Get metadata value for the given key. If bucket or remote_filename is not 
-    found, the method returns None.    
+    """
+    Get ``metadata_key`` value for the given key. If ``bucket_name`` or
+    ``remote_filename`` is not found, the method returns ``None``.
     """
     log.debug("Getting metadata '%s' for file '%s' from bucket '%s'" % (metadata_key, remote_filename, bucket_name))
     b = None
         try:
             b = conn.get_bucket( bucket_name )
             break
-        except S3ResponseError: 
+        except S3ResponseError:
             log.debug ( "Bucket '%s' not found, attempt %s/5" % ( bucket_name, i+1 ) )
             time.sleep(2)
     if b is not None:
         k = b.get_key(remote_filename)
         if k and metadata_key:
             return k.get_metadata(metadata_key)
-    return None 
+    return None
 
 def _unpack_cm():
     local_path = os.path.join(CM_HOME, CM_LOCAL_FILENAME)