Source

fsquass / fsquass / __init__.py

Full commit
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
# coding: utf-8
"""
FsQuass is a filesystem query and traversing library, a pythonic jQuery for filesystem.

Still work in progress.
"""
# from weakref import WeakValueDictionary
import os
import gettext
import logging
import re
import sys

from fnmatch import fnmatch
from functools import partial
from itertools import chain, product
from os import path
from shutil import rmtree

# Version string of the package.
__version__ = '0.1.0'

# Identity stand-in for a translation function: user-facing messages in this
# module are wrapped in _() and currently come back unchanged.
_ = lambda x: x  # placeholder for translation
# Names exported by ``from <this module> import *``.
__all__ = ['Fs', 'Dir', 'File']


def escaped_split(delimiter, string):
	"""
	Splits *string* by occurrences of *delimiter* that are not escaped by a backslash.
	Unescapes the delimiter in every resulting part after splitting.

	* *delimiter* is a literal string; it is never interpreted as a regular expression.
	* A delimiter preceded by a single backslash is kept (``'a\\;b'`` gives ``['a;b']``),
	  including when the escaped delimiter is at the very start of *string*.
	* A delimiter preceded by two backslashes is treated as unescaped and splits.

	Returns a list of strings (a single-item list if no unescaped delimiter is found).
	"""
	# The nested lookbehind reads "not preceded by a backslash that is itself
	# unescaped". Unlike a two-character lookbehind it also works at index 1,
	# where there is only one character before the delimiter.
	# re.escape() keeps delimiters such as '.' or '\\' from being read as regex syntax.
	splitter = r'(?<!(?<!\\)\\)' + re.escape(delimiter)
	escaped_delimiter = '\\' + delimiter
	return [part.replace(escaped_delimiter, delimiter)
		for part in re.split(splitter, string)]


