Source

Snakemake / snakemake / io.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# -*- coding: utf-8 -*-

import os
import re
import stat
from itertools import product, chain
from collections import Iterable, namedtuple
from snakemake.exceptions import MissingOutputException, WorkflowError, WildcardError

__author__ = "Johannes Köster"


def IOFile(file, rule=None):
    """
    Create an _IOFile for ``file`` and attach ``rule`` to it.

    Arguments
    file -- the value handed to _IOFile (a path pattern or a function)
    rule -- the object stored in the ``rule`` attribute (None per default)
    """
    iofile = _IOFile(file)
    iofile.rule = rule
    return iofile


class _IOFile(str):
    """
    A file that is either input or output of a rule.

    The instance is a ``str`` built from ``str(file)`` that additionally
    keeps the original object in ``_file``. If that object is a plain
    function, it is only evaluated in ``apply_wildcards``; every member
    that needs a concrete path goes through the ``file`` property, which
    raises ValueError for function-based files.
    """

    # value handed to the module-level apply_wildcards as ``dynamic_fill``
    dynamic_fill = "__snakemake_dynamic__"

    def __new__(cls, file):
        obj = str.__new__(cls, file)
        # only plain Python functions (def/lambda) are treated as callables
        obj._is_function = type(file).__name__ == "function"
        obj._file = file
        obj.rule = None
        obj._regex = None  # compiled lazily by regex()
        return obj

    @property
    def file(self):
        """
        The wrapped file object.

        Raises ValueError if this IOFile was specified as a function.
        """
        if self._is_function:
            raise ValueError(
                "This IOFile is specified as a function and "
                "may not be used directly.")
        return self._file

    @property
    def exists(self):
        """ True if the path exists on the filesystem. """
        return os.path.exists(self.file)

    @property
    def protected(self):
        """ True if the path exists and is not writable. """
        return self.exists and not os.access(self.file, os.W_OK)

    @property
    def mtime(self):
        """ Modification time of the path as reported by os.stat. """
        return os.stat(self.file).st_mtime

    def is_newer(self, time):
        """ Return True if the modification time is greater than time. """
        return self.mtime > time

    def prepare(self):
        """
        Create the parent directory of the file if it does not exist.

        Only the part of the path before the first occurrence of
        ``dynamic_fill`` is considered.
        """
        path_until_wildcard = self.file.split(self.dynamic_fill, 1)[0]
        directory = os.path.dirname(path_until_wildcard)
        if directory and not os.path.exists(directory):
            try:
                os.makedirs(directory)
            except FileExistsError:
                # ignore "File exists" (reason: multiprocessing)
                pass

    def protect(self):
        """
        Remove the write permission for user, group and others.

        For a directory, the mode derived from the directory itself is
        applied to every directory and file found below it; the top level
        directory itself is left untouched.
        """
        mode = (os.stat(self.file).st_mode & ~stat.S_IWUSR &
            ~stat.S_IWGRP & ~stat.S_IWOTH)
        if os.path.isdir(self.file):
            for root, dirs, files in os.walk(self.file):
                # entries are relative to the directory currently walked,
                # hence they have to be joined with root
                for entry in dirs + files:
                    os.chmod(os.path.join(root, entry), mode)
        else:
            os.chmod(self.file, mode)

    def remove(self):
        """ Delete the file via the module-level remove function. """
        remove(self.file)

    def touch(self):
        """
        Set access and modification time of the file to the current time.

        Raises MissingOutputException if the file does not exist.
        """
        try:
            os.utime(self.file, None)
        except FileNotFoundError:
            raise MissingOutputException(
                "Output file {} of rule {} shall be touched but "
                "does not exist.".format(self.file, self.rule.name),
                lineno=self.rule.lineno,
                snakefile=self.rule.snakefile)

    def apply_wildcards(
        self, wildcards, fill_missing=False,
        fail_dynamic=False):
        """
        Return a new IOFile with the wildcards replaced by their values.

        If the file is specified as a function, the function is first
        called with a Namedlist built from the wildcards and its result
        is used as the pattern.

        Arguments
        wildcards    -- a mapping from wildcard names to values
        fill_missing -- passed on to the module-level apply_wildcards
        fail_dynamic -- passed on to the module-level apply_wildcards
        """
        f = self._file
        if self._is_function:
            f = self._file(Namedlist(fromdict=wildcards))

        return IOFile(
            apply_wildcards(
                f, wildcards, fill_missing=fill_missing,
                fail_dynamic=fail_dynamic,
                dynamic_fill=self.dynamic_fill),
            rule=self.rule)

    def get_wildcard_names(self):
        """ Return the set of wildcard names that occur in the file. """
        return {match.group('name')
            for match in _wildcard_regex.finditer(self.file)}

    def regex(self):
        """ Return the compiled regular expression for the file pattern. """
        if not self._regex:
            # compile a regular expression
            self._regex = re.compile(regex(self.file))
        return self._regex

    def match(self, target):
        """ Return the match of target against the pattern, else None. """
        return self.regex().match(target) or None

    def __eq__(self, other):
        # compare the wrapped objects so that function-based files work too
        f = other._file if isinstance(other, _IOFile) else other
        return self._file == f

    def __hash__(self):
        return self._file.__hash__()


