Source

galaxy-central / contrib / nagios / check_galaxy.py

Full commit
#!/usr/bin/env python
"""
check_galaxy can be run by hand, although it is meant to run from cron
via the check_galaxy.sh script in Galaxy's cron/ directory.
"""

import socket, sys, os, time, tempfile, filecmp, htmllib, formatter, getopt, json
from user import home

import warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    import twill
    import twill.commands as tc

# options
if os.environ.has_key( "DEBUG" ):
    debug = os.environ["DEBUG"]
else:
    debug = False

test_data_dir = os.path.join( os.path.dirname( __file__ ), 'check_galaxy_data' )
# what tools to run - not so pretty
tools = {
    "Extract+genomic+DNA+1" :
    [
        {
            "inputs" :
            (
                {
                    "file_path" : os.path.join( test_data_dir, "1.bed" ),
                    "dbkey" : "hg17",
                },

            )
        },
        { "check_file" : os.path.join( test_data_dir, "extract_genomic_dna_out1.fasta" ) },
        {
            "tool_run_options" :
            {
                "input" : "1.bed",
                "interpret_features" : "yes",
                "index_source" : "cached",
                "out_format" : "fasta"
            }
        }
    ]
}

# handle arg(s)
def usage():
    print "usage: check_galaxy.py <server> <username> <password>"
    sys.exit(1)

try:
    opts, args = getopt.getopt( sys.argv[1:], 'n' )
except getopt.GetoptError, e:
    print str(e)
    usage()
if len( args ) < 1:
    usage()
server = args[0]
username = args[1]
password = args[2]

new_history = False
for o, a in opts:
    if o == "-n":
        if debug:
            print "Specified -n, will create a new history"
        new_history = True
    else:
        usage()

# state information
var_dir = os.path.join( home, ".check_galaxy", server )
if not os.access( var_dir, os.F_OK ):
    os.makedirs( var_dir, 0700 )

# default timeout for twill browser is never
socket.setdefaulttimeout(300)

# user-agent
tc.agent("Mozilla/5.0 (compatible; check_galaxy/0.1)")
tc.config('use_tidy', 0)

class Browser:

    def __init__(self):
        self.server = server
        self.tool = None
        self.tool_opts = None
        self._hda_id = None
        self._hda_state = None
        self._history_id = None
        self.check_file = None
        self.cookie_jar = os.path.join( var_dir, "cookie_jar" )
        dprint("cookie jar path: %s" % self.cookie_jar)
        if not os.access(self.cookie_jar, os.R_OK):
            dprint("no cookie jar at above path, creating")
            tc.save_cookies(self.cookie_jar)
        tc.load_cookies(self.cookie_jar)

    def get(self, path):
        tc.go("http://%s%s" % (self.server, path))
        tc.code(200)

    def reset(self):
        self.tool = None
        self.tool_opts = None
        self._hda_id = None
        self._hda_state = None
        self._history_id = None
        self.check_file = None
        if new_history:
            self.get("/history/delete_current")
            tc.save_cookies(self.cookie_jar)
        self.delete_datasets()

    def check_redir(self, url):
        try:
            tc.get_browser()._browser.set_handle_redirect(False)
            tc.go(url)
            tc.code(302)
            tc.get_browser()._browser.set_handle_redirect(True)
            dprint( "%s is returning redirect (302)" % url )
            return(True)
        except twill.errors.TwillAssertionError, e:
            tc.get_browser()._browser.set_handle_redirect(True)
            dprint( "%s is not returning redirect (302): %s" % (url, e) )
            code = tc.browser.get_code()
            if code == 502:
                print "Galaxy is down (code 502)"
                sys.exit(1)
            return False

    def login(self, user, pw):
        self.get("/user/login")
        tc.fv("1", "email", user)
        tc.fv("1", "password", pw)
        tc.submit("Login")
        tc.code(200)
        if len(tc.get_browser().get_all_forms()) > 0:
            # uh ohs, fail
            p = userParser()
            p.feed(tc.browser.get_html())
            if p.no_user:
                dprint("user does not exist, will try creating")
                self.create_user(user, pw)
            elif p.bad_pw:
                raise Exception, "Password is incorrect"
            else:
                raise Exception, "Unknown error logging in"
        tc.save_cookies(self.cookie_jar)

    def create_user(self, user, pw):
        self.get("/user/create")
        tc.fv("1", "email", user)
        tc.fv("1", "password", pw)
        tc.fv("1", "confirm", pw)
        tc.submit("Submit")
        tc.code(200)
        if len(tc.get_browser().get_all_forms()) > 0:
            p = userParser()
            p.feed(tc.browser.get_html())
            if p.already_exists:
                raise Exception, 'The user you were trying to create already exists'

    def upload(self, input):
        self.get("/tool_runner/index?tool_id=upload1")
        tc.fv("1","file_type", "bed")
        tc.fv("1","dbkey", input.get('dbkey', '?'))
        tc.formfile("1","file_data", input['file_path'])
        tc.submit("runtool_btn")
        tc.code(200)

    def runtool(self):
        self.get("/tool_runner/index?tool_id=%s" % self.tool)
        for k, v in self.tool_opts.items():
            tc.fv("1", k, v)
        tc.submit("runtool_btn")
        tc.code(200)

    @property
    def history_id(self):
        if self._history_id is None:
            self.get('/api/histories')
            self._history_id = json.loads(tc.browser.get_html())[0]['id']
        return self._history_id

    @property
    def history_contents(self):
        self.get('/api/histories/%s/contents' % self.history_id)
        return json.loads(tc.browser.get_html())

    @property
    def hda_id(self):
        if self._hda_id is None:
            self.set_top_hda()
        return self._hda_id

    @property
    def hda_state(self):
        if self._hda_state is None:
            self.set_top_hda()
        return self._hda_state

    def set_top_hda(self):
        self.get(self.history_contents[-1]['url'])
        hda = json.loads(tc.browser.get_html())
        self._hda_id = hda['id']
        self._hda_state = hda['state']

    @property
    def undeleted_hdas(self):
        rval = []
        for item in self.history_contents:
            self.get(item['url'])
            hda = json.loads(tc.browser.get_html())
            if hda['deleted'] == False:
                rval.append(hda)
        return rval

    @property
    def history_state(self):
        self.get('/api/histories/%s' % self.history_id)
        return json.loads(tc.browser.get_html())['state']

    @property
    def history_state_terminal(self):
        if self.history_state not in ['queued', 'running', 'paused']:
            return True
        return False

    def wait(self):
        sleep_amount = 1
        count = 0
        maxiter = 16
        while count < maxiter:
            count += 1
            if not self.history_state_terminal:
                time.sleep( sleep_amount )
                sleep_amount += 1
            else:
                break
        if count == maxiter:
            raise Exception, "Tool never finished"

    def check_state(self):
        if self.hda_state != "ok":
            self.get("/datasets/%s/stderr" % self.hda_id)
            print tc.browser.get_html()
            raise Exception, "HDA %s NOT OK: %s" % (self.hda_id, self.hda_state)

    def diff(self):
        self.get("/datasets/%s/display?to_ext=%s" % (self.hda_id, self.tool_opts.get('out_format', 'fasta')))
        data = tc.browser.get_html()
        tmp = tempfile.mkstemp()
        dprint("tmp file: %s" % tmp[1])
        tmpfh = os.fdopen(tmp[0], 'w')
        tmpfh.write(data)
        tmpfh.close()
        if filecmp.cmp(tmp[1], self.check_file):
            dprint("Tool output is as expected")
        else:
            if not debug:
                os.remove(tmp[1])
            raise Exception, "Tool output differs from expected"
        if not debug:
            os.remove(tmp[1])

    def delete_datasets(self):
        for hda in self.undeleted_hdas:
            self.get('/datasets/%s/delete' % hda['id'])
        hdas = [hda['id'] for hda in self.undeleted_hdas]
        if hdas:
            print "Remaining datasets ids:", " ".join(hdas)
            raise Exception, "History still contains datasets after attempting to delete them"

    def check_if_logged_in(self):
        self.get("/user?cntrller=user")
        p = loggedinParser()
        p.feed(tc.browser.get_html())
        return p.logged_in