class Fs(set):
	"""
	**Files set**. Is a :py:class:`set` of :py:class:`File` and :py:class:`Dir` instances with traversal methods. Besides the methods inherited from :py:class:`set`, it has some methods and properties specific to file systems.

	*nodes* can be a string or an iterable of :py:class:`File` and :py:class:`Dir` instances.

	If *nodes* is a string, it's treated differently depending on what it starts with:

	* ``/``, files are matched from those in the root directory and further, without scanning the whole filesystem.
	* ``./``, the next name will be searched inside the current directory, without recursive scanning.
	* ``~``, home folder will be opened
	* ``~/``, home folder will be opened, and its children will be matched, without recursive scanning.

	Any other beginning raises :exc:`ValueError`.

	A space is treated like in CSS, a recursive search for descendants. E.g.

	.. code-block:: python

		Fs('/home/user tests/__init__.py')

	will

	* find ``/home/user``,
	* then recursively scan both for files and directories named ``tests``,
	* then will search for ``__init__.py`` inside those directories, but not deeper.

	Note: Recursive scans can be expensive, because every directory below the starting point gets listed.
	"""

	# TODO:
	#	* Fs.has()
	#	* Fs.andSelf()

	def map_patterns(self, search_string, callback):
		"""
		Splits *search_string* by semicolons, parses every part into a list of path-section lists (the same shape :py:func:`Fs._patterns` yields), calls *callback* with each parsed part and returns the union of the sets the callback returned.
		"""
		result = Fs()
		for p in escaped_split(';', search_string):
			result |= callback([escaped_split(path.sep, i) for i in escaped_split(' ', p)])
		return result

	def _recur_find(self, patterns):
		"""
		Narrows the set section by section (``..`` goes to parents, any other non-empty section keeps the matching children), then unites the result with the same search run on the children of the set.

		NOTE(review): no other method of this class calls this one; it looks like a predecessor of :py:func:`Fs._recursive_find`.
		"""
		result = self
		for pattern in patterns:
			for p in pattern:
				if p == '':
					continue
				if p == '..':
					result = result.parents()
				else:
					result = Fs(i for i in result.children() if self._pattern_level_match(p, i))

		child = self.children()
		if len(child):
			result |= child._recur_find(patterns)
		return result

	def __init__(self, nodes=None):
		"""
		Builds the set from *nodes*: ``None`` (empty set), an iterable of nodes, or a query string (see the class docstring).

		Raises :exc:`ValueError` if a query string does not start from ``/``, ``.``, ``..`` or ``~``.
		"""
		if isinstance(nodes, basestring) and nodes.strip() == '/':
			nodes = [Dir('/')]

		elif isinstance(nodes, basestring):
			pattern, nodes = nodes, []
			for patterns in self._patterns(pattern):
				# if query is 'path/to/folder path/to/another' (the part after the space is descendant, not necessarily child of 'folder')
				# then patterns are [['path', 'to', 'folder'], ['path', 'to', 'another']]
				if patterns[0][0] == '' and len(patterns[0]) > 1:  # '/something' becomes ['', 'something'], hence search from the fs root
					patterns[0].pop(0)
					d = Dir('/')
				elif patterns[0][0] in ('.', '~', '..'):
					d = Dir(patterns[0].pop(0))

				else:  # either '/ something' (space means scanning inside fs root), or 'something/...' (which is the same)
					# Report the query itself: `nodes` has been rebound to the result list above.
					raise ValueError(_("Forbidden query '%s'. To do system-wide scan, use Fs('/').find('. <your query>')") % pattern)

				local_nodes = [d]
				for i, p in enumerate(patterns):
					# Only the parts after a space (i > 0) are searched among all descendants.
					local_nodes = Fs(local_nodes)._recursive_find(p, deeper=i > 0)

				nodes.extend(local_nodes)

		super(Fs, self).__init__(nodes or [])

	def children(self, pattern=None):
		"""
		Returns a set of children of all the set items filtered by *pattern*.
		Returns the set itself if it is empty.
		"""
		if len(self):
			return reduce(self.__class__.__or__, (i.children(pattern) for i in self))
		return self

	def closest(self, pattern):
		"""
		Finds the ancestors of the set items that match *pattern*. Items of the set itself are excluded from the result.
		"""
		return self._get_ancestors(pattern) - self

	def filter(self, pattern):
		"""
		Filters the set items by *pattern*.

		Filtering a set of paths is equal to an intersection of the set and of a set found by *pattern*:

		.. code-block:: python

			dirs = Fs('/home/siberiano;/home;/tmp;/tmp/siberiano')
			dirs.filter('siberiano') == dirs & Fs('/').find('. siberiano')

		"""
		return self._get_ancestors(pattern) & self

	@staticmethod
	def _filter_children_generator(node):
		"""
		Builds a ``get_children`` function for :py:func:`Fs._recursive_find` that walks along the path of *node* only: for a set holding an ancestor of *node* it returns a one-item list with the next node on the way down, and an empty list once *node* is reached. The real directory listing is never read.
		"""
		path_sections = escaped_split(path.sep, node.path)

		def child_getter(fs):
			if not fs:
				return []
			item = list(fs)[0]
			if item.path == node.path:
				# The walk has reached the node. This also covers the root: '/' splits
				# into ['', ''], which used to make the root its own child and sent
				# the search into endless recursion.
				return []
			path_depth = 1 if item.path == '/' else len(escaped_split(path.sep, item.path))

			if path_depth < len(path_sections):
				return [File(path.sep.join(path_sections[:path_depth + 1]))]
			return []

		return child_getter

	def find(self, pattern):
		"""
		Searches by *pattern* inside the set items. Returns a new Fs instance. E.g. if we have a set ``fs`` of these paths::

			/home/user/
			/root

		``fs.find('.bashrc')`` will probably output::

			/home/user/.bashrc
			/root/.bashrc

		If you need to find multiple paths, separate them with semicolon:

		.. code-block:: python

			Fs('/home/siberiano').find('.bashrc;Work/project/templates base.haml')

		Will search for ``.bashrc`` file in my homefolder (but not deeper) and inside ~/Work/project/templates will recursively search for ``base.haml`` files.

		To avoid accidental scanning of the entire filesystem, recursive search is made harder. Use dot and space in the beginning if you need it anyway:

		.. code-block:: python

			# scan the entire filesystem for 'siberiano'
			Fs('/').find('. siberiano')
			# scans for files & directories named 'project' inside Work
			Fs('/home/siberiano/Work').find('. project')

		Raises :exc:`ValueError` if *pattern* is blank, starts with a space, starts from the root (``/``) or uses ``.`` after a space.
		"""
		if pattern.strip() == '':
			raise ValueError('Search pattern must be non-empty.')

		result = Fs()
		for patterns in self._patterns(pattern):
			if patterns[0] in ([''], ['', '']):
				raise ValueError(_("Can't start search query with space. To do filesystem scan, use dot-space: '. query'"))
			for i, p in enumerate(patterns):
				if p[0] == '':
					raise ValueError(_("Can't search from root (/) inside an Fs ('%s').") % '/'.join(p))

				if i > 0 and '.' in p:
					raise ValueError(_("Can't use '.' in descendants ('%s')") % '/'.join(p))

			local_result = self
			p = patterns.pop(0)
			if p != ['.']:  # '. name' is the way to scan the filesystem
				local_result = Fs(local_result._recursive_find(p, deeper=False))
			for p in patterns:
				local_result = Fs(local_result._recursive_find(p))
			result |= local_result

		return result

	def first(self):
		"""
		Returns an Fs holding a single item of the set (an arbitrary one, since sets are unordered), or an empty Fs if the set is empty.
		"""
		for item in self:
			# Wrap the node in a list: Fs(node) would iterate the node itself
			# and collect the children of a Dir instead of the Dir.
			return Fs([item])
		return Fs()

	def _get_ancestors(self, pattern):
		"""
		Returns the set itself if *pattern* is empty. Otherwise walks from the root down the path of every item and returns all the nodes on the way (the item included, the root excluded) that match *pattern*.
		"""
		if not pattern:
			return self

		result = self.__class__()
		for n in self:
			for patterns in self._patterns(pattern):
				local_result = Fs('/')
				for p in patterns:
					child_getter = self._filter_children_generator(n)
					local_result = Fs(local_result._recursive_find(p, child_getter))

				result |= local_result

		return result

	def exclude(self, pattern):
		"""
		Exclude items that match pattern.

		*pattern* is either a query string or an Fs instance, otherwise :exc:`ValueError` is raised.
		"""
		if isinstance(pattern, basestring):
			return self - self.filter(pattern)

		if isinstance(pattern, Fs):
			return self - pattern

		raise ValueError(_('pattern must be either a string or an Fs instance. Got %s instead.') % pattern)

	def _link(self, target, multiple_targets=False, name_callback=None, link_function=os.symlink):
		"""
		Links every item of the set into the *target* directories with *link_function* (``os.symlink`` or ``os.link``).

		* *target* is an Fs, or anything the Fs constructor accepts, and must contain directories only.
		* if *multiple_targets* is false, only one of the target directories is used.
		* *name_callback*, if callable, receives two one-item Fs instances (source, target) and must return such a pair.

		The link is created inside the target directory under the basename of the source. Raises :exc:`ValueError` if a target is not a directory or *link_function* is not callable.
		"""
		if not isinstance(target, Fs):
			target = Fs(target)

		# Checked on the nodes directly, so that the root directory is accepted as well.
		if not all(isinstance(t, Dir) for t in target):
			raise ValueError(_("Target(s) is not a directory: %s") % target)

		if not callable(link_function):
			raise ValueError(_("Link function must be a callable."))

		# Must be narrowed before product() is built, otherwise it has no effect.
		if not multiple_targets:
			target = target.first()

		for source, target_dir in product(self, target):
			s, t = Fs([source]), Fs([target_dir])
			if callable(name_callback):
				s, t = name_callback(s, t)
			src, dst = s.pop(), t.pop()
			# Linking onto the directory path itself always fails because that path
			# exists; the link belongs inside the directory.
			link_name = path.join(dst.path, src.basename) if isinstance(dst, Dir) else dst.path
			link_function(src.path, link_name)

	def linkTo(self, target, multiple_targets=False, name_callback=None):
		"""
		Makes a hard link to all the set members in *target* folder.

		* target must be a set of 1 or more directories (:py:class:`Dir` instances).
		* if *multiple_targets* parameter is ``True``, links will be made in all the *target* folders. If *multiple_targets* is ``False``, then will link in the first *target* folder only.

		Optional *name_callback* should work like this:

		.. code-block:: python

			def name_callback(source, target):
				# source & target are Fs instances with 1 member each
				return source, target

		"""
		self._link(target, multiple_targets, name_callback, os.link)

	def symlinkTo(self, target, multiple_targets=False, name_callback=None):
		"""
		Makes a symbolic link in *target* folder like :py:func:`Fs.linkTo`
		"""
		self._link(target, multiple_targets, name_callback, os.symlink)

	def parents(self):
		"""
		Returns a set of parents of all the items, e.g. for

		.. code-block:: none

			/home/user/.bashrc
			/home/user/.hgrc
			/tmp/test
			/tmp

		parents will be

		.. code-block:: none

			/home/user
			/tmp
			/
		"""
		return self.__class__(chain.from_iterable(i.parent for i in self))

	@property
	def paths(self):
		"""
		A generator of paths of all the items.
		"""
		return (i.path for i in self)

	@staticmethod
	def _pattern_level_match(pattern, item):
		"""
		Match item's basename against pattern at one level, i.e. between slashes::

			.../siberiano/...
			/tmp/...

		*pattern* can contain subpatterns::

			{home,tmp}
			{etc,usr,var,tmp}
			{dropbox:ignorecase,Pictures}

		Returns ``True`` if any of the subpatterns matches. Raises :exc:`ValueError` if braces or commas are used outside of a single ``{...}`` group.
		"""
		find_match = re.match(r'^{([^{}]+)}$', pattern)  # pattern may contain multiple subpatterns: /{home,tmp,*oot}/
		if find_match:
			subpatterns = escaped_split(',', find_match.groups()[0])
		elif re.findall(r'[^\\][\{\},]', pattern):
			raise ValueError(_('Path query string "%s" contains illegal characters') % pattern)
		else:
			subpatterns = [pattern]

		return any(Fs._subpattern_match(sp, item=item) for sp in subpatterns)

	@staticmethod
	def _patterns(pattern):
		"""
		Parses a query string. Yields one item per semicolon-separated query; every item is a list of space-separated parts, and every part is a list of path sections, e.g. ``'/a/b c;./d'`` gives ``[['', 'a', 'b'], ['c']]`` and then ``[['.', 'd']]``.
		"""
		for p in escaped_split(';', pattern):
			yield [escaped_split(path.sep, i) for i in escaped_split(' ', p)]

	def _recursive_find(self, pattern, get_children=None, deeper=True):
		"""
		Searches recursively through filesystem, both for direct children and for descendants. Is a generator of the nodes found.

		* *pattern* is a list of strings (either of which may contain subpatterns)
		* *deeper* is a flag whether this path should be searched among children (fs scan)
		* *get_children* is an optional function for the purpose of filtering.

		Normally you call :py:func:`Fs.children` and match them with *pattern*. In case you want to filter, you don't need to scan the real filesystem. Search scope is already limited to a node path (/path/to/node), which means for '/path' get_children should return '/path/to' only, without its siblings from the real hard drive. Custom get_children function that is used in :py:func:`Fs.filter` does this.
		"""
		get_children = get_children or (lambda s: s.children())

		yielder = []
		if pattern in ([], ['']):
			# Nothing left to match: the set items themselves are the result.
			yielder.append(self)

		elif pattern[0] == '..':
			yielder.append(self.parents()._recursive_find(pattern[1:], get_children, False))

		else:
			for n in get_children(self):
				if deeper and isinstance(n, Dir):
					# Try the whole pattern again one level below.
					yielder.append(Fs([n])._recursive_find(pattern, get_children))

				logging.debug([n, pattern])
				r = []
				if self._pattern_level_match(pattern[0], n):
					r = [n]

				if r and len(pattern) > 1:
					# The rest of the pattern must match the direct descendants of n.
					r = Fs(r)._recursive_find(pattern[1:], get_children, False)

				yielder.append(r)

		for i in chain(*yielder):
			yield i

	def __repr__(self):
		return '%s([%s])' % (self.__class__.__name__, ', '.join(map(str, self)))

	def siblings(self, pattern=None):
		"""
		Finds all the siblings of the files in set, filtered by *pattern*. The result will not include any files of the original set.
		"""
		return self.parents().children(pattern) - self

	@staticmethod
	def _subpattern_match(subpattern, item):
		"""
		Matches ``item.basename`` against *subpattern*.

		* ``subpattern`` - string containing text or unix-like patterns ``?``, ``*``, optionally followed by colon-separated pseudo-classes: ``ignorecase``, ``file``, ``dir``.
		* ``item`` - a :py:class:`File` instance.
		"""
		pseudo_classes = escaped_split(':', subpattern)
		subpattern = pseudo_classes.pop(0)
		basename = item.basename
		if 'ignorecase' in pseudo_classes:
			subpattern = subpattern.lower()
			basename = item.basename.lower()

		# An empty name part (e.g. ':dir') matches any basename.
		if subpattern and not fnmatch(basename, subpattern):
			return False  # stop searching through subpatterns of current level

		if (('file' in pseudo_classes and isinstance(item, Dir)) or
			('dir' in pseudo_classes and not isinstance(item, Dir))):
			return False
		return True


