# Source: galaxy-globus / library_directory_sync.py (full commit)
#!/usr/bin/env python
"""
Synchronizes a library with a directory on the filesystem.

Usage:

    python library_directory_sync.py <api_key> <api_url> 'Library Name' /tmp/my_directory

NOTE:
  - Requires that you have Galaxy's Experimental REST-ful API enabled
  - The upload method used requires that the data library filesystem upload
       option allow_library_path_paste is enabled, e.g.: set

       allow_library_path_paste = True

       in your universe_wsgi.ini file.

"""
import exceptions
import json
import logging
import os
import shutil
import sys
import time

logging.basicConfig(level=logging.DEBUG)

try:
    import argparse
except ImportError:
    logging.critical("This script requires the 'argparse' library from pypi (http://pypi.python.org/pypi/argparse)")
try:
    import requests
except ImportError:
    logging.critical("This script requires the 'requests' library from pypi (http://pypi.python.org/pypi/requests)")
try:
    import watchdog
except ImportError:
    logging.critical("This script requires the 'watchdog' library from pypi (http://pypi.python.org/pypi/watchdog)")

import configobj

from Crypto.Cipher import Blowfish

from watchdog import events
#from watchdog import events
#eh = events.LoggingEventHandler()
from watchdog import observers


# Load Galaxy's id_secret from the local universe_wsgi.ini; it is needed to
# encode/decode the object ids used by the REST API.
CONFIG_ID_SECRET = None
# Use a context manager so the ini file handle is closed promptly (the
# original left it open for the lifetime of the process).
with open("universe_wsgi.ini", "r") as ini:
    config = configobj.ConfigObj(ini)
CONFIG_ID_SECRET = config["app:main"]["id_secret"]


def encode_id( obj_id ):
    """Encode a numeric Galaxy object id the way the server does.

    The decimal string is left-padded to a multiple of 8 bytes with '!'
    characters, Blowfish-encrypted with the server's id_secret, and
    hex-encoded.
    """
    cipher = Blowfish.new(CONFIG_ID_SECRET)
    raw = str(obj_id)
    padding = "!" * (8 - len(raw) % 8)
    return cipher.encrypt(padding + raw).encode('hex')


def decode_id( obj_id ):
    """Inverse of encode_id: hex-decode, Blowfish-decrypt, strip the
    leading '!' padding and return the numeric object id."""
    cipher = Blowfish.new(CONFIG_ID_SECRET)
    plaintext = cipher.decrypt(obj_id.decode('hex'))
    return int(plaintext.lstrip("!"))


def decode_file_id(obj_id):
    """Decode a library-content id.

    File IDs come back from Galaxy as encoded 'file.%d' or 'folder.%d'
    strings; return just the trailing numeric part.
    """
    cipher = Blowfish.new(CONFIG_ID_SECRET)
    plaintext = cipher.decrypt(obj_id.decode('hex')).lstrip("!")
    return int(plaintext.rsplit(".", 1)[-1])


class GalaxyError(exceptions.Exception):
    """Exception thrown to indicate an error response code (400+)
    from the Galaxy API."""


