Commits

Enis Afgan  committed 2371e8f

(WIP) Add the ability to add a remote NFS file system at run time as a local file system

  • Participants
  • Parent commits dd640a5

Comments (0)

Files changed (9)

File cm/controllers/root.py

             vol_id=None, vol_fs_name='',
             snap_id=None, snap_fs_name='',
             bucket_name='', bucket_fs_name='', bucket_a_key='', bucket_s_key='',
-            nfs_server=None, nfs_fs_name='', **kwargs):
+            nfs_server=None, nfs_fs_name='', nfs_username='', nfs_pwd='', **kwargs):
         """
         Decide on the new file system kind and call the appropriate manager method.
 
                     bucket_s_key = None
                 else:
                     bucket_s_key = bucket_s_key.strip()
-                self.app.manager.add_fs(bucket_name.strip(), bucket_fs_name.strip(),
+                self.app.manager.add_fs_bucket(bucket_name.strip(), bucket_fs_name.strip(),
                     bucket_a_key=bucket_a_key, bucket_s_key=bucket_s_key, persistent=persist)
             else:
-                log.error("Wanted to add a new file system from a bucket but no bucket name was provided")
+                log.error("Wanted to add a new file system from a bucket but no "
+                    "bucket name was provided.")
         elif fs_kind == 'volume' or fs_kind == 'snapshot':
-                log.debug("Adding '{2}' file system based on an existing {0}, {1}"\
+                log.debug("Adding '{2}' file system based on an existing {0}, {1}"
                     .format(fs_kind, vol_id if fs_kind == 'volume' else snap_id,
                         vol_fs_name if fs_kind == 'volume' else snap_fs_name))
         elif fs_kind == 'new_volume':
-            log.debug("Adding a new '{0}' file system: volume-based,{2} persistent,{3} to "\
-                "be deleted, of size {1}"\
-                .format(new_vol_fs_name, new_disk_size, ('' if persist else ' not'), ('' if dot else ' not')))
+            log.debug("Adding a new '{0}' file system: volume-based,{2} persistent,{3} to "
+                "be deleted, of size {1}"
+                .format(new_vol_fs_name, new_disk_size, ('' if persist else ' not'),
+                ('' if dot else ' not')))
         elif fs_kind == 'nfs':
-            log.debug("Adding a new '{0}' file system: nfs-based,{1} persistent."\
+            log.debug("Adding a new '{0}' file system: nfs-based,{1} persistent."
                 .format(nfs_fs_name, ('' if persist else ' not')))
+            self.app.manager.add_fs_nfs(nfs_server, nfs_fs_name, username=nfs_username,
+                pwd=nfs_pwd, persistent=persist)
         else:
             log.error("Wanted to add a file system but did not recognize kind {0}".format(fs_kind))
         return "Initiated file system addition"

File cm/services/data/filesystem.py

 
 from boto.exception import EC2ResponseError
 
-from cm.util import paths
 from cm.util.misc import run
 from cm.util.misc import flock
 from cm.services import service_states
 from cm.services import ServiceRole
 from cm.services.data import DataService
+from cm.services.data.nfs import NfsFS
 from cm.services.data.volume import Volume
 from cm.services.data.bucket import Bucket
 from cm.services.data.transient_storage import TransientStorage
         self.volumes = []  # A list of cm.services.data.volume.Volume objects
         self.buckets = []  # A list of cm.services.data.bucket.Bucket objects
         self.transient_storage = []  # Instance's transient storage
+        self.nfs_fs = None  # NFS file system object implementing this file system's device
         self.name = name  # File system name
         self.persistent = persistent  # Whether it should be part of the cluster config
         self.size = None  # Total size of this file system
         self.size_used = None  # Used size of this file system
         self.size_pct = None  # Used percentage of this file system
         self.dirty = False
-        self.kind = None  # Choice of 'snapshot', 'volume', 'bucket', or 'transient'
+        self.kind = None  # Choice of 'snapshot', 'volume', 'bucket', 'transient', or 'nfs'
         self.mount_point = mount_point if mount_point is not None else os.path.join(
             self.app.path_resolver.mount_root, self.name)
         self.grow = None  # Used (APPLICABLE ONLY FOR the galaxyData FS) to indicate a need to grow
                          # the file system; use following dict structure:
                          # {'new_size': <size>, 'snap_desc': <snapshot description>}
-        self.started_starting = datetime.utcnow(
-        )  # A time stamp when the state changed to STARTING.
-                                                  # It is used to avoid brief ERROR states during
-                                                  # the system configuration.
+        self.started_starting = datetime.utcnow()  # A time stamp when the state changed to
+                                                   # STARTING; it is used to avoid brief ERROR
+                                                   # states during the system configuration.
 
     def get_full_name(self):
         """
                     self.get_full_name()))
                 self.state = service_states.STARTING
                 self.started_starting = datetime.utcnow()