class File(object):
	"""
	A file or a directory. Contains self.path, and if an object with the same absolute path is instantiated, an existing item is returned. If *full_path* is inaccessible, :exc:`EnvironmentError` is raised.

	If *full_path* is a directory, a :py:class:`Dir` instance is returned instead.
	"""

	# TODO: chmod chown lchown makedirs readlink stat walk

	# Instances already created, keyed by normalized path. Shared with subclasses.
	_data = {}  # WeakValueDictionary()

	def __new__(cls, full_path):
		"""
		Normalizes path and checks if an instance for this path already exists.
		"""
		full_path = File._normalize(full_path)
		cls._assert_path(full_path)
		if path.isdir(full_path):
			cls = Dir

		if full_path not in cls._data:
			# object.__new__ accepts no extra arguments; the path is set in __init__.
			cls._data[full_path] = object.__new__(cls)

		return cls._data[full_path]

	def __init__(self, full_path):
		self.path = File._normalize(full_path)

	@staticmethod
	def _assert_path(full_path):
		"""
		Checks if path is visible, which means full_path is either of these:

		* a file (or a working and accessible symlink)
		* a symlink, which may be broken or inaccessible

		Raises :exc:`OSError` otherwise.
		"""
		if not(path.exists(full_path) or path.islink(full_path)):
			raise OSError(_('Path unreachable: %s') % full_path)

	@property
	def basename(self):
		"""
		String basename of the file.
		"""
		return path.basename(self.path)

	def children(self, pattern=None):
		"""
		Returns an Fs of child nodes. Makes sense in Dir only, but put here for compatibility.
		"""
		return Fs()

	def __contains__(self, other):
		"""
		Tells if the path of *other* is the path of this node or lies below it.

		Raises :exc:`ValueError` if *other* is not a :py:class:`File`.
		"""
		if not isinstance(other, File):
			raise ValueError('Tried to compare if {0} is in {1}. {0} must be an fsquass.File instance.'.format(other, self))
		if other.path == self.path:
			return True
		# Compare whole path sections: a bare prefix test would put '/home2' inside '/home'.
		prefix = self.path if self.path.endswith(path.sep) else self.path + path.sep
		return other.path.startswith(prefix)

	def delete(self, sure=False):
		"""
		Deletes the file if *sure* is ``True``. If you managed to call it like this, don't blame the library for any lost data.
		"""
		# Deliberately compared with True instead of tested for truthiness:
		# an arbitrary non-empty value must not trigger a deletion.
		if sure == True:
			os.remove(self.path)

	def __iter__(self):
		# A plain file has no children. An iterator must be returned here:
		# iter() refuses an __iter__ that hands back a list.
		return iter(())

	@staticmethod
	def _normalize(dirty_path):
		"""
		Expands ``~``, normalizes *dirty_path* and makes it absolute.
		"""
		return path.abspath(path.normpath(path.expanduser(dirty_path)))

	def open(self, *args, **kwargs):
		"""
		Wrapper to Python ``open()``. Returns the opened file object.
		"""
		return open(self.path, *args, **kwargs)

	@property
	def parent(self):
		"""
		Returns an Fs with the parent directory. The Fs is empty for the root directory.
		"""
		par = path.normpath(path.join(self.path, path.pardir))
		return Fs([Dir(par)] if par != self.path else None)

	def __repr__(self):
		return "%s('%s')" % (self.__class__.__name__, self.path)