# Matches a single wildcard of the form {name} or {name,constraint}.
# The pattern is a raw string so that the regex escapes (\{, \s, \w, \})
# are not interpreted as (invalid) string escape sequences.
_wildcard_regex = re.compile(
    r"\{\s*(?P<name>\w+?)(\s*,\s*(?P<constraint>[^\}]*))?\s*\}")


def remove(file):
    """
    Delete the given path if it exists.

    Regular files are unlinked. Directories are handed to os.removedirs;
    an OSError raised there (e.g. for a non empty directory) is ignored.
    """
    if not os.path.exists(file):
        return
    if not os.path.isdir(file):
        os.remove(file)
        return
    try:
        os.removedirs(file)
    except OSError:
        # ignore non empty directories
        pass


def regex(filepattern):
    """
    Translate a file pattern with wildcards into a regular expression.

    Literal text is escaped. The first occurrence of a wildcard becomes a
    named group using its constraint (".+" if none is given); every later
    occurrence of the same name becomes a backreference to that group.

    Raises ValueError if a repeated wildcard carries a constraint.
    Returns the regular expression as a string.
    """
    pieces = []
    seen = set()
    cursor = 0
    for found in _wildcard_regex.finditer(filepattern):
        # literal text between the previous wildcard and this one
        pieces.append(re.escape(filepattern[cursor:found.start()]))
        cursor = found.end()
        name = found.group("name")
        constraint = found.group("constraint")
        if name not in seen:
            seen.add(name)
            pieces.append("(?P<{}>{})".format(name, constraint or ".+"))
            continue
        if constraint:
            raise ValueError("If multiple wildcards of the same name "
            "appear in a string, eventual constraints have to be defined "
            "at the first occurence and will be inherited by the others.")
        pieces.append("(?P={})".format(name))
    pieces.append(re.escape(filepattern[cursor:]))
    pieces.append("$") # ensure that the match spans the whole file
    return "".join(pieces)


def apply_wildcards(pattern, wildcards, fill_missing=False,
        fail_dynamic=False, dynamic_fill=None):
    """
    Replace every wildcard in pattern by its value from wildcards.

    Arguments
    pattern      -- the string containing the wildcards
    wildcards    -- a mapping from wildcard names to values
    fill_missing -- if True, wildcards without a value are replaced by
        dynamic_fill instead of raising a WildcardError
    fail_dynamic -- if True, raise a WildcardError for any wildcard
        whose value equals dynamic_fill
    dynamic_fill -- the fill value used by the two options above
    """

    def substitute(found):
        key = found.group("name")
        try:
            replacement = wildcards[key]
            if fail_dynamic and replacement == dynamic_fill:
                raise WildcardError(key)
            return str(replacement)  # convert anything into a str
        except KeyError as missing:
            if not fill_missing:
                raise WildcardError(str(missing))
            return dynamic_fill

    return _wildcard_regex.sub(substitute, pattern)


def not_iterable(value):
    """
    Return True if value is a str or not an Iterable at all, i.e. if it
    shall be treated as a single item.
    """
    return isinstance(value, str) or not isinstance(value, Iterable)


class AnnotatedString(str):
    """ A str that carries a set of flags in its ``flags`` attribute. """
    def __init__(self, value):
        # the string content itself is set by str.__new__
        self.flags = set()


def flag(value, flag):
    """
    Annotate value with the given flag.

    Arguments
    value -- a single item or an iterable of items (may be nested)
    flag  -- the flag to add

    An AnnotatedString is flagged in place and returned as is. Any other
    single item is converted into a new AnnotatedString. For an iterable,
    a list with the flagged items is returned.
    """
    # The parameter ``flag`` shadows this function inside its own body,
    # hence the recursion is done by a local helper.
    def annotate(item):
        if isinstance(item, AnnotatedString):
            item.flags.add(flag)
            return item
        if not_iterable(item):
            item = AnnotatedString(item)
            item.flags.add(flag)
            return item
        return [annotate(subitem) for subitem in item]

    return annotate(value)


