# -*- coding: utf-8 -*-
"""
    sphinx.builders.linkcheck
    ~~~~~~~~~~~~~~~~~~~~~~~~~

    The CheckExternalLinksBuilder class.

    :copyright: Copyright 2007-2011 by the Sphinx team, see AUTHORS.
    :license: BSD, see LICENSE for details.
"""

import re
import sys
import Queue
import socket
import threading
from os import path
from urllib2 import build_opener, unquote, Request
from HTMLParser import HTMLParser, HTMLParseError

from docutils import nodes

from sphinx.builders import Builder
from sphinx.util.console import purple, red, darkgreen, darkgray

# create an opener that will simulate a browser user-agent
opener = build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]

class HeadRequest(Request):
    """A urllib2.Request variant that issues an HTTP HEAD request.

    urllib2 chooses the HTTP verb via ``get_method()``; overriding it
    lets us probe a URL for existence without downloading the body.
    """

    def get_method(self):
        return 'HEAD'

class AnchorCheckParser(HTMLParser):
    """Specialized HTML parser that looks for a specific anchor."""

    def __init__(self, search_anchor):
        # initialize the base parser state (buffers etc.) -- without this
        # call, feed() fails on the uninitialized parser
        HTMLParser.__init__(self)

        # the anchor (value of an id= or name= attribute) to search for
        self.search_anchor = search_anchor
        # set to True as soon as the anchor is encountered
        self.found = False

    def handle_starttag(self, tag, attrs):
        # an anchor target can be any element's id= attribute, or the
        # name= attribute of e.g. <a name="...">; check both
        for key, value in attrs:
            if key in ('id', 'name') and value == self.search_anchor:
                self.found = True

def check_anchor(f, hash):
    """Reads HTML data from a filelike object 'f' searching for anchor 'hash'.
    Returns True if anchor was found, False otherwise.
    """
    parser = AnchorCheckParser(hash)
    try:
        # Read file in chunks of 8192 bytes. If we find a matching anchor, we
        # break the loop early in hopes not to have to download the whole thing.
        chunk = f.read(8192)
        while chunk and not parser.found:
            parser.feed(chunk)
            chunk = f.read(8192)
        parser.close()
    except HTMLParseError:
        # HTMLParser is usually pretty good with sloppy HTML, but it tends to
        # choke on EOF. But we're done then anyway.
        pass
    return parser.found

class CheckExternalLinksBuilder(Builder):
    Checks for broken external links.
    name = 'linkcheck'

    def init(self):
        self.to_ignore = map(re.compile,
        self.good = set()
        self.broken = {}
        self.redirected = {}
        # set a timeout for non-responding servers
        # create output file
        open(path.join(self.outdir, 'output.txt'), 'w').close()

        # create queues and worker threads
        self.wqueue = Queue.Queue()
        self.rqueue = Queue.Queue()
        self.workers = []
        for i in range(
            thread = threading.Thread(target=self.check_thread)

    def check_thread(self):
        kwargs = {}
        if sys.version_info > (2, 5) and
            kwargs['timeout'] =

        def check():
            # check for various conditions without bothering the network
            if len(uri) == 0 or uri[0] == '#' or uri[0:7] == 'mailto:' or uri[0:4] == 'ftp:':
                return 'unchecked', ''
            elif not (uri[0:5] == 'http:' or uri[0:6] == 'https:'):
                return 'local', ''
            elif uri in self.good:
                return 'working', ''
            elif uri in self.broken:
                return 'broken', self.broken[uri]
            elif uri in self.redirected:
                return 'redirected', self.redirected[uri]
            for rex in self.to_ignore:
                if rex.match(uri):
                    return 'ignored', ''

            if '#' in uri:
                req_url, hash = uri.split('#', 1)
                req_url = uri
                hash = None

            # need to actually check the URI
                if hash and
                    # Read the whole document and see if #hash exists
                    f =, **kwargs)
                    found = check_anchor(f, unquote(hash))

                    if not found:
                        raise Exception("Anchor '%s' not found" % hash)
                    f =, **kwargs)

            except Exception, err:
                self.broken[uri] = str(err)
                return 'broken', str(err)
            if f.url.rstrip('/') == req_url.rstrip('/'):
                return 'working', 'new'
                new_url = f.url
                if hash:
                    new_url += '#' + hash

                self.redirected[uri] = new_url
                return 'redirected', new_url

        while True:
            uri, docname, lineno = self.wqueue.get()
            if uri is None:
            status, info = check()
            self.rqueue.put((uri, docname, lineno, status, info))

    def process_result(self, result):
        uri, docname, lineno, status, info = result
        if status == 'unchecked':
        if status == 'working' and info != 'new':
        if lineno:
  '(line %3d) ' % lineno, nonl=1)
        if status == 'ignored':
   + ' - ' + darkgray('ignored'))
        elif status == 'local':
   + ' - ' + darkgray('local'))
            self.write_entry('local', docname, lineno, uri)
        elif status == 'working':
   + ' - ' + darkgreen('working'))
        elif status == 'broken':
   + ' - ' + red('broken: ') + info)
            self.write_entry('broken', docname, lineno, uri + ': ' + info)
                self.warn('broken link: %s' % uri,
                          '%s:%s' % (self.env.doc2path(docname), lineno))
        elif status == 'redirected':
   + ' - ' + purple('redirected') + ' to ' + info)
            self.write_entry('redirected', docname, lineno, uri + ' to ' + info)

    def get_target_uri(self, docname, typ=None):
        return ''

    def get_outdated_docs(self):
        return self.env.found_docs

    def prepare_writing(self, docnames):

    def write_doc(self, docname, doctree):
        n = 0
        for node in doctree.traverse(nodes.reference):
            if 'refuri' not in node:
            uri = node['refuri']
            lineno = None
            while lineno is None:
                node = node.parent
                if node is None:
                lineno = node.line
            self.wqueue.put((uri, docname, lineno), False)
            n += 1
        done = 0
        while done < n:
            done += 1

        if self.broken:
   = 1

    def write_entry(self, what, docname, line, uri):
        output = open(path.join(self.outdir, 'output.txt'), 'a')
        output.write("%s:%s: [%s] %s\n" % (self.env.doc2path(docname, None),
                                           line, what, uri))

    def finish(self):
        for worker in self.workers:
            self.wqueue.put((None, None, None), False)