+                # TODO: devices must be added to a file system before one can
+                # be `added` and thus we know what `kind` a FS is. So, instead of
+                # iterating over all devices, just use `self.kind`-based if/else, right?
+                # See `nfs` case as an example
                 for vol in self.volumes:
                     # Threading has some issues w/ race conditions over device IDs
                     # threading.Thread(target=vol.add).start()
                     self.kind = 'transient'
                     self.persistent = False
                     ts.add()
+                if self.kind == 'nfs':
+                    self.nfs_fs.start()
             except Exception, e:
                 log.error("Error adding file system service {0}: {1}".format(
                     self.get_full_name(), e))
         Initiate removal of this file system from the system; do it in a
         separate thread and return without waiting for the process to complete.
         """
-        log.info("Initiating removal of '{0}' data service with: volumes {1}, buckets {2}, and "
-                 "transient storage {3}".format(self.get_full_name(), self.volumes, self.buckets,
-                                                self.transient_storage))
+        log.info("Initiating removal of '{0}' data service with: volumes {1}, buckets {2}, "
+                 "transient storage {3}, and nfs server {4}".format(self.get_full_name(),
+                 self.volumes, self.buckets, self.transient_storage, self.nfs_fs))
         self.state = service_states.SHUTTING_DOWN
         r_thread = threading.Thread(target=self.__remove)
         r_thread.start()
             b.unmount()
         for t in self.transient_storage:
             t.remove()
+        if self.nfs_fs:
+            self.nfs_fs.stop()
         log.debug("Setting state of %s to '%s'" % (
             self.get_full_name(), service_states.SHUT_DOWN))
         self.state = service_states.SHUT_DOWN
         log.debug("Configuring instance transient storage at {0} with NFS.".format(
             self.mount_point))
         self.transient_storage.append(TransientStorage(self))
+
+    def add_nfs(self, nfs_server, username=None, pwd=None):
+        """
+        Add a NFS server (e.g., ``172.22.169.17:/nfs_dir``) to mount the file
+        system from.
+
+        Marks this file system as being of kind ``nfs`` and creates the
+        ``NfsFS`` object implementing it. Nothing is mounted by this method.
+
+        :param nfs_server: the remote NFS server (and exported directory)
+        :param username: optional user name, passed on to ``NfsFS``
+        :param pwd: optional password, passed on to ``NfsFS``
+        """
+        log.debug("Adding NFS server {0} to file system {1}".format(nfs_server, self.name))
+        self.kind = 'nfs'
+        self.nfs_fs = NfsFS(self, nfs_server, username, pwd)

File cm/services/data/nfs.py

+"""
+An implementation of the NFS-based file system service capable of managing
+interactions with a remote NFS server and making it available locally as a
+file system.
+"""
+import os
+import time
+import subprocess
+
+from cm.util.misc import run
+
+import logging
+log = logging.getLogger('cloudman')
+
+
+class NfsFS(object):
+    """
+    Device implementation for a file system whose content is served by a
+    remote NFS server. The remote export is mounted locally at the mount
+    point of the encapsulating file system object.
+    """
+
+    def __init__(self, filesystem, nfs_server, username=None, pwd=None):
+        """
+        :param filesystem: the file system object encapsulating this
+            implementation; its ``app`` and ``mount_point`` attributes are used
+        :param nfs_server: the remote export to mount (e.g.,
+            ``172.22.169.17:/nfs_dir``)
+        :param username: optional user name for the NFS server; it is stored
+            but not used by the mount command
+        :param pwd: optional password for the NFS server; it is stored but
+            not used by the mount command
+        """
+        self.fs = filesystem  # File system encapsulating this implementation
+        self.app = self.fs.app  # A local reference to app (used by @TestFlag)
+        self.nfs_server = nfs_server
+        self.server_username = username
+        self.server_pwd = pwd
+
+    def __str__(self):
+        return str(self.nfs_server)
+
+    def __repr__(self):
+        return str(self.nfs_server)
+
+    def start(self):
+        """
+        Start NfsFS service.
+        Satisfy any preconditions and try to mount ``self.nfs_server`` locally at
+        ``self.fs.mount_point``
+
+        :rtype: bool
+        :return: ``True`` if the file system was mounted; ``False`` otherwise.
+        """
+        if not os.path.exists(self.fs.mount_point):
+            os.makedirs(self.fs.mount_point)
+        return self.mount()
+
+    def stop(self):
+        """
+        Stop NfsFS service and cleanup the ``self.fs.mount_point`` directory.
+        The directory is removed only if the file system was unmounted.
+        """
+        if self.unmount():
+            # Clean up the system path now that the file system is unmounted
+            try:
+                os.rmdir(self.fs.mount_point)
+            except OSError as e:
+                log.error("Error removing unmounted path {0}: {1}".format(
+                    self.fs.mount_point, e))
+
+    def mount(self):
+        """
+        Do the actual mounting of the remote NFS file system locally.
+
+        :rtype: bool
+        :return: ``True`` if the mount command exited with status 0;
+            ``False`` otherwise.
+        """
+        log.debug("Mounting NFS FS {0} from {1}".format(self.fs.mount_point, self.nfs_server))
+        # Hand the arguments over as a list (i.e., without a shell) so the
+        # server string received from the caller can never be interpreted as
+        # shell syntax
+        cmd = ['/bin/mount', '-t', 'nfs', str(self.nfs_server).strip(),
+            self.fs.mount_point]
+        try:
+            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            _, stderr = process.communicate()
+        except OSError as e:
+            # The mount command itself could not be started
+            log.error("Trouble mounting file system at {0} from NFS server {1}: {2}"
+                .format(self.fs.mount_point, self.nfs_server, e))
+            return False
+        if process.returncode != 0:
+            # Include what the mount command reported to ease debugging
+            log.error("Trouble mounting file system at {0} from NFS server {1}: {2}"
+                .format(self.fs.mount_point, self.nfs_server, stderr.strip()))
+            return False
+        log.info("Successfully mounted file system at {0} from {1}"
+            .format(self.fs.mount_point, self.nfs_server))
+        return True
+
+    def unmount(self):
+        """
+        Do the actual unmounting of this file system.
+        Up to 10 attempts are made, waiting 3 seconds between the attempts.
+
+        :rtype: bool
+        :return: ``True`` if the file system was unmounted; ``False`` if all
+            of the attempts failed.
+        """
+        log.debug("Unmounting NFS-based FS from {0}".format(self.fs.mount_point))
+        attempts = 10
+        for counter in range(attempts):
+            if run('/bin/umount %s' % self.fs.mount_point):
+                return True
+            if counter < attempts - 1:
+                # Give the system some time before trying again
+                time.sleep(3)
+        log.warning(
+            "Could not unmount file system at '%s'" % self.fs.mount_point)
+        return False

File cm/services/data/volume.py

             return False
         self.fs.status()
         if self.fs.state == service_states.RUNNING or self.fs.state == service_states.SHUTTING_DOWN:
-            log.debug(
-                "Unmounting volume-based FS from {0}".format(mount_point))
+            log.debug("Unmounting volume-based FS from {0}".format(mount_point))
             for counter in range(10):
                 if run('/bin/umount %s' % mount_point,
                         "Error unmounting file system '%s'" % mount_point,

File cm/util/master.py

                       file_system_name)
             return False
 
-    def add_fs(self, bucket_name, fs_name=None, fs_roles=[ServiceRole.GENERIC_FS], bucket_a_key=None, bucket_s_key=None, persistent=False):
+    def add_fs_bucket(self, bucket_name, fs_name=None, fs_roles=[ServiceRole.GENERIC_FS],
+        bucket_a_key=None, bucket_s_key=None, persistent=False):
+        """
+        Add a new file system service for a bucket-based file system.
+        """
         log.info("Adding a {4} file system {3} from bucket {0} (w/ creds {1}:{2})"
                  .format(bucket_name, bucket_a_key, bucket_s_key, fs_name, persistent))
         fs = Filesystem(self.app, fs_name or bucket_name,
             w_inst.send_add_s3fs(bucket_name, fs_roles)
         log.debug("Master done adding FS from bucket {0}".format(bucket_name))
 
+    @TestFlag(None)
+    def add_fs_nfs(self, nfs_server, fs_name, username=None, pwd=None,
+        fs_roles=[ServiceRole.GENERIC_FS], persistent=False):
+        """
+        Add a new file system service for a NFS-based file system. Optionally,
+        provide password-based credentials (``username`` and ``pwd``) for
+        accessing the NFS server.
+
+        :param nfs_server: the remote NFS server to mount the file system from
+        :param fs_name: the name of the new file system
+        :param username: optional user name for accessing the NFS server
+        :param pwd: optional password for accessing the NFS server
+        :param fs_roles: the service roles to assign to the new file system
+        :param persistent: passed on to the new ``Filesystem`` object as its
+            ``persistent`` argument
+        """
+        log.info("Adding a NFS-based file system {0} from NFS server {1}".format(fs_name, nfs_server))
+        fs = Filesystem(self.app, fs_name, persistent=persistent, svc_roles=fs_roles)
+        fs.add_nfs(nfs_server, username, pwd)
+        self.add_master_service(fs)
+        # Inform all workers to add the same FS (the file system will be the same
+        # and sharing it over NFS does not seem to work)
+        for w_inst in self.worker_instances:
+            w_inst.send_add_nfs_fs(nfs_server, fs_name, fs_roles, username, pwd)
+        log.debug("Master done adding FS from NFS server {0}".format(nfs_server))
+
     def stop_worker_instances(self):
         """
         Initiate termination of all worker instances.
         for fs in self.app.manager.get_services(svc_type=ServiceType.FILE_SYSTEM):
             mount_points.append(
                 {'nfs_server': self.app.cloud_interface.get_public_ip(),
-                                 'shared_mount_path': fs.get_details()['mount_point'],
-                                 'fs_name': fs.get_details()['name']
-                                 })
+                 'shared_mount_path': fs.get_details()['mount_point'],
+                 'fs_name': fs.get_details()['name']})
         jmp = json.dumps({'mount_points': mount_points})
         self.app.manager.console_monitor.conn.send('MOUNT | %s' % jmp, self.id)
