Commits

Enis Afgan committed dfe63da

Introduced a new format for the cluster configuration file (namely,
persistent_data.yaml in the cluster's bucket). Any old files will be
automatically converted to the new format on a cluster status update.
See persistent_data.yaml.sample for an example.
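
As a sketch of the conversion, here is an old-format entry (taken from the
deprecated example in the cm/util/master.py diff below) next to its new-format
equivalent (mount_point shown with its default value, which is derived from
the file system's name):

    # Old (deprecated) format
    static_filesystems:
    - filesystem: galaxyTools
      size: 2
      snap_id: snap-1688b978

    # New format
    filesystems:
    - name: galaxyTools
      kind: snapshot
      ids: [snap-1688b978]
      mount_point: /mnt/galaxyTools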

  • Parent commits 3ec7399

Files changed (4)

File cm/services/data/filesystem.py

 
 
 class Filesystem(DataService):
-    def __init__(self, app, name):
+    def __init__(self, app, name, mount_point=None):
         super(Filesystem, self).__init__(app)
         self.svc_type = "Filesystem"
         self.nfs_lock_file = '/tmp/nfs.lockfile'
         self.name = name
         self.size = None
         self.dirty = False
-        self.mount_point = '/mnt/%s' % self.name
+        self.kind = None # Choice of 'snapshot', 'volume', or 'bucket'
+        self.mount_point = mount_point if mount_point is not None else '/mnt/%s' % self.name
         self.grow = None # Used (APPLICABLE ONLY FOR the galaxyData FS) to indicate a need to grow
                          # the file system; use following dict structure:
                          # {'new_size': <size>, 'snap_desc': <snapshot description>}
             for vol in self.volumes:
                 vol.create(self.name)
                 # Mark a volume as 'static' if created from a snapshot
-                # Note that if a volume is marked as 'static', it is assumed it 
+                # Note that if a volume is marked as 'static', it is assumed it
                 # can be deleted upon cluster termination!
                 if self.name != 'galaxyData' and vol.from_snapshot_id is not None:
                     log.debug("Marked volume '%s' from file system '%s' as 'static'" % (vol.volume_id, self.name))
                     vol.static = True
+                    self.kind = 'snapshot'
+                else:
+                    self.kind = 'volume'
                 if vol.attach():
                     self.mount(vol)
             for b in self.buckets:
+                self.kind = 'bucket'
                 threading.Thread(target=b.mount).start()
                 log.debug("Initiated addition of FS from bucket {0}".format(b.bucket_name))
         except Exception, e:
             log.debug("Did not check status of filesystem '%s' with mount point '%s' in state '%s'" \
                 % (self.name, self.mount_point, self.state))
     
-    def add_volume(self, vol_id=None, size=None, from_snapshot_id=None):
+    def add_volume(self, vol_id=None, size=0, from_snapshot_id=None):
         log.debug("Adding Volume (id={id}, size={size}, snap={snap}) into Filesystem {fs}"\
             .format(id=vol_id, size=size, snap=from_snapshot_id, fs=self.get_full_name()))
         self.volumes.append(Volume(self, vol_id=vol_id, size=size, from_snapshot_id=from_snapshot_id))
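
A minimal usage sketch of the extended interface (hypothetical; assumes an
initialized `app` object, with IDs borrowed from the sample file in this
commit):

    fs = Filesystem(app, 'galaxyData', mount_point='/mnt/galaxyData')
    fs.add_volume(vol_id='vol-060a7f6e')             # use an existing volume
    fs.add_volume(from_snapshot_id='snap-1688b978')  # or create one from a snapshot
    # fs.kind ('volume', 'snapshot', or 'bucket') is recorded when the file
    # system is added and later serialized into persistent_data.yaml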

File cm/services/data/volume.py

                     self.volume_id = None
                     return volume_status.NONE
                 self.from_snapshot_id = self.volume.snapshot_id
+                self.size = self.volume.size
                 if self.from_snapshot_id is '': # ensure consistency
                     self.from_snapshot_id = None
                 if self.volume.status == 'creating':
             try:
                 log.debug("Creating a new volume of size '%s' in zone '%s' from snapshot '%s'" \
                     % (self.size, self.app.cloud_interface.get_zone(), self.from_snapshot_id))
-                self.volume = self.app.cloud_interface.get_ec2_connection().create_volume(self.size,
-                    self.app.cloud_interface.get_zone(), snapshot=self.from_snapshot_id)
-                self.volume_id = str(self.volume.id)
-                self.size = int(self.volume.size)
-                log.debug("Created new volume of size '%s' from snapshot '%s' with ID '%s' in zone '%s'" \
-                    % (self.size, self.from_snapshot_id, self.get_full_name(), \
-                    self.app.cloud_interface.get_zone()))
+                if self.size > 0:
+                    self.volume = self.app.cloud_interface.get_ec2_connection().create_volume(self.size,
+                        self.app.cloud_interface.get_zone(), snapshot=self.from_snapshot_id)
+                    self.volume_id = str(self.volume.id)
+                    self.size = int(self.volume.size)
+                    log.debug("Created new volume of size '%s' from snapshot '%s' with ID '%s' in zone '%s'" \
+                        % (self.size, self.from_snapshot_id, self.get_full_name(), \
+                        self.app.cloud_interface.get_zone()))
+                else:
+                    log.warning("Cannot create volume of size 0! Volume not created.")
             except EC2ResponseError, e:
                 log.error("Error creating volume: %s" % e)
         else:
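
How the two changes above interact, sketched with hypothetical values (names
as in the diffs):

    # add_volume() now defaults size to 0, so a volume referenced only by its
    # ID starts out with an unknown (zero) size:
    fs.add_volume(vol_id='vol-060a7f6e')
    # Volume.status() later fills in the actual size from the cloud
    # (self.size = self.volume.size), and Volume.create() declines to call
    # create_volume() while the size is still 0, logging a warning instead of
    # requesting a zero-sized volume.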

File cm/util/master.py

         if self.app.LOCALFLAG is True:
             self.init_cluster(cluster_type='Galaxy')
 
-        # Add PSS service - however, this service will run only after the cluster
+        # Add PSS service - note that this service runs only after the cluster
         # type has been selected and all of the services are in RUNNING state
         self.app.manager.services.append(PSS(self.app))
 
         log.info("Completed the initial cluster startup process. {0}".format(cc_detail))
         return True
 
+    @TestFlag(False)
     def add_preconfigured_services(self):
-        """ Inspect cluster configuration and persistent data and add
-        available/preconfigured services. """
-        log.debug("Cheking for and adding any preconfigured services")
-        if self.app.TESTFLAG is True and self.app.LOCALFLAG is False:
-            log.debug("Attempted to add preconfigured cluster services but the TESTFLAG is set.")
-            return None
+        """
+        Inspect the cluster configuration and persistent data to add any
+        previously defined cluster services.
+        """
+        log.debug("Checking for and adding any previously defined cluster services")
         try:
-            # Make sure we have a complete Galaxy cluster configuration and don't start partial set of services
-            if self.app.ud.has_key("static_filesystems") and not self.app.ud.has_key("data_filesystems"):
-                log.warning("Conflicting cluster configuration loaded; corrupted persistent_data.yaml? Starting a new cluster.")
-                self.app.manager.initial_cluster_type = None
-                return True # True because we're dafaulting to starting a new cluster so start the manager
             attached_volumes = self.get_attached_volumes()
-            if self.app.ud.has_key("static_filesystems"):
+            # Test if the deprecated cluster config format is used and handle it
+            if "static_filesystems" in self.app.ud or "data_filesystems" in self.app.ud:
+                log.debug("Handling deprecated cluster config")
+                return self._handle_old_cluster_conf_format(attached_volumes)
+            # Process the current cluster config
+            if 'filesystems' in self.app.ud:
+                for fs in self.app.ud['filesystems']:
+                    log.debug("Adding a previously defined filesystem '{0}' of "\
+                        "kind '{1}'".format(fs['name'], fs['kind']))
+                    filesystem = Filesystem(self.app, fs['name'], fs.get('mount_point', None))
+                    # Based on the kind, add the appropriate file system. We can
+                    # handle 'volume', 'snapshot', or 'bucket' kind
+                    if fs['kind'] == 'volume':
+                        for vol_id in fs['ids']:
+                            filesystem.add_volume(vol_id=vol_id)
+                    elif fs['kind'] == 'snapshot':
+                        for snap in fs['ids']:
+                            # Check if an already attached volume maps to this snapshot
+                            att_vol = self.get_vol_if_fs(attached_volumes, fs['name'])
+                            if att_vol:
+                                filesystem.add_volume(vol_id=att_vol.id, size=att_vol.size,
+                                        from_snapshot_id=att_vol.snapshot_id)
+                            else:
+                                filesystem.add_volume(from_snapshot_id=snap)
+                    elif fs['kind'] == 'bucket':
+                        a_key = fs.get('access_key', None)
+                        s_key = fs.get('secret_key', None)
+                        filesystem.add_bucket(fs['name'], a_key, s_key)
+                    self.app.manager.services.append(filesystem)
+                    #self.app.manager.initial_cluster_type = 'Galaxy'
+            if "services" in self.app.ud:
+                for srvc in self.app.ud['services']:
+                    service_name = srvc['name']
+                    log.debug("Adding service: '%s'" % service_name)
+                    # TODO: translation from predefined service names into classes is not quite ideal...
+                    processed_service = False
+                    if service_name == 'Postgres':
+                        self.app.manager.services.append(PostgresService(self.app))
+                        self.app.manager.initial_cluster_type = 'Galaxy'
+                        processed_service = True
+                    if service_name == 'Galaxy':
+                        self.app.manager.services.append(GalaxyService(self.app))
+                        self.app.manager.initial_cluster_type = 'Galaxy'
+                        processed_service = True
+                    if not processed_service:
+                        log.warning("Could not find service class matching userData service entry: %s" % service_name)
+            return True
+        except Exception, e:
+            log.error("Error reading existing cluster configuration file: %s" % e)
+            self.manager_started = False
+            return False
+
+    def get_vol_if_fs(self, attached_volumes, filesystem_name):
+        """
+        Iterate through the list of (attached) volumes and check if any
+        one of them match the current cluster name and filesystem (as stored
+        in volume's tags). Return a matching volume (as a ``boto`` object) or
+        ``None``.
+
+        *Note* that this method returns the first matching volume and will thus
+        not work for filesystems composed of multiple volumes.
+        """
+        for vol in attached_volumes:
+            log.debug("Checking if vol '{0}' is file system '{1}'".format(vol.id, filesystem_name))
+            if self.app.cloud_interface.get_tag(vol, 'clusterName') == self.app.ud['cluster_name'] and \
+               self.app.cloud_interface.get_tag(vol, 'filesystem') == filesystem_name:
+                log.debug("Identified attached volume '%s' as filesystem '%s'" % (vol.id, filesystem_name))
+                return vol
+        return None
+
+    def _handle_old_cluster_conf_format(self, attached_volumes):
+        """
+        For backward compatibility, handle the old/deprecated cluster
+        configuration/persistent data file format, e.g.,::
+            data_filesystems:
+              galaxyData:
+              - size: 20
+                vol_id: vol-edfc9280
+            galaxy_home: /mnt/galaxyTools/galaxy-central
+            services:
+            - service: SGE
+            - service: Postgres
+            - service: Galaxy
+            static_filesystems:
+            - filesystem: galaxyIndices
+              size: 700
+              snap_id: !!python/unicode 'snap-5b030634'
+            - filesystem: galaxyTools
+              size: 2
+              snap_id: !!python/unicode 'snap-1688b978'
+        """
+        try:
+            # First make a backup of the deprecated config file
+            s3_conn = self.app.cloud_interface.get_s3_connection()
+            misc.copy_file_in_bucket(s3_conn, self.app.ud['bucket_cluster'], self.app.ud['bucket_cluster'],
+                    'persistent_data.yaml', 'persistent_data-deprecated.yaml', validate=False)
+            # Process the deprecated configuration now
+            if "static_filesystems" in self.app.ud:
                 for vol in self.app.ud['static_filesystems']:
                     fs = Filesystem(self.app, vol['filesystem'])
                     # Check if an already attached volume maps to the current filesystem
                     att_vol = self.get_vol_if_fs(attached_volumes, vol['filesystem'])
                     if att_vol:
-                        fs.add_volume(vol_id=att_vol.id, size=att_vol.size, from_snapshot_id=att_vol.snapshot_id)
+                        fs.add_volume(vol_id=att_vol.id, size=att_vol.size,
+                               from_snapshot_id=att_vol.snapshot_id)
                     else:
                         fs.add_volume(size=vol['size'], from_snapshot_id=vol['snap_id'])
                     log.debug("Adding static filesystem: '%s'" % vol['filesystem'])
                     self.app.manager.services.append(fs)
                     self.app.manager.initial_cluster_type = 'Galaxy'
-            if self.app.ud.has_key("data_filesystems"):
+            if "data_filesystems" in self.app.ud:
                 for fs, vol_array in self.app.ud['data_filesystems'].iteritems():
                     log.debug("Adding a previously existing data filesystem: '%s'" % fs)
                     fs = Filesystem(self.app, fs)
                         fs.add_volume(vol_id=vol['vol_id'], size=vol['size'])
                     self.app.manager.services.append(fs)
                     self.app.manager.initial_cluster_type = 'Data'
-            if self.app.ud.has_key("services"):
+            if "services" in self.app.ud:
                 for srvc in self.app.ud['services']:
                     service_name = srvc['service']
                     log.debug("Adding service: '%s'" % service_name)
                         log.warning("Could not find service class matching userData service entry: %s" % service_name)
             return True
         except Exception, e:
-            log.error("Error in filesystem YAML: %s" % e)
+            log.error("Error reading existing cluster configuration file: %s" % e)
             self.manager_started = False
             return False
 
-    def get_vol_if_fs(self, attached_volumes, filesystem_name):
-        """ Iterate through the list of (attached) volumes and check if any
-        one of them match the current cluster name and filesystem (as stored
-        in volume tags). Returns a matching volume or None.
-        Note that this method returns the first matching volume and will thus
-        not work for filesystems composed of multiple volumes. """
-        for vol in attached_volumes:
-            log.debug("Checking if vol '{0}' is file system '{1}'".format(vol.id, filesystem_name))
-            if self.app.cloud_interface.get_tag(vol, 'clusterName') == self.app.ud['cluster_name'] and \
-               self.app.cloud_interface.get_tag(vol, 'filesystem') == filesystem_name:
-                log.debug("Identified attached volume '%s' as filesystem '%s'" % (vol.id, filesystem_name))
-                return vol
-        return None
-
     def start_autoscaling(self, as_min, as_max, instance_type):
         as_svc = self.get_services('Autoscale')
         if not as_svc:
             log.debug("Problem adding a live instance (tried ID: %s): %s" % (instance_id, e))
         return False
 
-    def init_cluster(self, cluster_type, pss = None):
-        """ 
+    def init_cluster(self, cluster_type, pss=1):
+        """
         Initialize the type for this cluster and start appropriate services,
         storing the cluster configuration into the cluster's bucket.
 
         :param pss: Persistent Storage Size associated with data volumes being
                     created for the cluster
         """
+        def _add_data_fs():
+            """
+            A local convenience method used to add a new file system
+            """
+            fs_name = 'galaxyData'
+            log.debug("Creating a new data filesystem: '%s'" % fs_name)
+            fs = Filesystem(self.app, fs_name)
+            fs.add_volume(size=pss)
+            self.services.append(fs)
+
         if self.app.TESTFLAG is True and self.app.LOCALFLAG is False:
             log.debug("Attempted to initialize a new cluster of type '%s', but TESTFLAG is set." % cluster_type)
             return
             s3_conn = self.app.cloud_interface.get_s3_connection()
             snaps_file = 'cm_snaps.yaml'
             snaps = None
-            if self.app.ud.has_key('static_filesystems'):
-                snaps = self.app.ud['static_filesystems']
-            elif misc.get_file_from_bucket(s3_conn, self.app.ud['bucket_default'], 'snaps.yaml', snaps_file):
+            # Get a list of auto-mount/default/read-only/reference data sources
+            if misc.get_file_from_bucket(s3_conn, self.app.ud['bucket_default'], 'snaps.yaml', snaps_file):
                 snaps_file = misc.load_yaml_file(snaps_file)
                 snaps = snaps_file['static_filesystems']
+            # Turn those data sources into file systems
             if snaps:
                 attached_volumes = self.get_attached_volumes()
                 for snap in snaps:
                     log.debug("Adding a static filesystem '{0}' with volumes '{1}'"\
                         .format(fs.get_full_name(), fs.volumes))
                     self.services.append(fs)
-
-            # User data - add a new file system for user data of size 'pss'
+            # Add a file system for user's data now (OpenNebula and dummy clouds
+            # do not support volumes yet so skip those)
             if self.app.cloud_type not in ['opennebula', 'dummy']:
-                fs_name = 'galaxyData'
-                log.debug("Creating a new data filesystem: '%s'" % fs_name)
-                fs = Filesystem(self.app, fs_name)
-                fs.add_volume(size=pss)
-                self.services.append(fs)
-
-            # PostgreSQL
+                _add_data_fs()
+            # Add PostgreSQL service
             self.services.append(PostgresService(self.app))
-            # Galaxy
+            # Add Galaxy service
             self.services.append(GalaxyService(self.app))
         elif cluster_type == 'Data':
-            # Add required services:
-            # User data - add a new file system for user data of size 'pss'
-            fs_name = 'galaxyData'
-            log.debug("Creating a new data filesystem: '%s'" % fs_name)
-            fs = Filesystem(self.app, fs_name)
-            fs.add_volume(size=pss)
-            self.services.append(fs)
+            # Add a file system for user's data
+            _add_data_fs()
         elif cluster_type == 'SGE':
             # SGE service is automatically added at cluster start (see ``start`` method)
             pass
         self.app.manager._start_app_level_services()
         return True
 
-    def create_cluster_config_file(self, file_name=None, addl_data=None):
-        """ Take the current cluster service configuration and create a file
-        representation of it; by default, store the file in the current directory.
-
-        :type file_name: string
-        :param file_name: Name (or full path) of the file to save the configuration to
-
-        :type addl_data: dict of strings
-        :param addl_data: Any additional data to be included in the configuration
-
-        :rtype: string
-        :return: name of the newly created cluster configuration file
+    def create_cluster_config_file(self, file_name='persistent_data-current.yaml'):
+        """
+        Capture the current cluster configuration in a file (i.e., ``persistent_data.yaml``
+        in cluster's bucket). The generated file is stored in CloudMan's running
+        directory as ``file_name``.
         """
         try:
             cc = {} # cluster configuration
-            svl = [] # static volume list
-            dvd = {} # data volume dict
-            if addl_data:
-                cc = addl_data
+            svcs = [] # list of services
+            fss = [] # list of filesystems
             for srvc in self.app.manager.services:
                 if srvc.svc_type=='Filesystem':
-                    dvd_arr = []
-                    for vol in srvc.volumes:
-                        if vol.static and srvc.name!='galaxyData':
-                            svl.append({'filesystem': srvc.name, 'snap_id': vol.get_from_snap_id(), 'size': int(vol.size)})
-                        else:
-                            dvd_arr.append({'vol_id': str(vol.volume_id), 'size': int(vol.size)})
-                    if dvd_arr:
-                        dvd[srvc.name] = dvd_arr
+                    fs = {}
+                    fs['name'] = srvc.name
+                    fs['mount_point'] = srvc.mount_point
+                    fs['kind'] = srvc.kind
+                    if srvc.kind == 'bucket':
+                        fs['ids'] = [b.bucket_name for b in srvc.buckets]
+                    elif srvc.kind == 'volume':
+                        fs['ids'] = [v.volume_id for v in srvc.volumes]
+                    elif srvc.kind == 'snapshot':
+                        fs['ids'] = [v.from_snapshot_id for v in srvc.volumes]
+                    else:
+                        log.error("Unknown filesystem kind {0}".format(srvc.kind))
+                    fss.append(fs)
                 else:
-                    if cc.has_key('services'):
-                        cc['services'].append({'service': srvc.svc_type})
-                    else:
-                        cc['services'] = [{'service':srvc.svc_type}]
-            if svl: cc['static_filesystems'] = svl
-            if dvd: cc['data_filesystems'] = dvd
-            cc_file_name = file_name if file_name else 'cm_cluster_config.yaml'
-            misc.dump_yaml_to_file(cc, cc_file_name)
+                    s = {}
+                    s['name'] = srvc.svc_type
+                    if srvc.svc_type == 'Galaxy':
+                        s['home'] = paths.P_GALAXY_HOME
+                    svcs.append(s)
+            cc['filesystems'] = fss
+            cc['services'] = svcs
+            misc.dump_yaml_to_file(cc, file_name)
             # Reload the user data object in case anything has changed
             self.app.ud = misc.merge_yaml_objects(cc, self.app.ud)
         except Exception, e:
             log.error("Problem creating cluster configuration file: '%s'" % e)
-        return cc_file_name
+        return file_name
 
     def store_cluster_config(self):
-        """Create a cluster configuration file and store it into cluster bucket under name
-        'persistent_data.yaml'. The cluster configuration is considered the set of currently
+        """
+        Create a cluster configuration file and store it into cluster's bucket under name
+        ``persistent_data.yaml``. The cluster configuration is considered the set of currently
         seen services in the master.
-        In addition, local Galaxy configuration files, if they do not exist in
-        the cluster bucket, are saved to the bucket.
+
+        In addition, store the local Galaxy configuration files to the cluster's
+        bucket (do so only if they are not already there).
         """
         s3_conn = self.app.cloud_interface.get_s3_connection()
         if not misc.bucket_exists(s3_conn, self.app.ud['bucket_cluster']):
             misc.create_bucket(s3_conn, self.app.ud['bucket_cluster'])
         # Save/update the current Galaxy cluster configuration to cluster's bucket
-        clust_customizations = {'galaxy_home': paths.P_GALAXY_HOME}
-        cc_file_name = self.create_cluster_config_file(addl_data=clust_customizations)
+        cc_file_name = self.create_cluster_config_file()
         misc.save_file_to_bucket(s3_conn, self.app.ud['bucket_cluster'], 'persistent_data.yaml', cc_file_name)
         # Ensure Galaxy config files are stored in the cluster's bucket,
         # but only after Galaxy has been configured and is running (this ensures
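
The inline TESTFLAG check in add_preconfigured_services is replaced by the
@TestFlag(False) decorator introduced above. A minimal sketch of what such a
decorator does (a hypothetical simplification; the real implementation lives
elsewhere in CloudMan):

    # Hypothetical sketch: when app.TESTFLAG is set, skip the wrapped method
    # and return the supplied default value instead of running it.
    def TestFlag(default_return):
        def decorator(func):
            def wrapper(self, *args, **kwargs):
                if self.app.TESTFLAG is True:
                    log.debug("TESTFLAG is set; skipping call to '%s'" % func.__name__)
                    return default_return
                return func(self, *args, **kwargs)
            return wrapper
        return decorator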

File persistent_data.yaml.sample

+services:
+- name: SGE
+- name: Postgres
+- name: Galaxy
+  home: /mnt/galaxyTools/galaxy-central
+filesystems:
+- name: galaxyIndices
+  kind: snapshot
+  ids: [snap-6e5aa902]
+  mount_point: /mnt/galaxyTools/
+- name: galaxyData
+  kind: volume
+  ids: [vol-060a7f6e, vol-060a7f6e]
+  mount_point: /mnt/galaxyData/
+- name: 1000genomes
+  kind: bucket
+  mount_point: /mnt/1000genomes/
+  access_key: akey
+  secret_key: skey
+shared_cluster_info: NOT YET IMPLEMENTED!
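
To check that a hand-edited configuration parses into the structure CloudMan
expects, it can be loaded with PyYAML directly (a quick sketch, independent of
the CloudMan code above):

    import yaml

    ud = yaml.load(open('persistent_data.yaml.sample'))
    for fs in ud.get('filesystems', []):
        print "%s (kind: %s) -> %s" % (fs['name'], fs['kind'], fs.get('mount_point'))
    for svc in ud.get('services', []):
        print "service: %s" % svc['name']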