class Dir(File):
	"""
	Directory. Returns its directories and files in children() method.

	* Is iterable:

		.. code-block:: python

			for i in Dir('/home/siberiano'):
				print(i)

		will print files and directories in the folder.

		This allows using such tricks as using a :py:class:`Dir` to get a :py:class:`Fs` of its children:

		.. code-block:: python

			>>> d = Dir('/')
			>>> Fs(d) == d.children()
			True

	* Can check if contains another :py:class:`File` or :py:class:`Dir`:

		.. code-block:: python

			>>> Dir('/home') in Dir('/')
			True
			>>> Dir('/tmp') in Dir('/home')
			False
	"""
	def __init__(self, full_path):
		"""
		Sets the path. Raises :exc:`ValueError` if it is not a directory.
		"""
		super(Dir, self).__init__(full_path)
		if not path.isdir(self.path):
			# __new__ has already stored this instance in the shared cache.
			# Drop it, or every later File(path) gets this half-built Dir back
			# and fails on the same check.
			if self._data.get(self.path) is self:
				del self._data[self.path]
			raise ValueError(_('path %s is not a directory') % self.path)

	def children(self, pattern=None):
		"""
		Lists the directory and returns :py:class:`Fs` of the files, filtered by *pattern*.
		Returns an empty :py:class:`Fs` if the directory can't be listed.
		"""
		try:
			filenames = os.listdir(self.path)
		except OSError:
			return Fs()
		t = partial(path.join, self.path)
		return Fs(map(File, map(t, filenames))).filter(pattern)

	def delete(self, sure=False):
		"""
		Deletes the directory with all files and directories in it if *sure* is ``True``. If you managed to call it like this, don't blame the library for any lost data.
		"""
		# Deliberately compared with True instead of tested for truthiness.
		# Errors during removal are ignored (ignore_errors=True).
		if sure == True:
			rmtree(self.path, ignore_errors=True)

	def __iter__(self):
		return iter(self.children())

	def open(self, *args, **kwargs):
		"""
		Raises TypeError, since directories can't be opened like files.
		"""
		raise TypeError(_("Directories can't be opened like files"))