def is_flagged(value, flag):
    """
    Return True if value is an AnnotatedString that carries the flag.
    Anything else (including lists of flagged items) yields False.
    """
    if isinstance(value, AnnotatedString):
        return flag in value.flags
    return False


def temp(value):
    """
    Flag an input or output file as temporary, i.e. as a file that shall
    be removed after usage. Raises SyntaxError if value is already
    flagged as protected.
    """
    already_protected = is_flagged(value, "protected")
    if already_protected:
        raise SyntaxError("Protected and temporary flags are mutually exclusive.")
    return flag(value, "temp")


def temporary(value):
    """ Long spelling of temp; delegates to it unchanged. """
    flagged = temp(value)
    return flagged


def protected(value):
    """
    Flag a file as protected, i.e. as a file that shall be write
    protected after creation. Raises SyntaxError if value is already
    flagged as temp.
    """
    already_temp = is_flagged(value, "temp")
    if already_temp:
        raise SyntaxError("Protected and temporary flags are mutually exclusive.")
    return flag(value, "protected")


def dynamic(value):
    """
    Flag a file as dynamic, i.e. the multiplicity (and wildcard values)
    will be expanded after a certain rule has been run.

    Raises SyntaxError if any wildcard in the flagged file(s) carries a
    constraint. Returns the flagged value.
    """
    annotated = flag(value, "dynamic")
    if not_iterable(annotated):
        candidates = [annotated]
    else:
        candidates = annotated
    for candidate in candidates:
        # any number of wildcards is accepted; only constraints are rejected
        for found in _wildcard_regex.finditer(candidate):
            if found.group("constraint"):
                raise SyntaxError(
                    "The wildcards in dynamic files cannot be constrained.")
    return annotated


def expand(*args, **wildcards):
    """
    Expand wildcards in given filepatterns.

    Arguments
    *args -- first arg: filepatterns as list or one single filepattern,
        second arg (optional): a function to combine wildcard values
        (itertools.product per default)
    **wildcards -- the wildcards as keyword arguments
        with their values as lists

    Returns a list with one formatted string per combination and
    filepattern. Raises TypeError if not one or two positional arguments
    are given and WildcardError if a filepattern uses a wildcard for
    which no values were given.
    """
    if not 1 <= len(args) <= 2:
        # fail early instead of running into an unbound combinator
        raise TypeError(
            "expand() takes one or two positional arguments "
            "({} given)".format(len(args)))
    filepatterns = args[0]
    combinator = args[1] if len(args) == 2 else product
    if isinstance(filepatterns, str):
        filepatterns = [filepatterns]

    def flatten(wildcards):
        # turn each wildcard into a list of (name, value) pairs; single
        # values (including strings) are treated as one-element lists
        for wildcard, values in wildcards.items():
            if isinstance(values, str) or not isinstance(values, Iterable):
                values = [values]
            yield [(wildcard, value) for value in values]

    try:
        return [
            filepattern.format(**comb)
            for comb in map(dict, combinator(*flatten(wildcards)))
            for filepattern in filepatterns]
    except KeyError as e:
        raise WildcardError("No values given for wildcard {}.".format(e))


def glob_wildcards(pattern):
    """
    Glob the values of the wildcards by matching the given pattern to the filesystem.
    Returns a named tuple with a list of values for each wildcard.

    The directory part of the pattern before the first wildcard is walked
    recursively (the current directory if there is none); files as well
    as directories are matched against the pattern.
    """
    first_wildcard = re.search("{[^{]", pattern)
    if first_wildcard:
        dirname = os.path.dirname(pattern[:first_wildcard.start()])
    else:
        dirname = os.path.dirname(pattern)
    implicit_cwd = not dirname
    if implicit_cwd:
        dirname = "."

    # a wildcard may occur several times in the pattern, but a named tuple
    # does not accept duplicate field names
    names = []
    for match in _wildcard_regex.finditer(pattern):
        name = match.group('name')
        if name not in names:
            names.append(name)
    Wildcards = namedtuple("Wildcards", names)
    wildcards = Wildcards(*[list() for _ in names])

    compiled = re.compile(regex(pattern))
    for dirpath, dirnames, filenames in os.walk(dirname):
        if implicit_cwd:
            # os.walk(".") reports subdirectories as "./sub"; drop the
            # "./" that is not part of the pattern
            dirpath = os.path.normpath(dirpath)
        for f in chain(filenames, dirnames):
            if dirpath != ".":
                f = os.path.join(dirpath, f)
            match = compiled.match(f)
            if match:
                for name, value in match.groupdict().items():
                    getattr(wildcards, name).append(value)
    return wildcards