class userParser(htmllib.HTMLParser):
    def __init__(self):
        htmllib.HTMLParser.__init__(self, formatter.NullFormatter())
        self.in_span = False
        self.in_div = False
        self.no_user = False
        self.bad_pw = False
        self.already_exists = False
    def start_span(self, attrs):
        self.in_span = True
    def start_div(self, attrs):
        self.in_div = True
    def end_span(self):
        self.in_span = False
    def end_div(self):
        self.in_div = False
    def handle_data(self, data):
        if self.in_span or self.in_div:
            if data == "No such user (please note that login is case sensitive)":
                self.no_user = True
            elif data == "Invalid password":
                self.bad_pw = True
            elif data == "User with that email already exists":
                self.already_exists = True

class loggedinParser(htmllib.HTMLParser):
    def __init__(self):
        htmllib.HTMLParser.__init__(self, formatter.NullFormatter())
        self.in_p = False
        self.logged_in = False
    def start_p(self, attrs):
        self.in_p = True
    def end_p(self):
        self.in_p = False
    def handle_data(self, data):
        if self.in_p:
            if data == "You are currently not logged in.":
                self.logged_in = False
            elif data.startswith( "You are currently logged in as " ):
                self.logged_in = True

def dprint(str):
    if debug:
        print str

# do stuff here
if __name__ == "__main__":

    dprint("checking %s" % server)

    b = Browser()

    # login (or not)
    if b.check_if_logged_in():
        dprint("we are already logged in (via cookies), hooray!")
    else:
        dprint("not logged in... logging in")
        b.login(username, password)

    for tool, params in tools.iteritems():

        check_file = ""

        # make sure history and state is clean
        b.reset()
        b.tool = tool

        # get all the tool run conditions
        for dict in params:
            for k, v in dict.items():
                if k == 'inputs':
                    for input in v:
                        b.upload(input)
                    b.wait()
                elif k == 'check_file':
                    b.check_file = v
                elif k == 'tool_run_options':
                    b.tool_opts = v
                else:
                    raise Exception, "Unknown key in tools dict: %s" % k

        b.runtool()
        b.wait()
        b.check_state()
        b.diff()
        b.delete_datasets()

    print "OK"
    sys.exit(0)