-        log.debug(
-            "Sent mount points %s to worker %s" % (mount_points, self.id))
+        log.debug("Sent mount points %s to worker %s" % (mount_points, self.id))
 
     def send_master_pubkey(self):
         # log.info("\tMT: Sending MASTER_PUBKEY message: %s" % self.app.manager.get_root_public_key() )
         self.app.manager.console_monitor.conn.send('START_SGE', self.id)
 
     def send_add_s3fs(self, bucket_name, svc_roles):
-        msg = 'ADDS3FS | {0} | {1}'.format(
-            bucket_name, ServiceRole.to_string(svc_roles))
-        log.debug("\tMT: Sending message '{msg}' to instance {inst}".format(
-            msg=msg, inst=self.id))
+        msg = 'ADDS3FS | {0} | {1}'.format(bucket_name, ServiceRole.to_string(svc_roles))
+        self._send_msg(msg)
+
+    def send_add_nfs_fs(self, nfs_server, fs_name, svc_roles, username=None, pwd=None):
+        """
+        Send a message to the worker node requesting it to mount a new file system
+        from the ``nfs_server`` at mount point /mnt/``fs_name`` with roles ``svc_roles``.
+
+        The details are sent as a JSON document with a single ``nfs_server_info``
+        key; ``username`` and ``pwd`` are included in that document as given.
+        """
+        nfs_server_info = {
+            'nfs_server': nfs_server, 'fs_name': fs_name, 'username': username,
+            'pwd': pwd, 'svc_roles': ServiceRole.to_string(svc_roles)
+        }
+        # NOTE(review): unlike the 'ADDS3FS | ...' and 'MOUNT | ...' messages,
+        # this one carries no message type prefix - confirm the worker is able
+        # to recognize it
+        msg = json.dumps({'nfs_server_info': nfs_server_info})
+        self._send_msg(msg)
+
+    def _send_msg(self, msg):
+        """
+        An internal convenience method to log and send a message to the current instance.
+        """
+        log.debug("\tMT: Sending message '{msg}' to instance {inst}".format(msg=msg, inst=self.id))
         self.app.manager.console_monitor.conn.send(msg, self.id)
 
     def handle_message(self, msg):