class LibraryFilesystemSynchronizer(object):
    """Performs synchronization while managing the details of working
    with the Galaxy HTTP API.

    Instances also implement the watchdog event-handler protocol
    (dispatch/on_created/on_modified/...), so an instance can be
    scheduled directly on a watchdog observer.
    """

    def __init__(self, api_key, site_url, library_name, directory):
        self.api_key = api_key
        self.site_url = site_url
        self.api_url = site_url + "/" + "api"
        self.library_name = library_name
        self.directory = directory

    def _get_json(self, url, params=None):
        """GET ``url`` and decode the JSON body of the response.

        NOTE: a requests Response object is not file-like, so the
        original ``json.load(response)`` calls would fail; decode
        ``response.content`` instead (works on old and new requests).
        """
        response = requests.get(url, params=params)
        return json.loads(response.content)

    def find_or_create_library(self, library_name):
        """Look up a data library by name.

        Returns a dictionary of library information.  NOTE(review):
        despite the name, creating a missing library is not implemented;
        if no library matches ``library_name`` the first library Galaxy
        returns is used (the original behavior), with a warning.
        """
        query_params = {"library_name": library_name,
                        "key": self.api_key}
        libraries_url = self.api_url + "/" + "libraries"
        logging.info("Getting libraries: %s", libraries_url)
        libraries = self._get_json(libraries_url, params=query_params)
        # Prefer an exact name match; some Galaxy servers ignore the
        # library_name query parameter and return every library.
        for library in libraries:
            if library.get("name") == library_name:
                return library
        logging.warning("No library named %r found; using the first one.",
                        library_name)
        return libraries[0]

    def find_folder(self, library):
        """Return the root ('/') folder of ``library``, or None if the
        library contents contain no root folder."""
        folders_url = self.api_url + "/" + "libraries/%s/contents" % (library["id"],)
        folders = self._get_json(folders_url, params={"key": self.api_key})
        root_folder = None
        for folder in folders:
            if folder["name"] == "/":
                root_folder = folder
        return root_folder

    def create_or_update_dataset(self, filesystem_path, file_type="auto", dbkey="", upload_option="upload_paths", create_type="file"):
        """Add a file to the library's root folder without copying.

        If a dataset with the same basename already exists in the
        library, a replacement version is uploaded instead of creating a
        duplicate.  Requires allow_library_path_paste to be enabled on
        the Galaxy server.
        """
        data = {"folder_id": self.root_folder["id"],
                "file_type": file_type,
                "dbkey": dbkey,
                "upload_option": upload_option,
                "filesystem_paths": filesystem_path,  # TODO: multiple 'uploads'?
                "create_type": create_type}

        contents_url = (self.api_url + "/" +
                        "libraries/%s/contents" % (self.library["id"]) +
                        "?key=%s" % (self.api_key,))

        # Check whether a dataset with this basename already exists; if
        # so, mark the upload as a replacement.
        name = os.path.basename(filesystem_path)
        contents_response = requests.get(contents_url, headers={"Content-type": "application/json"})
        contents = json.loads(contents_response.content)
        for entry in contents:
            entry_name = entry["name"]
            if entry_name == "/" + name:
                logging.info("Replacing: %s (%s)", entry_name, entry["id"])
                # Content ids come back encoded as 'file.%d'; re-encode
                # just the numeric part for the replace_id parameter.
                data["replace_id"] = encode_id(decode_file_id(entry["id"]))

        json_data = json.dumps(data)
        logging.debug("json data: %s", json_data)
        response = requests.post(contents_url,
                                 headers={"Content-type": "application/json"},
                                 data=json_data)

        if response.status_code >= 400:
            # Save the (usually HTML) error page for debugging.
            with open("sync_error.html", "w") as error:
                error.write(response.content)
            # TODO: raise GalaxyError instead of just logging?
            logging.error("Creating or updating a dataset: expected %s response code - got %s instead!",
                          200, response.status_code)

    # BEGIN watchdog.event.EventHandler protocol:
    def on_moved(self, event):
        what = 'directory' if event.is_directory else 'file'
        logging.info("Moved %s: from %s to %s", what, event.src_path,
                     event.dest_path)

    def on_created(self, event):
        what = 'directory' if event.is_directory else 'file'
        logging.info("Created %s: %s", what, event.src_path)
        # BUG FIX: the original also passed self.root_folder as the first
        # positional argument, which both uploaded the wrong value and
        # collided with the file_type keyword (TypeError on every event).
        self.create_or_update_dataset(event.src_path,
                                      file_type="auto",
                                      dbkey="", upload_option="upload_paths",
                                      create_type="file")

    def on_deleted(self, event):
        # Deletions are logged only; datasets are never removed from Galaxy.
        what = 'directory' if event.is_directory else 'file'
        logging.info("Deleted %s: %s", what, event.src_path)

    def on_modified(self, event):
        what = 'directory' if event.is_directory else 'file'
        logging.info("Modified %s: %s", what, event.src_path)
        self.create_or_update_dataset(event.src_path,
                                      file_type="auto",
                                      dbkey="", upload_option="upload_paths",
                                      create_type="file")

    def on_any_event(self, event):
        pass

    # Dispatch table mapping watchdog event types to unbound handlers.
    _method_map = {events.EVENT_TYPE_MOVED: on_moved,
                   events.EVENT_TYPE_DELETED: on_deleted,
                   events.EVENT_TYPE_CREATED: on_created,
                   events.EVENT_TYPE_MODIFIED: on_modified}

    def dispatch(self, event):
        self.on_any_event(event)
        self._method_map[event.event_type](self, event)
    # END watchdog.event.EventHandler protocol

    def run(self):
        """Resolve the target library and its root folder, then watch
        the directory forever, pushing filesystem changes into the
        library.  Stops cleanly on KeyboardInterrupt."""
        # Find the data library; datasets go into the root folder '/'.
        library = self.find_or_create_library(self.library_name)
        self.library = library
        logging.info("Library: %s", library["id"])
        folder = self.find_folder(library)
        logging.info("folder: %s", folder)
        self.root_folder = folder

        observer = observers.Observer()
        observer.schedule(self, path=self.directory, recursive=False)
        logging.info("Watching: %s", self.directory)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        finally:
            observer.join()

def get_parser():
    """Build the command-line argument parser for this script.

    Positional arguments: library name and directory to watch; optional
    -k/--api-key and -u/--api-url for the Galaxy REST API.
    """
    parser = argparse.ArgumentParser()
    # BUG FIX: the implicitly-concatenated help string was missing a
    # separating space ("synchronized.Creates").
    parser.add_argument("library_name", metavar="LIBRARY",
                        help="Name of library to be synchronized. "
                             "Creates it if it is missing.")
    parser.add_argument("directory", metavar="DIRECTORY",
                        help="Directory to synchronize with the Galaxy Library.")

    parser.add_argument("-k", "--api-key", dest="api_key",
                        help="Galaxy REST API key")
    parser.add_argument("-u", "--api-url", dest="api_url",
                        help="Galaxy REST API URL")
    return parser

def main():
    """Parse command-line arguments and run the synchronizer."""
    parser = get_parser()
    args = parser.parse_args()
    # Use the module's configured logging rather than a bare print.
    logging.info("args: %s", args)
    synchro = LibraryFilesystemSynchronizer(args.api_key, args.api_url,
                                            args.library_name, args.directory)
    synchro.run()

if __name__=="__main__":
    main()