# paths = Fs(__file__).parents().parents().find('./locale').paths
#_ = gettext.translation('fsquass', ''.join(paths)).ugettext


def main(args):
	"""
	Command line entry point: joins *args* with spaces into one query, prints the paths found (one per line) and exits.

	Exit status is 0 on success and 1 if no arguments were given, the search was interrupted by the user or it failed with an error. A broken pipe error is ignored.
	"""
	# print is called with exactly one argument everywhere below: that form
	# gives the same output as a print statement and as the print function.
	if not args:
		print(_('Usage: fsquass "[search string]"\n(Enclose search string in quotes.)'))
		sys.exit(1)
	try:
		found = Fs(' '.join(args))
	except KeyboardInterrupt:
		print(_('Interrupted by user.'))
		sys.exit(1)
	except SystemExit:
		raise
	except:  # deliberately bare: also reports exceptions that don't derive from Exception
		import traceback
		if not logging.root.handlers:
			logging.basicConfig()
		skip_it = False
		exc_info = sys.exc_info()
		if hasattr(exc_info[0], "__name__"):
			exc_class, exc, tb = exc_info
			# exc.args may be empty, so it is checked before indexing.
			if isinstance(exc, IOError) and exc.args and exc.args[0] == 32:
				# Skip 'IOError: [Errno 32] Broken pipe': often a cancelling of `less`.
				skip_it = True
			if not skip_it:
				tb_path, tb_lineno, tb_func = traceback.extract_tb(tb)[-1][:3]
				logging.error("%s (%s:%s in %s)", exc_info[1], tb_path,
					tb_lineno, tb_func)
		else:  # string exception
			logging.error(exc_info[0])
		if not skip_it:
			if logging.getLogger().level <= logging.DEBUG:
				# An empty line before the traceback (a bare print() would
				# output '()' when print is a statement).
				print('')
				traceback.print_exception(*exc_info)
			sys.exit(1)
	for i in found:
		print(i.path)

	sys.exit(0)


# Run the command line interface when the file is executed as a script;
# the script name itself is dropped from the arguments.
if __name__ == '__main__':
	main(sys.argv[1:])