File cm/util/misc.py

         except S3ResponseError as e:
             log.debug("Failed to get file '%s' from bucket '%s': %s" % (
                 remote_filename, bucket_name, e))
-            os.remove(local_file)
-                      # Don't leave a partially downloaded or touched file
+            if os.path.exists(local_file):
+                os.remove(local_file)  # Don't leave a partially downloaded or touched file
             return False
     else:
         log.debug("Bucket '%s' does not exist, did not get remote file '%s'" % (

File cm/util/paths.py

                 gh = os.path.join(galaxy_tools_fs_svc[0].mount_point, g_dir)
                 if os.path.exists(gh):
                     return gh
-        log.debug("Warning: Returning default path for galaxy_home")
+        # log.debug("Warning: Returning default path for galaxy_home")
         return P_GALAXY_HOME
 
     @property
     Check if virtual-burrito is installed and if a virtualenv named ``venv_name``
     exists. If so, return ``True``; ``False`` otherwise.
     """
-    vb_path = os.path.join(os.getenv('HOME', '/root'), '.venvburrito/startup.sh')
+    vb_path = '/home/ubuntu/.venvburrito/startup.sh'
     if os.path.exists(vb_path):
         cm_venv = _run("/bin/bash -l -c '. {0}; lsvirtualenv | grep {1}'".format(vb_path, venv_name))
         if cm_venv and venv_name in cm_venv:

File static/scripts/admin.js

                 '<label for="fs-kind-snapshot">Snapshot</label>' +
                 '<input disabled="disabled" type="radio" name="fs_kind" id="fs-kind-new-volume" class="fs-add-radio-btn" value="new_volume"/>' +
                 '<label for="fs-kind-new-volume">New volume</label>' +
-                '<input disabled="disabled" type="radio" name="fs_kind" id="fs-kind-nfs" class="fs-add-radio-btn" value="nfs"/>' +
+                '<input type="radio" name="fs_kind" id="fs-kind-nfs" class="fs-add-radio-btn" value="nfs"/>' +
                 '<label for="fs-kind-nfs">NFS</label>' +
             '</fieldset>' +
             // Bucket form details