Commits

Enis Afgan committed 3ef1ff7 Merge

Merged in razrichter/cloudman-eucalyptus (pull request #16)

  • Parent commits 34fbf0a, 804e1ea


Files changed (20)

File cm/app.py

File contents unchanged.

File cm/clouds/__init__.py

             self.get_user_data()
         self.aws_access_key = self.user_data.get('access_key', None)
         self.aws_secret_key = self.user_data.get('secret_key', None)
+        self.tags = {}
+        self.tags_not_supported = False # referenced by add_tag()/get_tag() in ec2.py
     
     def get_configuration(self):
         """ Return a dict with all the class variables.

File cm/clouds/cloud_config.py

 from cm.clouds.openstack import OSInterface
 from cm.clouds.opennebula import ONInterface
 from cm.clouds.dummy import DummyInterface
+from cm.clouds.eucalyptus import EucaInterface
 
 class CloudConfig(object):
     """ Configuration class that is used as a mediator for support between
             cloud_interface = ONInterface(app=self.app)
         elif cloud_type == 'dummy':
             cloud_interface = DummyInterface(app=self.app)
+        elif cloud_type == 'euca' or cloud_type.lower() == 'eucalyptus':
+            cloud_interface = EucaInterface(app=self.app)
         return cloud_interface
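
For context, this branch is selected by the cloud_type key in the instance's user data. A minimal sketch of such user data as a Python dict (the key names mirror EucaInterface.set_configuration() below; the endpoint URLs and credentials are placeholders):

    user_data = {
        'cloud_type': 'euca',
        'access_key': '<access key>',
        'secret_key': '<secret key>',
        'ec2_url': 'https://euca.example.org:8773/services/Eucalyptus',
        's3_url': 'https://euca.example.org:8773/services/Walrus',
    }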
         
     

File cm/clouds/dummy.py

     def get_key_pair_name( self ):
         pass
     
-    def get_self_private_ip( self ):
+    def get_private_ip( self ):
         return self._getIpAddress('eth0')
     
-    def get_self_public_ip( self ):
+    def get_public_ip( self ):
         pass
     
     def get_fqdn(self):

File cm/clouds/ec2.py

             i_id = self.get_instance_id()
             ec2_conn = self.get_ec2_connection()
             try:
-                ir = ec2_conn.get_all_instances([i_id])
+                ir = ec2_conn.get_all_instances(i_id)
                 self.instance = ir[0].instances[0]
             except EC2ResponseError, e:
                 log.debug("Error getting instance object: {0}".format(e))
                     pass
         return self.key_pair_name
     
-    def get_self_private_ip( self ):
+    def get_private_ip( self ):
         if self.self_private_ip is None:
             if self.app.TESTFLAG is True:
                 log.debug("Attempted to get key pair name, but TESTFLAG is set. Returning '127.0.0.1'")
                     pass
         return self.local_hostname
     
-    def get_self_public_hostname( self ):
+    def get_public_hostname( self ):
         if self.self_public_ip is None:
             if self.app.TESTFLAG is True:
                 log.debug("Attempted to get public hostname, but TESTFLAG is set. Returning '127.0.0.1'")
                                 
         return self.self_public_ip
     
-    def get_self_public_ip( self ):
+    def get_public_ip( self ):
         if self.self_public_ip is None:
             if self.app.TESTFLAG is True:
                 log.debug("Attempted to get public IP, but TESTFLAG is set. Returning '127.0.0.1'")
         """ Add tag as key value pair to the `resource` object. The `resource`
         object must be an instance of a cloud object and support tagging.
         """
-        try:
-            log.debug("Adding tag '%s:%s' to resource '%s'" \
-                % (key, value, resource.id if resource.id else resource))
-            resource.add_tag(key, value)
-        except EC2ResponseError, e:
-            log.error("Exception adding tag '%s:%s' to resource '%s': %s" % (key, value, resource, e))
+        if not self.tags_not_supported:
+            try:
+                log.debug("Adding tag '%s:%s' to resource '%s'" % (key, value, resource.id if resource.id else resource))
+                resource.add_tag(key, value)
+            except EC2ResponseError, e:
+                log.error("Exception adding tag '%s:%s' to resource '%s': %s" % (key, value, resource, e))
+                self.tags_not_supported = True
+        resource_tags = self.tags.get(resource.id, {})
+        resource_tags[key] = value
+        self.tags[resource.id] = resource_tags
     
     def get_tag(self, resource, key):
         """ Get tag on `resource` cloud object. Return None if tag does not exist.
         """
-        try:
-            log.debug("Getting tag '%s' on resource '%s'" % (key, resource.id))
-            return resource.tags.get(key, None)
-        except EC2ResponseError, e:
-            log.error("Exception getting tag '%s' on resource '%s': %s" % (key, resource, e))
-            return None
+        value = None
+        # Accept either a resource object or a bare resource ID (the Euca
+        # interface passes IDs) when consulting the local fallback dict
+        resource_id = resource if isinstance(resource, basestring) else resource.id
+        if not self.tags_not_supported:
+            try:
+                log.debug("Getting tag '%s' on resource '%s'" % (key, resource_id))
+                value = resource.tags.get(key, None)
+            except EC2ResponseError, e:
+                log.error("Exception getting tag '%s' on resource '%s': %s" % (key, resource_id, e))
+                self.tags_not_supported = True
+        if not value:
+            resource_tags = self.tags.get(resource_id, {})
+            value = resource_tags.get(key)
+        return value
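
The fallback amounts to mirroring every tag in an in-memory dict keyed by resource id, so get_tag() keeps answering even after the first EC2ResponseError flips tags_not_supported. A condensed sketch of that pattern (the class and method names here are illustrative, not part of CloudMan's API):

    class TagCache(object):
        """In-memory stand-in for cloud-side tagging."""
        def __init__(self):
            self.tags = {}  # resource id -> {key: value}

        def add(self, resource_id, key, value):
            self.tags.setdefault(resource_id, {})[key] = value

        def get(self, resource_id, key):
            return self.tags.get(resource_id, {}).get(key)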
     
     def run_instances(self, num, instance_type, spot_price=None, **kwargs):
         use_spot = False
         else:
             self._run_ondemand_instances(num, instance_type, spot_price, worker_ud)
         
-    def _run_ondemand_instances(self, num, instance_type, spot_price, worker_ud):
+    def _run_ondemand_instances(self, num, instance_type, spot_price, worker_ud, min_num=1):
         worker_ud_str = "\n".join(['%s: %s' % (key, value) for key, value in worker_ud.iteritems()])
         log.debug("Starting instance(s) with the following command : ec2_conn.run_instances( "
-              "image_id='{iid}', min_count=1, max_count='{num}', key_name='{key}', "
+              "image_id='{iid}', min_count='{min_num}, max_count='{num}', key_name='{key}', "
               "security_groups=['{sgs}'], user_data=[{ud}], instance_type='{type}', placement='{zone}')"
-              .format(iid=self.get_ami(), num=num, key=self.get_key_pair_name(), \
+              .format(iid=self.get_ami(), min_num=min_num, num=num, key=self.get_key_pair_name(), \
               sgs=", ".join(self.get_security_groups()), ud=worker_ud_str, type=instance_type, \
               zone=self.get_zone()))
         try:
             reservation = None
             ec2_conn = self.get_ec2_connection()
             reservation = ec2_conn.run_instances( image_id=self.get_ami(),
-                                                  min_count=1,
+                                                  min_count=min_num,
                                                   max_count=num,
                                                   key_name=self.get_key_pair_name(),
                                                   security_groups=self.get_security_groups(),
             ec2_conn.terminate_instances([instance_id])
             # Make sure the instance was terminated
             time.sleep(3) # First give the middleware a chance to register the termination
-            rs = ec2_conn.get_all_instances([instance_id])
+            rs = ec2_conn.get_all_instances(instance_id)
             if len(rs) == 0 or rs[0].instances[0].state == 'shutting-down' or \
                 rs[0].instances[0].state == 'terminated':
                 return True
         """
         worker_ud = {}
         worker_ud['role'] = 'worker'
-        worker_ud['master_ip'] = self.get_self_private_ip()
+        worker_ud['master_ip'] = self.get_private_ip()
         worker_ud['master_hostname'] = self.get_local_hostname()
         worker_ud['cluster_type'] = self.app.manager.initial_cluster_type
         # Merge the worker's user data with the master's user data
         worker_ud = dict(self.app.ud.items() + worker_ud.items())
         return worker_ud
+
+    def get_all_volumes(self, volume_ids=None, filters=None):
+        return self.get_ec2_connection().get_all_volumes(volume_ids=volume_ids, filters=filters)
     
+    def get_all_instances(self, instance_ids=None, filters=None):
+        return self.get_ec2_connection().get_all_instances(instance_ids=instance_ids, filters=filters)

File cm/clouds/eucalyptus.py

+import socket, time
+from cm.clouds.ec2 import EC2Interface
+from cm.util import misc
+from cm.util import paths
+from urlparse import urlparse
+
+from boto.s3.connection import S3Connection, OrdinaryCallingFormat, SubdomainCallingFormat
+from boto.ec2.connection import EC2Connection
+from boto.ec2.regioninfo import RegionInfo
+from boto.exception import EC2ResponseError
+
+## debugging
+import traceback
+class DebugException(Exception):
+    pass
+
+def stacktrace_wrapper(old_method):
+            
+    def _w(self,*args,**kwargs):
+        try:
+            raise DebugException()
+        except DebugException:
+            log.debug('{0}() called. Trace follows...'.format(old_method.__name__))
+            traceback.print_stack()
+        return old_method(self,*args,**kwargs)
+    
+    return _w
+
+# EC2Connection.get_all_instances = stacktrace_wrapper(EC2Connection.get_all_instances)
+            
+
+
+import logging
+log = logging.getLogger( 'cloudman' )
+
+class _DummyApp:
+    """to allow for checking TESTFLAG attribute if no app passed"""
+    def __init__(self):
+        self.TESTFLAG = False
+
+class EucaInterface(EC2Interface):
+    """
+        A specialization of cm.clouds.ec2 allowing use of private Eucalyptus clouds.
+        User data should also include the keys s3_url and ec2_url to declare
+        which endpoints the connections should be made to.
+    """
+    def __init__(self, app=None):
+        if not app:
+            app = _DummyApp()
+        super(EucaInterface, self).__init__()
+        self.app = app
+        self.tags_not_supported = True
+        self.tags = {}
+        self.local_hostname = None
+        self.public_hostname = None
+        self._min_boto_delay = 2
+        self._instances = {}
+        self._last_instance_check = None
+        self._volumes = {}
+        self._last_volume_check = None
+
+    def set_configuration(self):
+        super(EucaInterface,self).set_configuration()
+        self.s3_url = self.user_data.get('s3_url',None)
+        self.ec2_url = self.user_data.get('ec2_url',None)
+
+    def get_ec2_connection( self ):
+        if not self.ec2_conn:
+            if self.ec2_url:
+                url = urlparse(self.ec2_url)
+                host = url.hostname
+                port = url.port
+                path = url.path
+                if url.scheme == 'https':
+                    is_secure = True
+                else:
+                    is_secure = False
+                try:
+                    log.debug('Establishing local boto EC2 connection to %s' % self.ec2_url)
+                    zone = self.get_zone()
+                    region = RegionInfo(name=zone,endpoint=host)
+                    self.ec2_conn = EC2Connection(
+                                             aws_access_key_id = self.aws_access_key, 
+                                             aws_secret_access_key = self.aws_secret_key,
+                                             is_secure = is_secure,
+                                             host = host,
+                                             port = port,
+                                             path = path,
+                                             region = region,
+                                             debug = 2,
+                    )
+                    # Do a simple query to test if provided credentials are valid
+                    try:
+                        self.ec2_conn.get_all_instances()
+                        log.debug("Got local boto EC2 connection to %s for region '%s'" % (self.ec2_url,self.ec2_conn.region.name))
+                    except EC2ResponseError, e:
+                        log.error("Cannot validate provided local AWS credentials to %s (A:%s, S:%s): %s" % (self.ec2_url, self.aws_access_key, self.aws_secret_key, e))
+                        self.ec2_conn = False
+                except Exception, e:
+                    log.error(e) # to match interface for Ec2Interface
+                    
+            else:
+                super(EucaInterface,self).get_ec2_connection() # default to ec2 connection
+        return self.ec2_conn
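
Outside of CloudMan, the endpoint override above reduces to boto's RegionInfo plus explicit host/port/path arguments; a self-contained sketch, where the URL, zone name, and credentials are placeholders:

    from urlparse import urlparse
    from boto.ec2.connection import EC2Connection
    from boto.ec2.regioninfo import RegionInfo

    def connect_to_endpoint(ec2_url, access_key, secret_key, zone='cloud'):
        """Open a boto EC2 connection against an EC2-compatible endpoint."""
        url = urlparse(ec2_url)
        region = RegionInfo(name=zone, endpoint=url.hostname)
        return EC2Connection(aws_access_key_id=access_key,
                             aws_secret_access_key=secret_key,
                             is_secure=(url.scheme == 'https'),
                             host=url.hostname, port=url.port, path=url.path,
                             region=region)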
+
+    
+    def get_s3_connection(self):
+        log.debug( 'Getting boto S3 connection' )
+        if self.s3_conn is None:
+            log.debug("No S3 Connection, creating a new one.")
+            if self.s3_url:
+                url = urlparse(self.s3_url)
+                host = url.hostname
+                port = url.port
+                path = url.path
+                calling_format=SubdomainCallingFormat()
+                if host.find('amazon') == -1:  # assume that non-amazon won't use <bucket>.<hostname> format
+                    calling_format=OrdinaryCallingFormat()
+                if url.scheme == 'https':
+                    is_secure = True
+                else:
+                    is_secure = False
+                try:
+                    self.s3_conn = S3Connection(
+                        aws_access_key_id = self.aws_access_key,
+                        aws_secret_access_key = self.aws_secret_key,
+                        is_secure = is_secure,
+                        port = port,
+                        host = host,
+                        path = path,
+                        calling_format = calling_format,
+                        # debug = 2
+                    )
+                    log.debug('Got boto S3 connection to %s' % self.s3_url)
+                except Exception, e:
+                    log.error("Exception getting S3 connection: %s" % e)
+            else: # default to Amazon connection
+                super(EucaInterface,self).get_s3_connection()
+        return self.s3_conn
+
+    def get_user_data(self, force=False):
+        if self.user_data is None or force:
+            self.user_data = misc.load_yaml_file(paths.USER_DATA_FILE)
+            self.aws_access_key = self.user_data.get('access_key', None)
+            self.aws_secret_key = self.user_data.get('secret_key', None)
+            self.s3_url = self.user_data.get('s3_url',None)
+            self.ec2_url = self.user_data.get('ec2_url',None)
+        return self.user_data
+
+    def get_local_hostname(self):
+        # Currently, Eucalyptus meta-data/local-hostname returns the IP address, not the hostname. Pull from system hostname instead
+        super(EucaInterface,self).get_local_hostname()
+        if self.local_hostname:
+            toks = self.local_hostname.split('.')
+            if len(toks) == 4:
+                r = filter(lambda x: x.isdigit() and int(x) < 256, toks)
+                if len(r) == 4:
+                    log.debug('local hostname ({0}) appears to be an IP address.'.format(self.local_hostname))
+                    self.local_hostname = None
+                    
+            if not self.local_hostname:
+                log.debug('Fetching hostname from local system hostname()')
+                self.local_hostname = socket.gethostname()
+        return self.local_hostname
+        
+    def get_public_hostname(self):
+        # Eucalyptus meta-data/public-hostname returns the IP address. Fake it, assuming that it will be ip-NUM-NUM-NUM-NUM
+        super(EucaInterface,self).get_public_hostname()
+        if self.public_hostname:
+            toks = self.public_hostname.split('.')
+            if len(toks) == 4:
+                r = filter(lambda x: x.isdigit() and int(x) < 256, toks)
+                if len(r) == 4:
+                    self.public_hostname = None
+                    
+            if not self.public_hostname:
+                log.debug('Faking public hostname based on IP address {0}'.format('.'.join(toks)))
+                self.public_hostname = 'ip-%s' % '-'.join(toks)
+        return self.public_hostname
+
+    def run_instances(self, num, instance_type, spot_price=None, **kwargs):
+        use_spot = False
+        if spot_price is not None:
+            log.warning('Eucalyptus does not support spot instances -- submitting a normal request')
+        log.info("Adding {0} instance(s)".format(num))
+        if self.app.TESTFLAG is True:
+            log.debug("Attempted to start instance(s), but TESTFLAG is set.")
+            return
+        worker_ud = self._compose_worker_user_data()
+        # log.debug( "Worker user data: %s " % worker_ud )
+        if instance_type == '':
+            instance_type = self.get_type()
+        self._run_ondemand_instances(num, instance_type, spot_price, worker_ud, min_num=num) # Eucalyptus starts only min_num instances, so require all of them
+
+    def get_all_instances(self, instance_ids=None, filters=None):
+        if isinstance(instance_ids,basestring):
+            instance_ids=(instance_ids,)
+            cache_key = instance_ids
+        elif instance_ids:
+            cache_key = ','.join(instance_ids)
+        else:
+            cache_key = ''
+            
+        # eucalyptus stops responding if you check the same thing too often
+        if self._last_instance_check and cache_key in self._instances and time.time() <= self._last_instance_check + self._min_boto_delay:
+            log.debug('Using cached instance information for {0}'.format(str(instance_ids)))
+            reservations = self._instances[cache_key]
+        else:
+            reservations = self.get_ec2_connection().get_all_instances(instance_ids=instance_ids)
+            # Filter for only reservations that include the filtered instance IDs. Needed because Eucalyptus doesn't filter properly
+            if instance_ids:
+                reservations = [r for r in reservations if [i for i in r.instances if i.id in instance_ids ] ]
+            self._instances[cache_key] = reservations
+            self._last_instance_check = time.time()
+
+        if not filters:
+            filters = {}
+        excluded = []
+        for r in reservations:
+            for key in filters.keys():
+                val = filters[key]
+                if key.startswith('tag:'):
+                    tag = key[4:]
+                    if self.get_tag(r.id,tag) != val:
+                        excluded.append(r)
+                        break # no need to check remaining filters once excluded
+                else:
+                    log.error('Could not filter instance on unknown filter key {0}'.format(key))
+        res = [i for i in reservations if i not in excluded]
+        
+        return res
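
The throttle-and-cache pattern used here (and again for volumes below) can be summarized in isolation; a minimal sketch, with the two-second delay mirroring _min_boto_delay and the class name being illustrative:

    import time

    class ThrottledQuery(object):
        """Serve cached results when the same query repeats within min_delay seconds."""
        def __init__(self, min_delay=2):
            self.min_delay = min_delay
            self._cache = {}
            self._last_check = None

        def get(self, key, fetch):
            if (self._last_check and key in self._cache and
                    time.time() <= self._last_check + self.min_delay):
                return self._cache[key]
            self._cache[key] = fetch()  # e.g. lambda: conn.get_all_instances()
            self._last_check = time.time()
            return self._cache[key]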
+    
+    def get_all_volumes(self,volume_ids=None, filters=None):
+        # eucalyptus does not allow filters in get_all_volumes
+        if isinstance(volume_ids,basestring):
+            volume_ids = (volume_ids,)
+            cache_key = volume_ids
+        elif volume_ids:
+            cache_key = ','.join(volume_ids)
+        else:
+            cache_key = ''
+
+        # eucalyptus stops responding if you check too often
+        if self._last_volume_check and cache_key in self._volumes and time.time() <= self._last_volume_check + self._min_boto_delay:
+            volumes = self._volumes[cache_key]
+        else:
+            # need to go this roundabout way to get the volume because euca does not filter the get_all_volumes request by the volume ID,
+            # but keep the filter, in case it eventually does
+            volumes = [ v for v in self.get_ec2_connection().get_all_volumes( volume_ids= volume_ids ) if not volume_ids or (v.id in volume_ids) ]
+            self._last_volume_check = time.time()
+            self._volumes[cache_key] = volumes # cache returned volumes (for this set of filters)
+
+        if not filters:
+            filters = {}
+        excluded_vols = []
+        for v in volumes:
+            for key in filters.keys():
+                val = filters[key]
+                if key.startswith('tag:'):
+                    tag = key[4:]
+                    if self.get_tag(v.id,tag) != val:
+                        # log.debug('(get_all_volumes) Excluding volume {0} because tag {1} != {2}. (is {3})'.format(v.id,tag,val,self.get_tag(v.id,tag)))
+                        excluded_vols.append(v)
+                elif key == 'attachment.device':
+                    if v.attach_data.device != val:
+                        # log.debug('(get_all_volumes) Excluding volume {0} because it is not attached as {1} (is {2})'.format(v.id,val,v.attach_data.device))
+                        excluded_vols.append(v)
+                elif key == 'attachment.instance-id':
+                    if v.attach_data.instance_id != val:
+                        # log.debug('(get_all_volume) Excluding vol {0} because it is not attached to {1} (is {2})'.format(v.id,val,v.attach_data.instance_id))
+                        excluded_vols.append(v)
+                else:
+                    log.error('Could not filter on unknown filter key {0}'.format(key))
+        vols = [v for v in volumes if v not in excluded_vols]
+        # log.debug("(get_all_volumes) Returning volumes: {0}".format(vols))
+        return vols
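
Because Eucalyptus ignores server-side filters, the filter semantics are re-implemented client-side in both listing methods. The predicate logic condenses to roughly this sketch (get_tag here is an assumed callable standing in for EucaInterface.get_tag):

    def volume_matches(vol, filters, get_tag):
        """Client-side stand-in for the EC2 filters Eucalyptus does not honor."""
        for key, val in filters.items():
            if key.startswith('tag:'):
                if get_tag(vol.id, key[4:]) != val:
                    return False
            elif key == 'attachment.device':
                if vol.attach_data.device != val:
                    return False
            elif key == 'attachment.instance-id':
                if vol.attach_data.instance_id != val:
                    return False
        return True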
+            

File cm/clouds/opennebula.py

     def get_key_pair_name( self ):
         pass
     
-    def get_self_private_ip( self ):
+    def get_private_ip( self ):
         # TODO: Change this to the masters IP
         log.debug("Asking for private IP")
         return self._getIpAddress('eth1')
     
-    def get_self_public_ip( self ):
+    def get_public_ip( self ):
         # TODO: Change this to the masters IP
         log.debug("Asking for public IP")
         return self._getIpAddress('eth0')
                 # local machine hwaddr = 00:22:64:ae:3d:fd
                 # TODO
                 if self._getMacAddress('eth0') != '00:22:64:ae:3d:fd':
-                #if int(self.get_self_private_ip().split('.')[2]) == int(self.bridge):
+                #if int(self.get_private_ip().split('.')[2]) == int(self.bridge):
                     ret_code = subprocess.call( 'sudo telinit 6', shell=True )
 
                 

File cm/clouds/openstack.py

                 log.error("Trouble creating a Swift connection: {0}".format(e))
         return self.s3_conn
     
-    def get_self_public_ip( self ):
+    def get_public_ip( self ):
         """ NeCTAR's public & private IPs are the same and also local-ipv4 metadata filed
             returns empty so do some monkey patching.
         """
                 try:
                     log.debug('Gathering instance public IP, attempt %s' % i)
                     if self.app.ud.get('cloud_name', 'ec2').lower() == 'nectar':
-                        self.self_public_ip = self.get_self_private_ip()
+                        self.self_public_ip = self.get_private_ip()
                     else:
                         fp = urllib.urlopen('http://169.254.169.254/latest/meta-data/local-ipv4')
                         self.self_public_ip = fp.read()

File cm/controllers/root.py

             else:
                 log_file = os.path.join(paths.P_SGE_CELL, 'messages')
         elif service_name == 'CloudMan':
-                log_file = "paster.log"
+            log_file = "paster.log"
         # Set log length
         if num_lines:
             if show == 'more':
         """
         a_string_type = type ( a_string )
         if a_string_type is str:
-           return unicode( a_string, 'utf-8' )
+            return unicode( a_string, 'utf-8' )
         elif a_string_type is unicode:
-           return a_string
+            return a_string
 
     @expose
     def get_srvc_status(self, trans, srvc):
             # Test if provided values are in email format and remove non-email formatted ones
             admins_list_check = admins_list
             for admin in admins_list_check:
-                 m = re.search('(\w+@\w+(?:\.\w+)+)', admin)
-                 if not m:
-                     admins_list.remove(admin)
+                m = re.search('(\w+@\w+(?:\.\w+)+)', admin)
+                if not m:
+                    admins_list.remove(admin)
             # Get a handle to Galaxy service and add admins
             svcs = self.app.manager.get_services('Galaxy')
             if len(svcs)>0 and len(admins_list)>0:
         """ Check if limits for autoscaling are acceptable."""
         if as_min is not None and as_min.isdigit() and int(as_min)>=0 and int(as_min)<20 and \
            as_max is not None and as_max.isdigit() and int(as_max)>=int(as_min) and int(as_max)<20:
-           return True
+            return True
         else:
-           return False
+            return False
 
     @expose
     def share_a_cluster(self, trans, visibility, user_ids="", cannonical_ids=""):
         for fs in fss:
             filesystems.append(fs.name)
         return trans.fill_template('admin.mako',
-                                   ip=self.app.cloud_interface.get_self_public_ip(),
+                                   ip=self.app.cloud_interface.get_public_hostname(),
                                    key_pair_name=self.app.cloud_interface.get_key_pair_name(),
                                    filesystems=filesystems,
                                    bucket_cluster=self.app.ud['bucket_cluster'],
         DNS address if so, `#` otherwise. """
         g_s = self.app.manager.get_services('Galaxy')
         if g_s and g_s[0].state == service_states.RUNNING:
-            dns = 'http://%s' % str( self.app.cloud_interface.get_self_public_hostname() )
+            dns = 'http://%s' % str( self.app.cloud_interface.get_public_hostname() )
         else:
             # dns = '<a href="http://%s" target="_blank">Access Galaxy</a>' % str( 'localhost:8080' )
             dns = '#'
 
     @expose
     def static_instance_state_json(self, trans, no_json=False):
-        ret_dict = {'master_ip': self.app.cloud_interface.get_self_public_ip(),
+        ret_dict = {'master_ip': self.app.cloud_interface.get_public_ip(),
                     'master_id': self.app.cloud_interface.get_instance_id(),
                     'ami_id' : self.app.cloud_interface.get_ami(),
                     'availability_zone' : self.app.cloud_interface.get_zone(),
                     'key_pair_name' : self.app.cloud_interface.get_key_pair_name(),
                     'security_groups' : self.app.cloud_interface.get_security_groups(),
-                    'master_host_name': self.app.cloud_interface.get_self_public_hostname()
+                    'master_host_name': self.app.cloud_interface.get_public_hostname()
                    }
         if no_json:
             return ret_dict

File cm/services/apps/pss.py

         self.app.cloud_interface.get_zone()
         self.app.cloud_interface.get_key_pair_name()
         self.app.cloud_interface.get_security_groups()
-        self.app.cloud_interface.get_self_private_ip()
-        self.app.cloud_interface.get_self_public_ip()
+        self.app.cloud_interface.get_private_ip()
+        self.app.cloud_interface.get_public_ip()
         self.app.cloud_interface.get_local_hostname()
 
     def add(self):

File cm/services/apps/sge.py

     def _get_sge_install_conf(self):
         # Add master as an execution host
         # Additional execution hosts will be added later, as they start
-        exec_nodes = self.app.cloud_interface.get_self_private_ip()
+        exec_nodes = self.app.cloud_interface.get_private_ip()
         sge_install_template = Template(templates.SGE_INSTALL_TEMPLATE)
         sge_params = {
           "cluster_name": "GalaxyEC2",
-          "admin_host_list": self.app.cloud_interface.get_self_private_ip(),
-          "submit_host_list": self.app.cloud_interface.get_self_private_ip(),
+          "admin_host_list": self.app.cloud_interface.get_private_ip(),
+          "submit_host_list": self.app.cloud_interface.get_private_ip(),
           "exec_host_list": exec_nodes,
           "hostname_resolving": "true",
         }
         log.debug("Composing SGE's @allhosts group config file {0}:".format(filename))
         if self.app.manager.master_exec_host:
             log.debug(" - adding master instance; IP: {0}"\
-                .format(self.app.cloud_interface.get_self_private_ip()))
-            ahl.append(self.app.cloud_interface.get_self_private_ip())
+                .format(self.app.cloud_interface.get_private_ip()))
+            ahl.append(self.app.cloud_interface.get_private_ip())
         else:
             log.debug(" - master is marked as non-exec host and will not be included in @allhosts file")
         # Add worker instances, excluding the one being removed

File cm/services/data/__init__.py

         """
         raise NotImplementedError()
     
-    def create(self):
+    def create(self, filesystem=None):
         """ Create the block storage resource represented by this object
         """
         raise NotImplementedError()
         """
         raise NotImplementedError()
     
-    def snapshot(self):
+    def snapshot(self, snap_description=None):
         """ Create a point-in-time snapshot of this block storage resource
         
         :rtype: string

File cm/services/data/filesystem.py

         return None
 
     def check_and_update_volume(self, device):
-        """
-        Check that the volume used for this file system is actually the volume
-        we have a reference to and update local fields as necessary.
-        """
-        if self.app.cloud_type == "ec2":
-            # filtering w/ boto is supported only with ec2
-            f = {'attachment.device':device, 'attachment.instance-id':self.app.cloud_interface.get_instance_id()}
-            vols = self.app.cloud_interface.get_ec2_connection().get_all_volumes(filters=f)
-        else:
-            vols = []
-            all_vols = self.app.cloud_interface.get_ec2_connection().get_all_volumes()
-            for vol in all_vols:
-                if vol.attach_data.instance_id == self.app.cloud_interface.get_instance_id() and \
-                   vol.attach_data.device == device:
-                       vols.append(vol)
+        f = {'attachment.device': device, 'attachment.instance-id': self.app.cloud_interface.get_instance_id()}
+        vols = self.app.cloud_interface.get_all_volumes(filters=f)
         if len(vols) == 1:
             att_vol = vols[0]
             for vol in self.volumes: # Currently, bc. only 1 vol can be assoc w/ FS, we'll only deal w/ 1 vol

File cm/services/data/volume.py

 class, a single object/volume maps to a complete file system.
 """
 import os
-import re
 import grp
 import pwd
 import time
 import string
 import commands
 import subprocess
+from glob import glob
 
 from boto.exception import EC2ResponseError
 
 log = logging.getLogger('cloudman')
 
 
+MIN_TIME_BETWEEN_STATUS_CHECKS = 2 # seconds to wait before updating volume status
+volume_status_map = {
+                     'creating'    : volume_status.CREATING,
+                     'available'   : volume_status.AVAILABLE,
+                     'in-use'      : volume_status.IN_USE,
+                     'attached'    : volume_status.ATTACHED,
+                     'deleting'    : volume_status.DELETING,
+                     }
+
 class Volume(BlockStorage):
 
     def __init__(self, filesystem, vol_id=None, device=None, attach_device=None,
         self.fs = filesystem
         self.app = self.fs.app
         self.volume = None # boto instance object representing the current volume
-        self.volume_id = vol_id
         self.device = device # Device ID visible by the operating system
-        self.attach_device = attach_device # Device ID visible/reported by cloud
-                                           # provider as the device attach point
         self.size = size
         self.from_snapshot_id = from_snapshot_id
         self.device = None
                              # AND can be deleted upon cluster termination
         self.snapshot_progress = None
         self.snapshot_status = None
+        self._status = volume_status.NONE
+        self._last_status_check = None
+
+        if vol_id: # get the volume object immediately, if an id is passed
+            self.update(vol_id)
+        elif from_snapshot_id:
+            self.create()
 
     def __str__(self):
         return str(self.volume_id)
         details['err_msg']  = None if details.get('err_msg', '') == '' else details['err_msg']
         return details
 
-    def update(self, bsd):
-        """ Update reference to the 'self' to point to argument 'bsd' """
-        log.debug("Updating current volume reference '%s' to a new one '%s'" % (self.volume_id, bsd.id))
-        self.volume = bsd
-        self.volume_id = bsd.id
-        self.device = bsd.attach_data.device
-        self.size = bsd.size
-        self.from_snapshot_id = bsd.snapshot_id
+    def update(self, vol_id):
+        """ switch to a different boto.ec2.volume.Volume """
+        log.debug('vol_id = {0} ({1})'.format(vol_id, type(vol_id)))
+        if isinstance(vol_id,basestring):
+            vols = self.app.cloud_interface.get_all_volumes( volume_ids=(vol_id, ))
+            if not vols:
+                log.error('Attempting to connect to a non-existent volume {0}'.format(vol_id))
+                return
+            vol = vols[0]
+        else:
+            vol = vol_id
+        log.debug("Updating current volume reference '%s' to a new one '%s'" % (self.volume_id, vol.id))
+        if vol.attachment_state() == 'attached' and vol.attach_data.instance_id != self.app.cloud_interface.get_instance_id():
+            log.error('Attempting to connect to a volume ({0}) that is already attached to a different instance ({1})'.format(vol.id, vol.attach_data.instance_id))
+            self.volume = None
+            self.device = None
+        else:
+            self.volume = vol
+            self.device = vol.attach_data.device
+            self.size = vol.size
+            self.from_snapshot_id = vol.snapshot_id
+            if self.from_snapshot_id == '':
+                self.from_snapshot_id = None
 
+    @property
+    def volume_id(self):
+        if self.volume:
+            return self.volume.id
+        else:
+            return None
+    
+    @property
+    def attach_device(self):
+        if self.volume:
+            return self.volume.attach_data.device
+        else:
+            return None
+    
+    @property
     def status(self):
-        if self.volume_id is None:
-            return volume_status.NONE
+        if not self.volume:
+            # no volume active
+            status = volume_status.NONE
+        elif self._status and self._last_status_check >= time.time() - MIN_TIME_BETWEEN_STATUS_CHECKS:
+            status = self._status
         else:
-            ec2_conn = self.app.cloud_interface.get_ec2_connection()
-            if ec2_conn:
-                try:
-                    self.volume = ec2_conn.get_all_volumes([self.volume_id])[0]
-                except EC2ResponseError, e:
-                    log.error("Cannot retrieve reference to volume '%s'; "
-                        "setting volume id to None. Error: %s" % (self.volume_id, e))
-                    self.volume_id = None
-                    return volume_status.NONE
-                self.from_snapshot_id = self.volume.snapshot_id
-                self.size = self.volume.size
-                if self.from_snapshot_id is '': # ensure consistency
-                    self.from_snapshot_id = None
-                if self.volume.status == 'creating':
-                    return volume_status.CREATING
-                elif self.volume.status == 'available':
-                    return volume_status.AVAILABLE
-                elif self.volume.status == 'in-use':
-                    if self.volume.attach_data.status == 'attached':
-                        return volume_status.ATTACHED
-                    else:
-                        return volume_status.IN_USE
-                elif self.volume.status == 'deleting':
-                    return volume_status.DELETING
+            try:
+                self.volume.update()
+                status = volume_status_map.get(self.volume.status,None)
+                if status == volume_status.IN_USE and self.volume.attachment_state() == 'attached':
+                    status = volume_status.ATTACHED
+                if not status:
+                    log.error('Unknown volume status {0}. Assuming volume_status.NONE'.format(self.volume.status))
+                    status = volume_status.NONE
+                self._status = status
+                self._last_status_check = time.time()
+            except EC2ResponseError as e:
+                log.error('Cannot retrieve status of current volume. {0}'.format(e))
+                status = volume_status.NONE
+        return status
+
+    def wait_for_status(self, status, timeout=120):
+        """Wait up to timeout seconds, or until the volume reaches the desired status.
+        Returns False if it never hit the requested status before the timeout."""
+        if self.status == volume_status.NONE:
+            log.debug('Attempted to wait for a status ({0}) on a non-existent volume'.format(status))
+            return False # no volume means not worth waiting
+        else:
+            start_time = time.time()
+            checks = 10
+            end_time = start_time + timeout
+            wait_time = float(timeout)/checks
+            while time.time() <= end_time:
+                if self.status == status:
+                    log.debug('Volume {0} has reached status {1}'.format(self.volume_id,status))
+                    return True
                 else:
-                    log.debug("Unrecognized volume '%s' status: '%s'" % (self.volume_id, self.volume.status))
-                    return self.volume.status
-            # Connection failed
-            return volume_status.NONE
+                    log.debug('Waiting for volume {0} (status {1}) to reach status {2}. Remaining checks: {3}'.format(self.volume_id,self.status,status,checks))
+                    checks -= 1
+                    time.sleep(wait_time)
+            log.debug('Wait for volume {0} to reach status {1} timed out. Current status {2}'.format(self.volume_id,status,self.status))
+            return False
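
A hypothetical caller, assuming vol is a Volume wrapping a freshly created boto volume:

    # Block for up to two minutes until the volume can be attached.
    if vol.wait_for_status(volume_status.AVAILABLE, timeout=120):
        device = vol.attach()
    else:
        log.error('Volume %s never became available' % vol.volume_id)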
 
-    def get_attach_device(self, offset=0):
-        # Need to ensure both of the variables are in sync
-        if offset or (not self.device or not self.attach_device):
-            if not self._set_devices(offset):
-                return None
-        return self.attach_device
-
-    def get_device(self, offset=0):
-        # Need to ensure both of the variables are in sync
-        if offset or (not self.device or not self.attach_device):
-            if not self._set_devices(offset):
-                return None
-        return self.device
-
-    def _set_devices(self, offset=0):
-        """
-        Use this method to figure out as which device should this volume get
-        attached and how is it visible to the system once attached.
-        ``offset`` forces creation of a new device ID.
-        """
-        def get_next_device_index(mount_base, offset=0, numeric=False):
-            """
-            Get the next device index in sequence based on ``mount_base``. For
-            example, given ``mount_base`` as ``/dev/vd`` the method will look
-            for any devices starting with that base and return the next available
-            index: if ``/dev/vdc`` is the highest order in the current list of
-            devices with that mount base, the method will return ``d``, for
-            ``/dev/vdd``. If no devices are found, the method returns ``a``. If
-            the highest most device is ends with ``z``, the method returns ``None``.
-
-            If ``offset`` is set, offset the next index by the provided value. This
-            applies to both, numeric and non-numeric indexes.
-
-            If ``numeric`` is set to ``True``, the next index in sequence will be
-            a numeric value. For example, if ``/dev/sdg1`` already exists on the
-            system, the method will return ``'2'``. If no devices with that mount
-            base exist, the method will return ``'1'``. Note that the return value
-            of this method is a string, not an int.
-            """
-            dev_list = commands.getstatusoutput('ls %s*' % mount_base)
-            if dev_list[0] == 0:
-                # We have at least 1 device with this mount base, let's figure out the next one
-                if numeric is True:
-                    device_index = str(max([int(d[len(mount_base):]) for d in dev_list[1].split()])+1+offset)
-                else:
-                    # Get the highest device in sequence
-                    highest_device = dev_list[1].split().pop()
-                    # Remove any potential numbers (ie, partitions) from the device name
-                    # eg, if device was ``/dev/vda1``, get down to only ``/dev/vda``
-                    partitions = re.search(r'\d+$', highest_device)
-                    highest_device = highest_device.rstrip(partitions.group(0) if partitions else '')
-                    # Get the last letter of the device
-                    highest_device_index = highest_device[len(highest_device)-1]
-                    # Get the next device in sequence, taking care of the ``z`` case
-                    if highest_device_index == 'z':
-                        log.error("Cannot get the next device index becaue we're 'z' already!")
-                        return None
-                    hdi_num = string.ascii_lowercase.index(highest_device_index)
-                    device_index = string.ascii_lowercase[hdi_num+1+offset]
-            else:
-                # No devices with this mount base exist, return the defaults
-                if numeric is True:
-                    device_index = '1'
-                else:
-                    device_index = 'a'
-            log.debug("Setting {0} device index to '{1}'".format(self.get_full_name(), device_index))
-            return device_index
-
-        # As of Ubuntu 11.04 (kernel 2.6.38), EC2 EBS volumes attached to /dev/sd*
-        # get attached as device /dev/xvd*. So, look at the current version
-        # of kernel running and set volume's device accordingly
-        kernel_out = commands.getstatusoutput('uname -r')
-        numeric_index = False # By default, device indices will be letter-based
-        if self.app.cloud_type == 'ec2':
-            if kernel_out[0] == 0:
-                numeric_index = True # on ec2, we use numeric device indexing
-                # Extract significant kernel version numbers and test against
-                # lowest kernel version that changes device mappings
-                if map(int, kernel_out[1].split('-')[0].split('.')) >= [2, 6, 38]:
-                    mount_base = '/dev/xvdg'
-                else:
-                    mount_base = '/dev/sdg'
-            else:
-                log.error("Could not discover kernel version required for to obtain volume's device.")
-                return False
-            attach_base = '/dev/sdg'
-        else: # non-ec2 clouds do not seem to have the above issue but attach to a different device
-            mount_base = '/dev/vd'
-            attach_base = '/dev/vd'
-
-        # In case the system was rebooted and the volume is already attached, match
-        # the device ID with the attach device ID
-        # If offset is set, force creation of new IDs
-        if self.attach_device and offset == 0:
-            self.device = re.sub(attach_base, mount_base, self.attach_device)
-            return True
-        device_index = get_next_device_index(mount_base, offset=offset, numeric=numeric_index)
-        self.device = '%s%s' % (mount_base, device_index)
-        self.attach_device = '%s%s' % (attach_base, device_index)
-        log.debug("Running on kernel version %s; set %s volume device as '%s' and attach "
-            "device as '%s'" % (kernel_out[1], self.get_full_name(), self.device, self.attach_device))
-        return True
 
     def create(self, filesystem=None):
-        if self.status() == volume_status.NONE:
+        if not self.size and not self.from_snapshot_id:
+            log.error('Cannot add a volume without a size or snapshot ID')
+            return None
+        
+        if self.status == volume_status.NONE:
             try:
-                # Get the size of a snapshot from which we'll create a volume
-                if self.size == 0 and self.from_snapshot_id is not None:
-                    snap = self.app.cloud_interface.ec2_conn.get_all_snapshots(
-                            snapshot_ids=[self.from_snapshot_id])[0]
-                    self.size = snap.volume_size
-                log.debug("Creating a new volume of size '%s' in zone '%s' from snapshot '%s'" \
-                    % (self.size, self.app.cloud_interface.get_zone(), self.from_snapshot_id))
-                if self.size > 0:
-                    self.volume = self.app.cloud_interface.get_ec2_connection().create_volume(self.size,
-                        self.app.cloud_interface.get_zone(), snapshot=self.from_snapshot_id)
-                    self.volume_id = str(self.volume.id)
-                    self.size = int(self.volume.size)
-                    # Wait until the volume is actually created
-                    # NOTE that this could potentially take forever
-                    volumestatus = self.volume.update()
-                    while volumestatus != 'available':
-                        log.debug("Waiting for volume '%s' to create; status: %s" \
-                            % (self.get_full_name(), volumestatus))
-                        time.sleep(3)
-                        volumestatus = self.volume.update()
-                    log.debug("Created new volume of size '%s' from snapshot '%s' with ID '%s' in zone '%s'" \
-                        % (self.size, self.from_snapshot_id, self.get_full_name(), \
-                        self.app.cloud_interface.get_zone()))
-                else:
-                    log.warning("Cannot create volume of size 0! Volume not created.")
+                log.debug("Creating a new volume of size '%s' in zone '%s' from snapshot '%s'" % (self.size, self.app.cloud_interface.get_zone(), self.from_snapshot_id))
+                self.volume = self.app.cloud_interface.get_ec2_connection().create_volume(self.size, self.app.cloud_interface.get_zone(), snapshot=self.from_snapshot_id)
+                self.size = int(self.volume.size or 0) # when creating from a snapshot in Euca, volume.size may be None
+                log.debug("Created new volume of size '%s' from snapshot '%s' with ID '%s' in zone '%s'" % (self.size, self.from_snapshot_id, self.volume_id, self.app.cloud_interface.get_zone()))
             except EC2ResponseError, e:
                 log.error("Error creating volume: %s" % e)
         else:
-            log.debug("Tried to create a volume but it is in state '%s' (volume ID: %s)" \
-                % (self.status(), self.get_full_name()))
+            log.debug("Tried to create a volume but it is in state '%s' (volume ID: %s)" % (self.status, self.volume_id))
 
         # Add tags to newly created volumes (do this outside the inital if/else
         # to ensure the tags get assigned even if using an existing volume vs.
 
     def delete(self):
         try:
-            self.app.cloud_interface.get_ec2_connection().delete_volume(self.volume_id)
-            log.debug("Deleted volume '%s'" % self.volume_id)
-            self.volume_id = None
+            volume_id = self.volume_id
+            self.volume.delete()
+            log.debug("Deleted volume '%s'" % volume_id)
             self.volume = None
         except EC2ResponseError, e:
-            log.error("Error deleting volume '%s' - you should delete it manually after "
-                "the cluster has shut down: %s" % (self.volume_id, e))
+            log.error("Error deleting volume '%s' - you should delete it manually after the cluster has shut down: %s" % (self.volume_id, e))
 
-    def do_attach(self, attach_device):
+    # attachment helper methods
+
+    def _get_device_list(self):
+        return frozenset(glob('/dev/*d[a-z]'))
+
+    def _increment_device_id(self,device_id):
+        new_id = device_id[0:-1] + chr(ord(device_id[-1])+1)
+        return new_id
+
+    def _get_likely_next_devices(self,devices=None):
+        """Returns a list of possible devices to attempt to attempt to connect to.
+        If using virtio, then the devices get attached as /dev/vd?.
+        Newer ubuntu kernels might get devices attached as /dev/xvd?.
+        Otherwise, it'll probably get attached as /dev/sd?
+        If either /dev/vd? or /dev/xvd? devices exist, then we know to use the next of those.
+        Otherwise, test /dev/sd?, /dev/xvd?, then /dev/vd?
+        This is so totally not thread-safe. If other devices get attached externally, the device id may already be in use when we get there."""
+        if not devices:
+            devices = self._get_device_list()
+        device_map = map(lambda x: (x.split('/')[-1], x), devices)  # list of (basename, path) pairs from devices
+        # in order, we want vd?, xvd?, or sd?
+        vds = sorted(d[1] for d in device_map if d[0][0] == 'v')
+        xvds = sorted(d[1] for d in device_map if d[0][0:2] == 'xv')
+        sds = sorted(d[1] for d in device_map if d[0][0] == 's')
+        if vds:
+            return (self._increment_device_id(vds[-1]),)
+        elif xvds:
+            return (self._increment_device_id(xvds[-1]),)
+        elif sds:
+            return (self._increment_device_id(sds[-1]), '/dev/vda', '/dev/xvda')
+        else:
+            log.error("Could not determine next available device from {0}".format(devices))
+            return None
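
attach() below pairs these helpers with a before/after snapshot of /dev to discover where the OS actually put the volume; the core of that set-difference technique in isolation (variable names are illustrative):

    from glob import glob

    def device_snapshot():
        # Same pattern _get_device_list() globs for: /dev/sda, /dev/vdb, /dev/xvdc, ...
        return frozenset(glob('/dev/*d[a-z]'))

    before = device_snapshot()
    # ... ask the cloud to attach the volume, then wait for it to register ...
    appeared = device_snapshot() - before
    new_device = tuple(appeared)[0] if len(appeared) == 1 else None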
+
+    def _do_attach(self, attach_device):
         try:
             if attach_device is not None:
-                log.debug("Attaching volume '%s' to instance '%s' as device '%s'"
-                    % (self.get_full_name(),  self.app.cloud_interface.get_instance_id(), attach_device))
-                volumestatus = self.app.cloud_interface.get_ec2_connection() \
-                    .attach_volume(self.volume_id, self.app.cloud_interface.get_instance_id(), attach_device)
+                log.debug("Attaching volume '%s' to instance '%s' as device '%s'" % (self.volume_id,  self.app.cloud_interface.get_instance_id(), attach_device))
+                self.volume.attach(self.app.cloud_interface.get_instance_id(),attach_device)
             else:
-                log.error("Attaching volume '%s' to instance '%s' failed because could not determine device."
-                    % (self.get_full_name(),  self.app.cloud_interface.get_instance_id()))
+                log.error("Attaching volume '%s' to instance '%s' failed because could not determine device." % (self.volume_id,  self.app.cloud_interface.get_instance_id()))
                 return False
         except EC2ResponseError, e:
             for er in e.errors:
                         "Exception: %s (%s)" % (self.volume_id,
                         self.app.cloud_interface.get_instance_id(), attach_device, er[0], er[1]))
             return False
-        return volumestatus
+        return self.status
 
     def attach(self):
         """
         Attach EBS volume to the given device.
         Try it for some time.
+        Returns the attached device path, or None if it can't attach
         """
-        for counter in range( 30 ):
-            if self.status() == volume_status.AVAILABLE:
-                attach_device = self.get_attach_device() # Ensure device ID is assigned to current volume
-                volumestatus = self.do_attach(attach_device)
-                # Wait until the volume is 'attached'
-                ctn = 0
-                attempts = 30
-                while ctn < attempts:
-                    log.debug("Attaching volume '%s'; status: %s (check %s/%s)" \
-                        % (self.get_full_name(), volumestatus, ctn, attempts))
-                    if volumestatus == 'attached':
-                        log.debug("Volume '%s' attached to instance '%s' as device '%s'" \
-                            % (self.volume_id, self.app.cloud_interface.get_instance_id(),
-                            self.get_attach_device()))
-                        break
-                    if ctn == attempts-1:
-                        log.debug("Volume '%s' FAILED to attach to instance '%s' as device '%s'." \
-                            % (self.get_full_name(), self.app.cloud_interface.get_instance_id(),
-                            self.get_attach_device()))
-                        if attempts < 90:
-                            log.debug("Will try another device")
-                            attempts += 30 # Increment attempts for another try
-                            if self.detach():
-                                # Offset device num by number of attempts
-                                attach_device = self.get_attach_device(offset=attempts/30-1)
-                                volumestatus = self.do_attach(attach_device)
-                        else:
-                            log.debug("Will not try again. Aborting attaching of volume")
-                            return False
-                    volumes = (self.app.cloud_interface.get_ec2_connection()).get_all_volumes([self.volume_id])
-                    volumestatus = volumes[0].attachment_state()
-                    time.sleep(3)
-                    ctn += 1
-                return True
-            elif self.status() == volume_status.IN_USE or self.status() == volume_status.ATTACHED:
-                # Check if the volume is already attached to current instance
-                # (this can happen following a reboot/crash)
-                if self.volume.attach_data.instance_id == self.app.cloud_interface.get_instance_id():
-                    self.attach_device = self.volume.attach_data.device
-                    log.debug("Tried to attach a volume but the volume '%s' is already "
-                        "attached (as device %s)" % (self.get_full_name(), self.get_attach_device()))
-                    return True
-            elif self.volume_id is None:
-                log.error("Wanted to attach a volume but missing volume ID; cannot attach")
-                return False
-            if counter == 29:
-                log.warning("Cannot attach volume '%s' in state '%s'" % (self.volume_id, self.status()))
-                return False
-            log.debug("Wanting to attach volume '%s' but it's not 'available' yet (current state: '%s'). "
-                "Waiting (%s/30)." % (self.volume_id, self.status(), counter))
-            time.sleep( 2 )
+        log.debug('Starting Volume.attach for volume {0} on instance {1}'.format(self.volume_id, self.app.cloud_interface.get_instance_id()))
+        # bail if the volume doesn't exist or is already attached
+        if self.status == volume_status.NONE or self.status == volume_status.DELETING:
+            log.error('Attempt to attach non-existent volume {0}'.format(self.volume_id))
+            return None
+        elif self.status == volume_status.ATTACHED or self.status == volume_status.IN_USE:
+            log.debug('Volume {0} already attached'.format(self.volume_id))
+            return self.device
+
+        # wait for the volume to become available
+        if self.from_snapshot_id and self.status == volume_status.CREATING:
+            # Eucalyptus can take an inordinate amount of time to create a volume from a snapshot
+            log.warning("Waiting for volume to be created from a snapshot...") 
+            if not self.wait_for_status(volume_status.AVAILABLE,timeout=600): 
+                log.error('Volume never reached available from creating status. Status is {0}'.format(self.status))
+        elif self.status != volume_status.AVAILABLE:
+            if not self.wait_for_status(volume_status.AVAILABLE, timeout=60):
+                log.error('Volume never became available to attach. Status is {0}'.format(self.status))
+                return None
+
+        # attempt to attach
+        for attempted_device in self._get_likely_next_devices():
+            pre_devices = self._get_device_list()
+            log.debug('Before attach, devices = {0}'.format(' '.join(pre_devices)))
+            if self._do_attach(attempted_device):
+                if self.wait_for_status(volume_status.ATTACHED):
+                    time.sleep(30) # give the OS time for the device to show up
+                    post_devices = self._get_device_list()
+                    log.debug('After attach, devices = {0}'.format(' '.join(post_devices)))
+                    new_devices = post_devices - pre_devices
+                    log.debug('New devices = {0}'.format(' '.join(new_devices)))
+                    if len(new_devices) == 0:
+                        log.debug('Could not find attached device for volume {0}. Attempted device = {1}'.format(self.volume_id, attempted_device))
+                    elif attempted_device in new_devices:
+                        self.device = attempted_device
+                        return attempted_device
+                    elif len(new_devices) > 1:
+                        log.error("Multiple devices (%s) added to OS during process, and none are the requested device. Can't determine new device. Aborting"
+                              % ', '.join(new_devices))
+                        return None
+                    else:
+                        device = tuple(new_devices)[0]
+                        self.device = device
+                        return device
+                # requested device didn't attach, for whatever reason
+                if self.status != volume_status.AVAILABLE and attempted_device[-3:-1] != 'vd':
+                    self.detach() # in case it attached invisibly
+                self.wait_for_status(volume_status.AVAILABLE, 60)
+        return None # no device properly attached
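
The rewritten attach path leans on two helpers this hunk does not show: wait_for_status (poll until the volume reaches a state, with a timeout) and _get_device_list (the set of block devices the OS currently sees, so a new arrival can be found by set difference). Minimal sketches consistent with the call sites above; the names come from the diff, but the bodies here are assumptions:

    import glob
    import time

    class Volume(object):  # sketch only; the real class carries far more state
        def wait_for_status(self, status, timeout=120, interval=5):
            """Poll self.status (assumed to re-query the cloud API) until it
            matches `status`; give up after roughly `timeout` seconds."""
            waited = 0
            while waited < timeout:
                if self.status == status:
                    return True
                time.sleep(interval)
                waited += interval
            return False

        def _get_device_list(self):
            """Return the set of whole-disk device paths visible to the OS."""
            return set(glob.glob('/dev/sd?') + glob.glob('/dev/vd?') + glob.glob('/dev/xvd?'))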
 
     def detach(self):
         """
         Detach EBS volume from an instance.
         Try it for some time.
         """
-        if self.status() == volume_status.ATTACHED or self.status() == volume_status.IN_USE:
+        if self.status == volume_status.ATTACHED or self.status == volume_status.IN_USE:
             try:
-                volumestatus = self.app.cloud_interface.get_ec2_connection()\
-                    .detach_volume( self.volume_id, self.app.cloud_interface.get_instance_id())
+                self.volume.detach()
             except EC2ResponseError, e:
-                log.error("Detaching volume '%s' from instance '%s' failed. Exception: %s" \
-                    % (self.get_full_name(), self.app.cloud_interface.get_instance_id(), e))
+                log.error("Detaching volume '%s' from instance '%s' failed. Exception: %s" % (self.volume_id, self.app.cloud_interface.get_instance_id(), e))
                 return False
-
-            for counter in range(30):
-                log.debug("Detaching volume '%s'; status '%s' (check %s/30)" \
-                    % (self.get_full_name(), volumestatus, counter))
-                if volumestatus == 'available':
-                    log.debug("Volume '%s' successfully detached from instance '%s'." \
-                        % ( self.get_full_name(), self.app.cloud_interface.get_instance_id() ))
-                    self.volume = None
-                    break
-                if counter == 28:
-                    try:
-                        volumestatus = self.app.cloud_interface.get_ec2_connection()\
-                            .detach_volume(self.volume_id, self.app.cloud_interface.get_instance_id(),
-                            force=True)
-                    except EC2ResponseError, e:
-                        log.error("Second attempt at detaching volume '%s' from instance '%s' failed. "
-                            "Exception: %s" % (self.get_full_name(), self.app.cloud_interface.get_instance_id(), e))
-                        return False
-                if counter == 29:
-                    log.debug("Volume '%s' FAILED to detach from instance '%s'" \
-                        % ( self.get_full_name(), self.app.cloud_interface.get_instance_id() ))
+            self.wait_for_status(volume_status.AVAILABLE, 240)
+            if self.status != volume_status.AVAILABLE:
+                log.debug('Attempting to detach again.')
+                try:
+                    self.volume.detach()
+                except EC2ResponseError, e:
+                    log.error("Detaching volume '%s' from instance '%s' failed. Exception: %s" % (self.volume_id, self.app.cloud_interface.get_instance_id(), e))
                     return False
-                time.sleep(6)
-                volumes = self.app.cloud_interface.get_ec2_connection().get_all_volumes( [self.volume_id] )
-                volumestatus = volumes[0].status
+                if not self.wait_for_status(volume_status.AVAILABLE, 60):
+                    log.warning('Volume {0} did not detach properly. Left in state {1}'.format(self.volume_id, self.status))
+                    return False
         else:
-            log.warning("Cannot detach volume '%s' in state '%s'" % (self.get_full_name(), self.status()))
+            log.warning("Cannot detach volume '%s' in state '%s'" % (self.volume_id, self.status))
             return False
         return True
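
The removed retry loop escalated to a forced detach on its last pass. boto's Volume.detach accepts the same flag, so the second attempt above could preserve that escalation:

    self.volume.detach(force=True)  # last resort; the guest may still hold the device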
 
     def snapshot(self, snap_description=None):
         log.info("Initiating creation of a snapshot for the volume '%s'" % self.volume_id)
-        ec2_conn = self.app.cloud_interface.get_ec2_connection()
         try:
-            snapshot = ec2_conn.create_snapshot(self.volume_id, description=snap_description)
-        except Exception, ex:
+            snapshot = self.volume.create_snapshot(description=snap_description)
+        except EC2ResponseError as ex:
             log.error("Error creating a snapshot from volume '%s': %s" % (self.volume_id, ex))
             raise
         if snapshot:
             return None
 
     def get_from_snap_id(self):
-        self.status()
         return self.from_snapshot_id
 
     def add(self):
         available over NFS
         """
         for counter in range(30):
-            if self.status() == volume_status.ATTACHED:
+            if self.status == volume_status.ATTACHED:
                 if os.path.exists(mount_point):
                     # Check if the mount location is empty
                     if len(os.listdir(mount_point)) != 0:
                 # Potentially wait for the device to actually become available in the system
                 # TODO: Do something if the device is not available in the given time period
                 for i in range(10):
-                    if os.path.exists(self.get_device()):
-                        log.debug("Device path {0} checked and it exists.".format(self.get_device()))
+                    if os.path.exists(self.device):
+                        log.debug("Device path {0} checked and it exists.".format(self.device))
                         break
                     else:
-                        log.debug("Device path {0} does not yet exists; waiting...".format(self.get_device()))
-                        time.sleep(6)
+                        log.debug("Device path {0} does not yet exists; waiting...".format(self.device))
+                        time.sleep(4)
                 # Until the underlying issue is fixed (see FIXME below), mask this
                 # even more by custom-handling the run command and thus not printing the err
-                cmd = '/bin/mount %s %s' % (self.get_device(), mount_point)
+                cmd = '/bin/mount %s %s' % (self.device, mount_point)
                 process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                 _, _ = process.communicate()
                 if process.returncode != 0:
                     # FIXME: Assume if a file system cannot be mounted that it's because
                     # there is not a file system on the device so try creating one
-                    if run('/sbin/mkfs.xfs %s' % self.get_device(),
-                        "Failed to create a files system on device %s" % self.get_device(),
-                        "Created a file system on device %s" % self.get_device()):
-                        if not run('/bin/mount %s %s' % (self.get_device(), mount_point),
-                            "Error mounting file system %s from %s" % (mount_point, self.get_device()),
-                            "Successfully mounted file system %s from %s" % (mount_point, self.get_device())):
+                    if run('/sbin/mkfs.xfs %s' % self.device,
+                        "Failed to create a file system on device %s" % self.device,
+                        "Created a file system on device %s" % self.device):
+                        if not run('/bin/mount %s %s' % (self.device, mount_point),
+                            "Error mounting file system %s from %s" % (mount_point, self.device),
+                            "Successfully mounted file system %s from %s" % (mount_point, self.device)):
                             log.error("Failed to mount device '%s' to mount point '%s'"
-                                % (self.get_device(), mount_point))
+                                % (self.device, mount_point))
                             return False
                 else:
                     log.info("Successfully mounted file system {0} from {1}".format(mount_point,
-                        self.get_device()))
+                        self.device))
                 try:
                     # Default owner of all mounted file systems to `galaxy` user
                     os.chown(mount_point, pwd.getpwnam("galaxy")[2], grp.getgrnam("galaxy")[2])
                 if self.fs.add_nfs_share(mount_point):
                     return True
             log.warning("Cannot mount volume '%s' in state '%s'. Waiting (%s/30)." % (self.volume_id,
-                self.status(), counter))
+                self.status, counter))
             time.sleep(2)
 
     def unmount(self, mount_point):

File cm/util/master.py

 from cm.util.decorators import TestFlag
 
 import cm.util.paths as paths
-from boto.exception import EC2ResponseError, S3ResponseError
+from boto.exception import EC2ResponseError, BotoClientError, BotoServerError, S3ResponseError
 
 log = logging.getLogger('cloudman')
 
                     # Based on the kind, add the appropriate file system. We can
                     # handle 'volume', 'snapshot', or 'bucket' kind
                     if fs['kind'] == 'volume':
-                        for vol_id in fs['ids']:
-                            filesystem.add_volume(vol_id=vol_id)
+                        if 'ids' not in fs and 'size' in fs:
+                            filesystem.add_volume(size=fs['size'])
+                        else:
+                            for vol_id in fs['ids']:
+                                filesystem.add_volume(vol_id=vol_id)
                     elif fs['kind'] == 'snapshot':
                         for snap in fs['ids']:
                             # Check if an already attached volume maps to this snapshot
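
For reference, the two shapes of a 'volume'-kind file system entry the new branch accepts (expressed as Python dicts; keys other than kind, ids, and size omitted, and the size unit is assumed to be GB as elsewhere in CloudMan):

    fs = {'kind': 'volume', 'ids': ['vol-12345678']}  # attach the existing volume(s)
    fs = {'kind': 'volume', 'size': 100}              # no IDs: create a fresh volume of this size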
                     if not processed_service:
                         log.warning("Could not find service class matching service entry '%s'?" % service_name)
             return True
-        except Exception, e:
+        except (BotoClientError, BotoServerError) as e:
             log.error("Error reading existing cluster configuration file: %s" % e)
             self.manager_started = False
             return False
 
     def all_fs_status_text(self):
         return []
+        # FIXME: unreachable code
         tr = []
         for key, vol in self.volumes.iteritems():
             if vol[3] is None:
 
     def all_fs_status_array(self):
         return []
+        # FIXME: unreachable code
         tr = []
         for key, vol in self.volumes.iteritems():
             if vol[3] is None:
             if self.master_exec_host is True or force_removal:
                 self.master_exec_host = False
                 if not sge_svc._remove_instance_from_exec_list(self.app.cloud_interface.get_instance_id(),
-                        self.app.cloud_interface.get_self_private_ip()):
+                        self.app.cloud_interface.get_private_ip()):
                     # If the removal was unsuccessful, reset the flag
                     self.master_exec_host = True
             else:
                 self.master_exec_host = True
                 if not sge_svc._add_instance_as_exec_host(self.app.cloud_interface.get_instance_id(),
-                        self.app.cloud_interface.get_self_private_ip()):
+                        self.app.cloud_interface.get_private_ip()):
                     # If the removal was unsuccessful, reset the flag
                     self.master_exec_host = False
         else:
         log.debug("Trying to discover any worker instances associated with this cluster...")
         filters = {'tag:clusterName': self.app.ud['cluster_name'], 'tag:role': 'worker'}
         try:
-            ec2_conn = self.app.cloud_interface.get_ec2_connection()
-            reservations = ec2_conn.get_all_instances(filters=filters)
+            reservations = self.app.cloud_interface.get_all_instances(filters=filters)
             for reservation in reservations:
                 if reservation.instances[0].state != 'terminated' and reservation.instances[0].state != 'shutting-down':
                     i = Instance(self.app, inst=reservation.instances[0], m_state=reservation.instances[0].state, reboot_required=True)
         log.debug("Trying to discover any volumes attached to this instance...")
         attached_volumes = []
         try:
-            if self.app.cloud_type == 'ec2':
-                # filtering w/ boto is supported only with ec2
-                f = {'attachment.instance-id': self.app.cloud_interface.get_instance_id()}
-                attached_volumes = self.app.cloud_interface.get_ec2_connection().get_all_volumes(filters=f)
-            else:
-                volumes = self.app.cloud_interface.get_ec2_connection().get_all_volumes()
-                for vol in volumes:
-                    if vol.attach_data.instance_id == self.app.cloud_interface.get_instance_id():
-                        attached_volumes.append(vol)
-        except Exception, e:
+            f = {'attachment.instance-id': self.app.cloud_interface.get_instance_id()}
+            attached_volumes = self.app.cloud_interface.get_all_volumes(filters=f)
+        except EC2ResponseError as e:
             log.debug( "Error checking for attached volumes: %s" % e )
         log.debug("Attached volumes: %s" % attached_volumes)
         return attached_volumes
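
Routing the call through cloud_interface.get_all_volumes is what makes the old ec2-only branch unnecessary: clouds without server-side filter support can emulate the filter client-side. A sketch of what the interface method might do (its implementation is not part of this hunk, so this is an assumption):

    def get_all_volumes(self, volume_ids=None, filters=None):
        vols = self.get_ec2_connection().get_all_volumes(volume_ids=volume_ids)
        # Fall back to client-side filtering for the one filter CloudMan uses,
        # for clouds (e.g. Eucalyptus) that ignore server-side filters
        if filters and 'attachment.instance-id' in filters:
            instance_id = filters['attachment.instance-id']
            vols = [v for v in vols if v.attach_data.instance_id == instance_id]
        return vols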
 
     def shutdown(self, sd_galaxy=True, sd_sge=True, sd_postgres=True, sd_filesystems=True,
                 sd_instances=True, sd_autoscaling=True, delete_cluster=False, sd_spot_requests=True,
     def delete_cluster(self):
         log.info("All services shut down; deleting this cluster.")
         # Delete any remaining volume(s) assoc. w/ given cluster
-        vols = []
+        filters = {'tag:clusterName': self.app.ud['cluster_name']}
         try:
-            if self.app.cloud_type == 'ec2':
-                filters = {'tag:clusterName': self.app.ud['cluster_name']}
-                ec2_conn = self.app.cloud_interface.get_ec2_connection()
-                vols = ec2_conn.get_all_volumes(filters=filters)
+            vols = self.app.cloud_interface.get_all_volumes(filters=filters)
             for vol in vols:
                 log.debug("As part of cluster deletion, deleting volume '%s'" % vol.id)
-                ec2_conn.delete_volume(vol.id)
+                vol.delete()
         except EC2ResponseError, e:
             log.error("Error deleting volume %s: %s" % (vol.id, e))
         # Delete cluster bucket on S3
             #     log.debug( "Idle instances' DNs: %s" % idle_instances_dn )
 
             for idle_instance_dn in idle_instances_dn:
-                 for w_instance in self.worker_instances:
-                     # log.debug("Trying to match worker instance with private IP '%s' to idle "
-                     #    "instance '%s'" % (w_instance.get_local_hostname(), idle_instance_dn))
-                     if w_instance.get_local_hostname() is not None:
-                         if w_instance.get_local_hostname().lower().startswith(str(idle_instance_dn).lower()):
+                for w_instance in self.worker_instances:
+                    # log.debug("Trying to match worker instance with private IP '%s' to idle "
+                    #    "instance '%s'" % (w_instance.get_local_hostname(), idle_instance_dn))
+                    if w_instance.get_local_hostname() is not None:
+                        if w_instance.get_local_hostname().lower().startswith(str(idle_instance_dn).lower()):
                             # log.debug("Marking instance '%s' with FQDN '%s' as idle." \
                             #     % (w_instance.id, idle_instance_dn))
                             idle_instances.append( w_instance )
         log.info("Specific reboot of instance '%s' requested." % instance_id)
         for inst in self.worker_instances:
             if inst.id == instance_id:
-               inst.reboot()
+                inst.reboot()
         log.info("Initiated requested reboot of instance. Rebooting '%s'." % instance_id)
 
     def add_instances( self, num_nodes, instance_type='', spot_price=None):
         instance object in the process. """
         try:
             log.debug("Adding live instance '%s'" % instance_id)
-            ec2_conn = self.app.cloud_interface.get_ec2_connection()
-            reservation = ec2_conn.get_all_instances([instance_id])
+            reservation = self.app.cloud_interface.get_all_instances(instance_id)
             if reservation and len(reservation[0].instances)==1:
                 instance = reservation[0].instances[0]
                 if instance.state != 'terminated' and instance.state != 'shutting-down':
                     self.app.cloud_interface.add_tag(instance, 'clusterName', self.app.ud['cluster_name'])
                     self.app.cloud_interface.add_tag(instance, 'role', 'worker') # Default to 'worker' role tag
                     self.worker_instances.append(i)
+                    i.send_alive_request() # ensure info like the IP address and hostname gets updated
+                    log.debug('Added instance {0}...'.format(instance_id))
                 else:
                     log.debug("Live instance '%s' is at the end of its life (state: %s); not adding the instance." % (instance_id, instance.state))
                 return True
         if worker_id:
             log.info( "Checking status of instance '%s'" % worker_id )
             try:
-                ec2_conn = self.app.cloud_interface.get_ec2_connection()
-                reservation = ec2_conn.get_all_instances( worker_id.strip() )
+                reservation = self.app.cloud_interface.get_all_instances( worker_id.strip() )
                 if reservation:
                     workers_status[ reservation[0].instances[0].id ] = reservation[0].instances[0].state
             except Exception, e:
         shutil.copy("%s-tmp" % file_name, file_name)
 
     def get_status_dict( self ):
-        public_ip = self.app.cloud_interface.get_self_public_ip();
+        public_ip = self.app.cloud_interface.get_public_ip()
         if self.app.TESTFLAG:
             num_cpus = 1
             load = "0.00 0.02 0.39"
             # Set 'role' and 'clusterName' tags for the master instance
             try:
                 i_id = self.app.cloud_interface.get_instance_id()
-                ec2_conn = self.app.cloud_interface.get_ec2_connection()
-                ir = ec2_conn.get_all_instances([i_id])
+                ir = self.app.cloud_interface.get_all_instances(i_id)
                 self.app.cloud_interface.add_tag(ir[0].instances[0], 'clusterName', self.app.ud['cluster_name'])
                 self.app.cloud_interface.add_tag(ir[0].instances[0], 'role', self.app.ud['role'])
             except EC2ResponseError, e:
                 log.debug("Error setting 'role' tag: %s" % e)
-            except Exception, e:
-                log.debug("General error setting 'role' tag: %s" % e)
         self.monitor_thread.start()
 
     def shutdown( self ):
             cc = {} # cluster configuration
             svcs = [] # list of services
             fss = [] # list of filesystems
+            cc['tags'] = self.app.cloud_interface.tags # save cloud tags, in case the cloud doesn't support them natively
             for srvc in self.app.manager.services:
                 if srvc.svc_type=='Filesystem':
                     # Transient file systems do not get persisted
         except:
             pass
         # If not existent, save current boot script cm_boot.py to cluster's bucket
-        if not misc.file_exists_in_bucket(s3_conn, self.app.ud['bucket_cluster'], self.app.ud['boot_script_name']) and os.path.exists(os.path.join(self.app.ud['boot_script_path'], self.app.ud['boot_script_name'])):
+        # BUG: workaround for eucalyptus Walrus, which hangs on returning saved file status if misc.file_exists_in_bucket() is called first
+        # if not misc.file_exists_in_bucket(s3_conn, self.app.ud['bucket_cluster'], self.app.ud['boot_script_name']) and os.path.exists(os.path.join(self.app.ud['boot_script_path'], self.app.ud['boot_script_name'])):
+        if True:
             log.debug("Saving current instance boot script (%s) to cluster bucket '%s' as '%s'" % (os.path.join(self.app.ud['boot_script_path'], self.app.ud['boot_script_name']), self.app.ud['bucket_cluster'], self.app.ud['boot_script_name']))
             misc.save_file_to_bucket(s3_conn, self.app.ud['bucket_cluster'], self.app.ud['boot_script_name'], os.path.join(self.app.ud['boot_script_path'], self.app.ud['boot_script_name']))
         # If not existent, save CloudMan source to cluster's bucket, including file's metadata
-        if not misc.file_exists_in_bucket(s3_conn, self.app.ud['bucket_cluster'], 'cm.tar.gz') and os.path.exists(os.path.join(self.app.ud['cloudman_home'], 'cm.tar.gz')):
+        # BUG: workaround for eucalyptus Walrus, which hangs on returning saved file status if misc.file_exists_in_bucket() is called first
+        # if not misc.file_exists_in_bucket(s3_conn, self.app.ud['bucket_cluster'], 'cm.tar.gz') and os.path.exists(os.path.join(self.app.ud['cloudman_home'], 'cm.tar.gz')):
+        if True:
             log.debug("Saving CloudMan source (%s) to cluster bucket '%s' as '%s'" % (os.path.join(self.app.ud['cloudman_home'], 'cm.tar.gz'), self.app.ud['bucket_cluster'], 'cm.tar.gz'))
             misc.save_file_to_bucket(s3_conn, self.app.ud['bucket_cluster'], 'cm.tar.gz', os.path.join(self.app.ud['cloudman_home'], 'cm.tar.gz'))
             try:
                 log.debug("Error setting revision metadata on newly copied cm.tar.gz in bucket %s: %s" % (self.app.ud['bucket_cluster'], e))
         # Create an empty file whose name is the name of this cluster (useful as a reference)
         cn_file = os.path.join(self.app.ud['cloudman_home'], "%s.clusterName" % self.app.ud['cluster_name'])
-        if not misc.file_exists_in_bucket(s3_conn, self.app.ud['bucket_cluster'], "%s.clusterName" % self.app.ud['cluster_name']):
+        # BUG: workaround for eucalyptus Walrus, which hangs on returning saved file status if misc.file_exists_in_bucket() is called first
+        # if not misc.file_exists_in_bucket(s3_conn, self.app.ud['bucket_cluster'], "%s.clusterName" % self.app.ud['cluster_name']):
+        if True:
             with open(cn_file, 'w'):
                 pass
             if os.path.exists(cn_file):
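
The `if True:` short-circuits trade a redundant upload for not risking a hung file_exists_in_bucket call against Walrus. If the existence check is ever reinstated, one interim guard is to run it in a daemon thread with a deadline; a sketch (not part of this commit) that treats a hang as 'missing':

    import threading

    def _exists_with_timeout(s3_conn, bucket, key, timeout=30):
        result = [False]
        def check():
            result[0] = misc.file_exists_in_bucket(s3_conn, bucket, key)
        t = threading.Thread(target=check)
        t.daemon = True  # don't let a hung Walrus call block shutdown
        t.start()
        t.join(timeout)
        return result[0] if not t.is_alive() else False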
         if deep is True: # reset the current local instance field
             self.inst = None
         if self.inst is None and self.id is not None:
-            ec2_conn = self.app.cloud_interface.get_ec2_connection()
             try:
-                rs = ec2_conn.get_all_instances([self.id])
+                rs = self.app.cloud_interface.get_all_instances(self.id)
                 if len(rs) == 0:
                     log.warning("Instance {0} not found on the cloud?".format(self.id))
                 for r in rs:
                 self.m_state = instance_states.TERMINATED
         return self.m_state
 
+    @TestFlag(None)
+    def send_alive_request(self):
+        self.app.manager.console_monitor.conn.send( 'ALIVE_REQUEST', self.id )
+
     def send_status_check( self ):
         # log.debug("\tMT: Sending STATUS_CHECK message" )
         if self.app.TESTFLAG is True:
         # log.info("\tMT: Sending restart message to worker %s" % self.id)
         if self.app.TESTFLAG is True:
             return
-        self.app.manager.console_monitor.conn.send( 'RESTART | %s' % self.app.cloud_interface.get_self_private_ip(), self.id )
+        self.app.manager.console_monitor.conn.send( 'RESTART | %s' % self.app.cloud_interface.get_private_ip(), self.id )
         log.info( "\tMT: Sent RESTART message to worker '%s'" % self.id )
 
     def update_spot(self, force=False):
                 # Instance is alive and functional. Send master pubkey.
                 self.send_master_pubkey()
                 # Add hostname to /etc/hosts (for SGE config)
-                if self.app.cloud_type == 'openstack':
+                if self.app.cloud_type in ('openstack', 'eucalyptus'):
                     worker_host_line = '{ip}\t{hostname}\n'.format(ip=self.private_ip, \
                         hostname=self.local_hostname)
                     log.debug("worker_host_line: {0}".format(worker_host_line))
                     % self.id )
                 try:
                     sge_svc = self.app.manager.get_services('SGE')[0]
-                    if sge_svc.add_sge_host(self.get_id(), self.get_private_ip()):
+                    if sge_svc.add_sge_host(self.get_id(), self.local_hostname):
                         # Send a message to worker to start SGE
                         self.send_start_sge()
                         # If there are any bucket-based FSs, tell the worker to add those
                 msplit = msg.split( ' | ' )
                 self.worker_status = msplit[1]
             else: # Catch-all condition
-               log.debug( "Unknown Message: %s" % msg )
+                log.debug( "Unknown Message: %s" % msg )
         else:
             log.error( "Epic Failure, squeue not available?" )
 

File cm/util/misc.py

 import subprocess
 import threading
 import datetime as dt
-from tempfile import mkstemp
-from shutil import move
+from tempfile import mkstemp, NamedTemporaryFile
+import shutil
 import os
-from os import remove, close
 import contextlib
 import errno
 
     """Merge fields from user data (user) YAML object and default data (default)
     YAML object. If there are conflicts, value from the user data object are
     kept."""
-    if isinstance(user,dict) and isinstance(default,dict):
-        for k,v in default.iteritems():
+    if isinstance(user, dict) and isinstance(default, dict):
+        for k, v in default.iteritems():
             if k not in user:
                 user[k] = v
             else:
-                user[k] = merge_yaml_objects(user[k],v)
+                user[k] = merge_yaml_objects(user[k], v)
     return user
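
A quick illustration of the merge semantics (user values win; missing keys are filled in recursively):

    default = {'cloud': {'type': 'ec2', 'region': 'us-east-1'}}
    user = {'cloud': {'type': 'euca'}}
    merge_yaml_objects(user, default)
    # => {'cloud': {'type': 'euca', 'region': 'us-east-1'}}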
 
 def shellVars2Dict(filename):
     while True:
         try:
             fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
-        except OSError, e:
+        except OSError as e:
             if e.errno != errno.EEXIST:
                 raise
             time.sleep(wait_delay)
     if s3_conn is None:
         log.debug("Checking if s3 bucket exists, no s3 connection specified... it does not")
         return False
-    if bucket_name is not None:
+    if bucket_name:
         try:
-            b = None
             b = s3_conn.lookup(bucket_name, validate=validate)
-            if b is not None:
+            if b:
                 # log.debug("Checking if bucket '%s' exists... it does." % bucket_name)
                 return True
             else:
                 log.debug("Checking if bucket '%s' exists... it does not." % bucket_name)
                 return False
-        except Exception, e:
+        except S3ResponseError as e:
             log.error("Failed to lookup bucket '%s': %s" % (bucket_name, e))
     else:
         log.error("Cannot lookup bucket with no name.")
     try:
         s3_conn.create_bucket(bucket_name)
         log.debug("Created bucket '%s'." % bucket_name)
-    except Exception, e:
+    except S3ResponseError as e:
         log.error( "Failed to create bucket '%s': %s" % (bucket_name, e))
         return False
     return True
     return b
 
 def make_bucket_public(s3_conn, bucket_name, recursive=False):
-    b = None
     b = get_bucket(s3_conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             b.make_public(recursive=recursive)
             log.debug("Bucket '%s' made public" % bucket_name)
             return True
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Could not make bucket '%s' public: %s" % (bucket_name, e))
     return False
 
 def make_key_public(s3_conn, bucket_name, key_name):
-    b = None
     b = get_bucket(s3_conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             k = Key(b, key_name)
             if k.exists():
                 k.make_public()
                 log.debug("Key '%s' made public" % key_name)
                 return True
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Could not make key '%s' public: %s" % (key_name, e))
     return False
 
                       will apply the grant to all keys within the bucket
                       or not.
     """
-    b = None
     b = get_bucket(s3_conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             for c_id in cannonical_ids:
                 log.debug("Adding '%s' permission for bucket '%s' for users '%s'" % (permission, bucket_name, c_id))
                 b.add_user_grant(permission, c_id, recursive)
             return True
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Could not add permission '%s' for bucket '%s': %s" % (permission, bucket_name, e))
     return False
 
     :param cannonical_ids: A list of strings with canonical user ids associated 
                         with the AWS account you are granting the permission to.
     """
-    b = None
     b = get_bucket(s3_conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             k = Key(b, key_name)
             if k.exists():
                     log.debug("Adding '%s' permission for key '%s' for user '%s'" % (permission, key_name, c_id))
                     k.add_user_grant(permission, c_id)
                 return True
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Could not add permission '%s' for bucket '%s': %s" % (permission, bucket_name, e))
     return False
 
     users=[] # Current list of users retrieved from folder's ACL
     key_list = None
     key_acl = None
-    b = None
     b = get_bucket(s3_conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             key_list = b.get_all_keys(prefix=folder_name, delimiter='/')
             # for k in key_list:
                     for pu in power_users:
                         if pu in users:
                             users.remove(pu)
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Error getting list of folder '%s' users for bucket '%s': %s" % (folder_name, bucket_name, e))
     # log.debug("List of users for folder '%s' in bucket '%s': %s" % (folder_name, bucket_name, users))
     return users
     other_users = [] # List of users on other (shared) folders in given bucket
     folder_users = get_list_of_bucket_folder_users(s3_conn, bucket_name, folder_name)
     # log.debug("List of users on to-be-deleted shared folder '%s': %s" % (folder_name, folder_users))
-    b = None
     b = get_bucket(s3_conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             # Get list of shared folders in given bucket
             folder_list = b.get_all_keys(prefix='shared/', delimiter='/')
             for u in folder_users:
                 if u not in other_users:
                     users_with_grant.append(u)
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Error isolating list of folder '%s' users for bucket '%s': %s" % (folder_name, bucket_name, e))
     log.debug("List of users whose bucket grant is to be removed because shared folder '%s' is being deleted: %s" \
         % (folder_name, users_with_grant))
             #     log.debug("Grant -> permission: %s, user name: %s, grant type: %s" % (g.permission, g.display_name, g.type))
             log.debug("Removed grants on bucket '%s' for these users: %s" % (bucket_name, users_whose_grant_to_remove))
             return True
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Error adjusting ACL for bucket '%s': %s" % (bucket_name, e))
     return False
 
     :return: True if remote_filename exists in bucket_name
              False otherwise
     """
-    b = None
     b = get_bucket(s3_conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             k = Key(b, remote_filename)
             if k.exists():
         try:
             # Time format must be matched the time provided by boto field .last_modified
             k_ts = dt.datetime.strptime(key.last_modified, "%a, %d %b %Y %H:%M:%S GMT")
-        except Exception, e:
+        except Exception as e:
             log.debug("Could not get last modified timestamp for key '%s': %s" % (remote_filename, e))
             return True
         try:
             return k_ts < dt.datetime.fromtimestamp(os.path.getmtime(local_filename))
-        except Exception, e:
+        except Exception as e:
             log.debug("Trouble comparing local (%s) and remote (%s) file modified times: %s" % (local_filename, remote_filename, e))
             return True
     else:
             k.get_contents_to_filename(local_file)
             log.info( "Retrieved file '%s' from bucket '%s' to '%s'." \
                          % (remote_filename, bucket_name, local_file))
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.debug( "Failed to get file '%s' from bucket '%s': %s" % (remote_filename, bucket_name, e))
-            remove(local_file) # Don't leave a partially downloaed or touched file
+            os.remove(local_file) # Don't leave a partially downloaded or touched file
             return False
     else:
-      log.debug("Bucket '%s' does not exist, did not get remote file '%s'" % (bucket_name, remote_filename))
-      return False
+        log.debug("Bucket '%s' does not exist, did not get remote file '%s'" % (bucket_name, remote_filename))
+        return False
     return True
 
 def save_file_to_bucket( conn, bucket_name, remote_filename, local_file ):
-    b = None
     b = get_bucket(conn, bucket_name)
-    if b is not None:
+    if b:
         k = Key( b, remote_filename )
         try:
             k.set_contents_from_filename( local_file )
             log.info( "Saved file '%s' to bucket '%s'" % ( remote_filename, bucket_name ) )
             # Store some metadata (key-value pairs) about the contents of the file being uploaded
             k.set_metadata('date_uploaded', dt.datetime.utcnow())
-        except S3ResponseError, e:
-             log.error( "Failed to save file local file '%s' to bucket '%s' as file '%s': %s" % ( local_file, bucket_name, remote_filename, e ) )
-             return False
+        except S3ResponseError as e:
+            log.error( "Failed to save file local file '%s' to bucket '%s' as file '%s': %s" % ( local_file, bucket_name, remote_filename, e ) )
+            return False
         return True
     else:
         log.debug("Could not connect to bucket '%s'; remote file '%s' not saved to the bucket" % (bucket_name, remote_filename))
             log.debug("Copying file '%s/%s' to file '%s/%s'" % (src_bucket_name, orig_filename, dest_bucket_name, copy_filename))
             k.copy(dest_bucket_name, copy_filename, preserve_acl=preserve_acl)
             return True
-        except S3ResponseError, e: 
+        except S3ResponseError as e: 
             log.debug("Error copying file '%s/%s' to file '%s/%s': %s" % (src_bucket_name, orig_filename, dest_bucket_name, copy_filename, e))
     return False
 
 def delete_file_from_bucket(conn, bucket_name, remote_filename):
-    b = None
     b = get_bucket(conn, bucket_name)
-    if b is not None:
+    if b:
         try:
             k = Key( b, remote_filename )
             log.debug( "Deleting key object '%s' from bucket '%s'" % (remote_filename, bucket_name))
             k.delete()
             return True
-        except S3ResponseError, e:
+        except S3ResponseError as e:
             log.error("Error deleting key '%s' from bucket '%s': %s" % (remote_filename, bucket_name, e))
     return False
 
     """Delete given bucket. This method will iterate through all the keys in
     the given bucket first and delete them. Finally, the bucket will be deleted."""
     try:
-        b = None
         b = get_bucket(conn, bucket_name)
-        if b is not None:
+        if b:
             keys = b.get_all_keys()
             for key in keys:
                 key.delete()
             b.delete()
             log.info("Successfully deleted cluster bucket '%s'" % bucket_name)
-    except S3ResponseError, e:
+    except S3ResponseError as e:
         log.error("Error deleting bucket '%s': %s" % (bucket_name, e))
     return True
 
     found, the method returns None.    
     """
     log.debug("Getting metadata '%s' for file '%s' from bucket '%s'" % (metadata_key, remote_filename, bucket_name))
-    b = None
     b = get_bucket(conn, bucket_name)
-    if b is not None:
+    if b:
         k = b.get_key(remote_filename)
         if k and metadata_key:
             return k.get_metadata(metadata_key)
     """
     log.debug("Setting metadata '%s' for file '%s' in bucket '%s'" % (metadata_key, remote_filename, bucket_name))
 
-    b = None
     b = get_bucket(conn, bucket_name)
-    if b is not None:
+    if b:
         k = b.get_key(remote_filename)
         if k and metadata_key:
             # Simply setting the metadata through set_metadata does not work.
             try:
                 k.copy(bucket_name, remote_filename, metadata={metadata_key:metadata_value}, preserve_acl=True)
                 return True
-            except Exception, e:
+            except S3ResponseError as e:
                 log.debug("Could not set metadata for file '%s' in bucket '%s': %e" % (remote_filename, bucket_name, e))
     return False
 
 def check_process_running(proc_in):
     ps = subprocess.Popen("ps ax", shell=True, stdout=subprocess.PIPE)
-    lines = str(ps.communicate()[0], 'utf-8')
+    lines = ps.communicate()[0]
     for line in lines.split('\n'):
         if line.find(proc_in) != -1:
             log.debug(line)
     log.debug("Getting size of volume '%s'" % vol_id) 
     try: 
         vol = ec2_conn.get_all_volumes(vol_id)
-    except EC2ResponseError, e:
+    except EC2ResponseError as e:
         log.error("Volume ID '%s' returned an error: %s" % (vol_id, e))
         return 0 
 
     stdout, stderr = process.communicate()
     if process.returncode == 0:
         log.debug(ok)
-        return True
+        if stdout:
+            return stdout
+        else:
+            return True
     else:
         log.error("%s, running command '%s' returned code '%s' and following stderr: '%s'" % (err, cmd, process.returncode, stderr))
         return False
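
With this change run() returns the captured stdout when there is any, True on a silent success, and False on failure, so existing boolean call sites keep working while new callers can read the output. For example (command and messages illustrative):

    out = run('hostname -f', 'Could not read FQDN', 'Read FQDN')
    if out and out is not True:  # real stdout came back
        fqdn = out.strip()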
         new_file.write(line.replace(pattern, subst))
     # Close temp file
     new_file.close()
-    close(fh)
+    os.close(fh)
     old_file.close()
     # Remove original file
-    remove(file_name)
+    os.remove(file_name)
     # Move new file
-    move(abs_path, file_name)
+    shutil.move(abs_path, file_name)
 
 def _if_not_installed(prog_name):
     """Decorator that checks if a callable program is installed.
         return decorator
     return argcatcher
 
+def add_to_etc_hosts(hostname, ip_address):
+    try:
+        etc_hosts = open('/etc/hosts', 'r')
+        tmp = NamedTemporaryFile()
+        for l in etc_hosts:
+            if hostname not in l:
+                tmp.write(l)
+        etc_hosts.close()
+        # add a line for the new hostname
+        tmp.write('{0} {1}\n'.format(ip_address, hostname))
+
+        # make sure changes are written to disk
+        tmp.flush()
+        os.fsync(tmp.fileno())
+        # swap out /etc/hosts
+        run('cp /etc/hosts /etc/hosts.orig')
+        run('cp {0} /etc/hosts'.format(tmp.name))
+        run('chmod 644 /etc/hosts')
+    except (IOError, OSError) as e:
+        log.error('Could not update /etc/hosts: {0}'.format(e))
+
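Note the copy-then-discard pattern above: a NamedTemporaryFile disappears once it is closed or garbage-collected, so the cp must run while tmp is still referenced, as it does here. Typical use (values illustrative):

    add_to_etc_hosts('worker-1.novalocal', '10.0.0.11')
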
+def remove_from_etc_hosts(hostname):
+    try:
+        etc_hosts = open('/etc/hosts', 'r')
+        tmp = NamedTemporaryFile()
+