# TODO rewrite Namedlist!
class Namedlist(list):
    """
    A list that additionally provides functions to name items. Further,
    it is hashable, however the hash does not consider the item names.

    Each name refers to an index range (start, end) with exclusive end.
    Names are kept in ``_names`` and are additionally set as attributes:
    a name spanning one item yields that item, a name spanning several
    items yields a Namedlist of them.
    """
    def __init__(self, toclone=None, fromdict=None, plainstr=False):
        """
        Create the object.

        Arguments
        toclone  -- another Namedlist that shall be cloned
        fromdict -- a dict that shall be converted to a
            Namedlist (keys become names)
        plainstr -- if True, the items of toclone are converted to str
        """
        list.__init__(self)
        self._names = dict()

        if toclone:
            self.extend(map(str, toclone) if plainstr else toclone)
            if isinstance(toclone, Namedlist):
                self.take_names(toclone.get_names())
        if fromdict:
            for key, item in fromdict.items():
                self.append(item)
                self.add_name(key)

    def add_name(self, name):
        """
        Add a name to the last item.

        Arguments
        name -- a name
        """
        self.set_name(name, len(self) - 1)

    def set_name(self, name, index, end=None):
        """
        Set the name of an item or of a range of items.

        Arguments
        name  -- a name
        index -- the item index
        end   -- the exclusive end index of the named range
            (index + 1 per default, i.e. a single item)
        """
        if end is None:
            end = index + 1
        self._names[name] = (index, end)
        if index == end - 1:
            setattr(self, name, self[index])
        else:
            setattr(self, name, Namedlist(toclone=self[index:end]))

    def get_names(self):
        """
        Get the defined names as (name, (start, end)) pairs.
        """
        for name, index in self._names.items():
            yield name, index

    def take_names(self, names):
        """
        Take over the given names.

        Arguments
        names -- the given names as (name, (start, end)) pairs
        """
        for name, (i, j) in names:
            self.set_name(name, i, end=j)

    def items(self):
        """ Yield (name, value) pairs for all named items. """
        for name in self._names:
            yield name, getattr(self, name)

    def allitems(self):
        """
        Yield (name, value) pairs for all items in list order; items
        that are not covered by a name are yielded with None as name.
        """
        cursor = 0
        for name, index in sorted(
            self._names.items(), key=lambda item: item[1]):
            start, end = index
            if start > cursor:
                for item in self[cursor:start]:
                    yield None, item
            yield name, getattr(self, name)
            cursor = end
        for item in self[cursor:]:
            yield None, item

    def insert_items(self, index, items):
        """
        Replace the item at index by the given items.

        Names that start behind index are shifted accordingly, a name
        that starts at index is extended to cover all inserted items.
        """
        self[index:index + 1] = items
        add = len(items) - 1
        # iterate over a snapshot since the entries are reassigned below
        for name, (i, j) in list(self._names.items()):
            if i > index:
                self._names[name] = (i + add, j + add)
            elif i == index:
                self.set_name(name, i, end=i+len(items))

    def keys(self):
        """ Return the dict that maps each name to its index range. """
        return self._names

    def plainstrings(self):
        """ Return a copy of the same class with all items as str. """
        return type(self)(toclone=self, plainstr=True)

    def __getitem__(self, key):
        # integers and slices are served by the list, anything the list
        # rejects with a TypeError is looked up as a name
        try:
            return super().__getitem__(key)
        except TypeError:
            pass
        return getattr(self, key)

    def __hash__(self):
        return hash(tuple(self))

    def __str__(self):
        # items are not necessarily strings
        return " ".join(map(str, self))


class InputFiles(Namedlist):
    """ Namedlist subclass without additions (going by its name: input files). """


class OutputFiles(Namedlist):
    """ Namedlist subclass without additions (going by its name: output files). """


class Wildcards(Namedlist):
    """ Namedlist subclass without additions (going by its name: wildcard values). """


class Params(Namedlist):
    """ Namedlist subclass without additions (going by its name: parameters). """


class Resources(Namedlist):
    """ Namedlist subclass without additions (going by its name: resources). """