Source

grab/grab/spider/base.py

from __future__ import absolute_import
from Queue import PriorityQueue, Empty
import pycurl
from grab import Grab
from grab.base import GLOBAL_STATE
import logging
import types
from collections import defaultdict
import os
import time
import signal
import json
import cPickle as pickle
import anydbm
import multiprocessing
import zlib
from hashlib import sha1
try:
    import pymongo
    import pymongo.binary
except ImportError:
    PYMONGO_IMPORTED = False
else:
    PYMONGO_IMPORTED = True
import inspect
import traceback
from urlparse import urljoin
from random import randint

from .error import SpiderError, SpiderMisuseError, FatalError
from .task import Task
from .data import Data

CURL_OBJECT = pycurl.Curl()
DEFAULT_TASK_PRIORITY = 100
RANDOM_TASK_PRIORITY_RANGE = (80, 100)

def execute_handler(path, handler_name, res_count, queue,
                    grab, task):
    try:
        mod_path, cls_name = path.rsplit('.', 1)
        mod = __import__(mod_path, fromlist=[''])
        cls = getattr(mod, cls_name)
        bot = cls(container_mode=True)
        handler = getattr(bot, handler_name)
        try:
            result = handler(grab, task)
            if isinstance(result, types.GeneratorType):
                items = list(result)
            else:
                items = [result]
            items += bot.distributed_task_buffer
            #if len(items) > 1:
            #    raise Exception('Multiple yield from handler is not supported yet in distributed mode')
            queue.put((res_count, handler_name, items))
        except Exception, ex:
            #logging.error(ex)
            tb = traceback.format_exc()
            queue.put((res_count, handler_name, [{'error': ex, 'traceback': tb}]))
    except Exception, ex:
        logging.error('', exc_info=ex)


class Spider(object):
    """
    Asynchronious scraping framework.
    """

    # You can define some URLs here and initial tasks
    # with the name "initial" will be created from these
    # URLs.
    # If the logic of generating initial tasks is complex,
    # then consider using the `task_generator` method instead of
    # the `initial_urls` attribute.
    initial_urls = None

    # The base URL which is used to resolve all relative URLs.
    # The resolving takes place in the `add_task` method.
    base_url = None
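
    # A minimal usage sketch (not part of the original file, inferred from the
    # handler naming conventions in this module; the URLs, handler names and
    # the Data constructor call are assumptions): subclass Spider, list start
    # URLs in `initial_urls`, define `task_<name>` handlers, and yield
    # Task/Data objects which are fed back into `process_result`.
    #
    #   class ExampleSpider(Spider):
    #       initial_urls = ['http://example.com/']
    #
    #       def task_initial(self, grab, task):
    #           # Schedule a follow-up task for every link on the start page
    #           for url in grab.xpath_list('//a/@href'):
    #               yield Task('page', url=urljoin(task.url, url))
    #
    #       def task_page(self, grab, task):
    #           yield Data('page', {'url': task.url})
    #
    #   bot = ExampleSpider(thread_number=2)
    #   bot.run()
    #   print bot.render_stats()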

    def __init__(self, thread_number=3, request_limit=None,
                 network_try_limit=10, task_try_limit=10,
                 debug_error=False,
                 use_cache=False,
                 use_cache_compression=False,
                 cache_db = None,
                 log_taskname=False,
                 cache_key_hash=True,
                 request_pause=0,
                 container_mode=False,
                 distributed_mode=False,
                 distributed_path=None,
                 priority_mode='random',
                 meta=None,
                 ):
        """
        Arguments:
        * thread-number - Number of concurrent network streams
        * request_limit - Limit number of all network requests
            Useful for debugging
        * network_try_limit - How many times try to send request
            again if network error was occuried, use 0 to disable
        * network_try_limit - Limit of tries to execute some task
            this is not the same as network_try_limit
            network try limit limits the number of tries which
            are performed automaticall in case of network timeout
            of some other physical error
            but task_try_limit limits the number of attempts which
            are scheduled manually in the spider business logic
        * distributed_mode - if True then multiprocessing module
            will be used to share task handlers among available CPU cores
        * request_pause - amount of time on which the main `run` cycle should
            pause the activity of spider. By default it is equal to zero. You
            can use this option to slow down the spider speed (also you can use
            `thread_number` option). The value of `request_pause` could be float.
        * container_mode - used for distributed mode then we have to call method
            in remote process which receives name of spider class and name of method.
        * distributed_path - path to the spider class in format "mod.mod.ClassName"
        * priority_mode - could be "random" or "const"
        * meta - arbitrary user data
        """

        if meta:
            self.meta = meta
        else:
            self.meta = {}
        self.container_mode = container_mode
        self.distributed_task_buffer = []
        if container_mode:
            return
        self.taskq = PriorityQueue()
        self.thread_number = thread_number
        self.request_limit = request_limit
        self.counters = defaultdict(int)
        self.grab_config = {}
        self.proxylist_config = None
        self.items = {}
        self.task_try_limit = task_try_limit
        self.network_try_limit = network_try_limit
        if priority_mode not in ['random', 'const']:
            raise SpiderMisuseError('Value of priority_mode option should be "random" or "const"')
        else:
            self.priority_mode = priority_mode
        try:
            signal.signal(signal.SIGUSR1, self.sigusr1_handler)
        except (ValueError, AttributeError):
            pass
        self.debug_error = debug_error
        self.use_cache = use_cache
        self.cache_db = cache_db
        self.use_cache_compression = use_cache_compression
        if use_cache:
            self.setup_cache()
        self.log_taskname = log_taskname
        self.prepare()
        self.distributed_mode = distributed_mode
        self.cache_key_hash = cache_key_hash
        self.should_stop = False
        self.request_pause = request_pause
        self.distributed_path = distributed_path
        # Init task generator
        self.task_generator_object = self.task_generator()
        self.task_generator_enabled = True
        self.process_task_generator()

    def setup_cache(self):
        if not self.cache_db:
            raise Exception('You should configure cache_db option')
        if not PYMONGO_IMPORTED:
            raise Exception('pymongo required to use cache feature')
        self.cache = pymongo.Connection()[self.cache_db]['cache']

    def prepare(self):
        """
        You can do additional spider customization here
        before it starts working.
        """

    def container_prepare(self):
        """
        Executed in container mode during the instance creation phase.
        """

    def sigusr1_handler(self, signum, frame):
        """
        Catches the SIGUSR1 signal and dumps the current state
        to a temporary file.
        """

        with open('/tmp/spider.state', 'w') as out:
            out.write(self.render_stats())


    def load_tasks(self, path, task_name='initial', task_priority=100,
                   limit=None):
        count = 0
        with open(path) as inf:
            for line in inf:
                url = line.strip()
                if url:
                    self.taskq.put((task_priority, Task(task_name, url)))
                    count += 1
                    if limit is not None and count >= limit:
                        logging.debug('load_tasks limit reached')
                        break
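
    # A sketch of loading tasks from a plain text file of URLs (the file path
    # and task name below are assumptions):
    #
    #   bot.load_tasks('/tmp/urls.txt', task_name='page', limit=1000)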

    def setup_grab(self, **kwargs):
        self.grab_config = kwargs
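
    # The keyword arguments are stored and later passed to the Grab constructor
    # in `create_grab_instance`, so any Grab option may be used here. A sketch
    # (the option names below are assumptions about the Grab API):
    #
    #   bot.setup_grab(timeout=30, user_agent='MyBot/1.0')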

    def load_initial_urls(self):
        """
        Create initial tasks from `self.initial_urls`.

        Tasks are created with name "initial".
        """

        if self.initial_urls:
            for url in self.initial_urls:
                self.add_task(Task('initial', url=url))

    def run(self):
        try:
            self.start_time = time.time()
            self.load_initial_urls()

            # new
            if self.distributed_mode:
                pool = multiprocessing.Pool()
                manager = multiprocessing.Manager()
                queue = manager.Queue()
                mapping = {}
                multi_requests = []

            for res_count, res in enumerate(self.fetch()):
                if res_count > 0 and self.request_pause > 0:
                    time.sleep(self.request_pause)

                if res is None and not self.distributed_mode:
                    break

                if res:
                    if self.should_stop:
                        break

                    if self.task_generator_enabled:
                        self.process_task_generator()

                    # Increase task counters
                    self.inc_count('task')
                    self.inc_count('task-%s' % res['task'].name)
                    if (res['task'].network_try_count == 1 and
                        res['task'].task_try_count == 1):
                        self.inc_count('task-%s-initial' % res['task'].name)
                    if self.log_taskname:
                        logging.debug('TASK: %s - %s' % (res['task'].name,
                                                         'OK' if res['ok'] else 'FAIL'))

                    handler_name = 'task_%s' % res['task'].name
                    try:
                        handler = getattr(self, handler_name)
                    except AttributeError:
                        raise Exception('Task handler does not exist: %s' %\
                                        handler_name)
                    else:
                        if self.distributed_mode:
                            self.execute_response_handler_async(
                                res, self.distributed_path, handler_name, mapping,
                                res_count, multi_requests, queue, pool)
                        else:
                            self.execute_response_handler_sync(res, handler, handler_name)

                if self.distributed_mode:
                    try:
                        res_count, handler_name, task_results = queue.get(False)
                    except Empty:
                        if res is None:
                            break
                    else:
                        res = mapping[res_count]
                        for task_result in task_results:
                            #if task_result == 'traceback':
                                #import pdb; pdb.set_trace()
                            if isinstance(task_result, dict) and 'error' in task_result:
                                self.error_handler(handler_name,
                                                   task_result['error'],
                                                   res['task'],
                                                   error_tb=task_result['traceback'])
                            else:
                                self.process_result(task_result, res['task'])
                            #import pdb; pdb.set_trace()
                            #res['grab']._lxml_tree = tree
                            #print tree.xpath('//h1')
                            #self.execute_response_handler(res, handler, handler_name)
                        del mapping[res_count]
                    multi_requests = [x for x in multi_requests if not x.ready()]

            # There is no sense in making this code work because
            # we are already out of the main loop, so
            # if a handler returned a new Task it would not be processed
            #if self.distributed_mode:
            # new
            #while True:
                #try:
                    #res_count, tree = queue.get(True, 0.1)
                #except Empty:
                    #multi_requests = [x for x in multi_requests if not x.ready()]
                    #if not len(multi_requests):
                        #break
                #else:
                    #res = mapping[res_count]
                    ##res['grab']._lxml_tree = tree
                    #self.execute_response_handler(res, handler, handler_name)
                    #del mapping[res_count]


        except KeyboardInterrupt:
            print '\nGot ^C signal. Stopping.'
            print self.render_stats()
        finally:
            # This code is executed when the main cycle is broken
            self.shutdown()


    def execute_response_handler_async(self, res, path, handler_name,
                                       mapping, res_count, multi_requests,
                                       queue, pool):
        if res['ok'] and (res['grab'].response.code < 400 or
                          res['grab'].response.code == 404):
            mapping[res_count] = res
            res['grab'].curl = None
            res['grab_original'].curl = None
            multi_request = pool.apply_async(
                execute_handler, (path, handler_name, res_count,
                                  queue, res['grab'], res['task']))
            multi_requests.append(multi_request)
        else:
            # Log the error
            if res['ok']:
                res['emsg'] = 'HTTP %s' % res['grab'].response.code
            self.inc_count('network-error-%s' % res['emsg'][:20])
            logging.error(res['emsg'])

            # Try to repeat the same network query
            if self.network_try_limit > 0:
                task = res['task']
                task.grab = res['grab_original']
                self.add_task(task)
            # TODO: allow to write error handlers

    def execute_response_handler_sync(self, res, handler, handler_name):
        if res['ok'] and (res['grab'].response.code < 400 or
                          res['grab'].response.code == 404):
            try:
                result = handler(res['grab'], res['task'])
                if isinstance(result, types.GeneratorType):
                    for item in result:
                        self.process_result(item, res['task'])
                else:
                    self.process_result(result, res['task'])
            except Exception, ex:
                self.error_handler(handler_name, ex, res['task'])
        else:
            # Log the error
            if res['ok']:
                res['emsg'] = 'HTTP %s' % res['grab'].response.code
            self.inc_count('network-error-%s' % res['emsg'][:20])
            logging.error(res['emsg'])

            # Try to repeat the same network query
            if self.network_try_limit > 0:
                task = res['task']
                task.grab = res['grab_original']
                self.add_task(task)
            # TODO: allow to write error handlers
    
    def process_result(self, result, task):
        """
        Process the result returned from a task handler.
        The result can be None, a Task instance or a Data instance.
        """

        if isinstance(result, Task):
            if not self.add_task(result):
                self.add_item('wtf-error-task-not-added', task.url)
        elif isinstance(result, Data):
            handler_name = 'data_%s' % result.name
            try:
                handler = getattr(self, handler_name)
            except AttributeError:
                handler = self.data_default
            try:
                handler(result.item)
            except Exception, ex:
                self.error_handler(handler_name, ex, task)
        elif result is None:
            pass
        else:
            #import pdb; pdb.set_trace()
            raise Exception('Unknown result type: %s' % result)

    def add_task(self, task):
        """
        Add a new task to the task queue.

        Reject the task if it has already been tried too many times.
        """

        if task.priority is None:
            if self.priority_mode == 'const':
                task.priority = DEFAULT_TASK_PRIORITY
            else:
                task.priority = randint(*RANDOM_TASK_PRIORITY_RANGE)

        if not task.url.startswith('http'):
            task.url = urljoin(self.base_url, task.url)
        if task.grab and not task.grab.config['url'].startswith('http'):
            task.grab.config['url'] = urljoin(self.base_url, task.grab.config['url'])

        if self.container_mode:
            self.distributed_task_buffer.append(task)
        else:
            if task.task_try_count > self.task_try_limit:
                logging.debug('Task tries ended: %s / %s' % (task.name, task.url))
                return False
            elif task.network_try_count >= self.network_try_limit:
                logging.debug('Network tries ended: %s / %s' % (task.name, task.url))
                return False
            else:
                #prep = getattr(self, 'task_%s_preprocessor' % task.name, None)
                #ok = True
                #if prep:
                    #ok = prep(task)
                #if ok:
                    #self.taskq.put((task.priority, task))
                #return ok
                self.taskq.put((task.priority, task))
                return True

    def data_default(self, item):
        """
        Default handler for a Data result for which
        no specific handler is defined.
        """

        raise Exception('No content handler for %s item' % item)
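
    # Data handlers follow the `data_<name>` convention and receive the `item`
    # payload of the Data object (see `process_result`). A sketch with a
    # hypothetical data name:
    #
    #   def data_page(self, item):
    #       self.add_item('pages', json.dumps(item))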

    def fetch(self):
        """
        Download URLs via multicurl.

        Get new tasks from the queue.
        """
        m = pycurl.CurlMulti()
        m.handles = []

        # Create curl instances
        for x in xrange(self.thread_number):
            curl = pycurl.Curl()
            m.handles.append(curl)

        freelist = m.handles[:]

        # This is an infinite loop.
        # You can break it only from the outside code which
        # iterates over the result of this method.
        while True:

            cached_request = None

            while len(freelist):

                # Increase request counter
                if (self.request_limit is not None and
                    self.counters['request'] >= self.request_limit):
                    logging.debug('Request limit is reached: %s' %\
                                  self.request_limit)
                    if len(freelist) == self.thread_number:
                        yield None
                    else:
                        break
                else:
                    try:
                        priority, task = self.taskq.get(True, 0.1)
                    except Empty:
                        # If all handlers are free and there are no tasks
                        # in the queue, yield the None signal
                        if len(freelist) == self.thread_number:
                            yield None
                        else:
                            break
                    else:
                        if not self._preprocess_task(task):
                            continue

                        task.network_try_count += 1
                        if task.task_try_count == 0:
                            task.task_try_count = 1

                        if task.task_try_count > self.task_try_limit:
                            logging.debug('Task tries ended: %s / %s' % (
                                          task.name, task.url))
                            self.add_item('too-many-task-tries', task.url)
                            continue
                        
                        if task.network_try_count > self.network_try_limit:
                            logging.debug('Network tries ended: %s / %s' % (
                                          task.name, task.url))
                            self.add_item('too-many-network-tries', task.url)
                            continue

                        if task.grab:
                            grab = task.grab
                        else:
                            # Set up curl instance via Grab interface
                            grab = self.create_grab_instance()
                            grab.setup(url=task.url)

                        if (self.use_cache
                            and not task.get('refresh_cache')
                            and not task.get('disable_cache')):
                            if grab.detect_request_method() == 'GET':
                                url = grab.config['url']
                                _hash = self.build_cache_hash(url)
                                cache_item = self.cache.find_one({'_id': _hash})
                                if cache_item:
                                #if url in self.cache:
                                    #cache_item = pickle.loads(self.cache[url])
                                    #logging.debug('From cache: %s' % url)

                                    # The `curl` attribute should not be None.
                                    # It can be None if we fire Task objects with
                                    # grab objects which were received as input
                                    # arguments of a response handler function.
                                    # In that case the `prepare_request` method
                                    # would fail because it assumes that the Grab
                                    # instance has a valid `curl` attribute.
                                    if grab.curl is None:
                                        grab.curl = CURL_OBJECT
                                    cached_request = (grab, grab.clone(),
                                                      task, cache_item)
                                    grab.prepare_request()
                                    grab.log_request('CACHED')
                                    self.inc_count('request-cache')

                                    # Break from the prepare-request cycle
                                    # and go to the process-response code
                                    break

                        self.inc_count('request-network')
                        if self.proxylist_config:
                            args, kwargs = self.proxylist_config
                            grab.setup_proxylist(*args, **kwargs)

                        curl = freelist.pop()
                        curl.grab = grab
                        curl.grab.curl = curl
                        curl.grab_original = grab.clone()
                        curl.grab.prepare_request()
                        curl.grab.log_request()
                        curl.task = task
                        # Add configured curl instance to multi-curl processor
                        m.add_handle(curl)


            # If any network requests have been scheduled
            if len(freelist) != self.thread_number:
                while True:
                    status, active_objects = m.perform()
                    if status != pycurl.E_CALL_MULTI_PERFORM:
                        break

            if cached_request:
                grab, grab_original, task, cache_item = cached_request
                url = task.url# or grab.config['url']
                grab.fake_response(cache_item['body'])

                body = cache_item['body']
                if self.use_cache_compression:
                    body = zlib.decompress(body)
                def custom_prepare_response(g):
                    g.response.head = cache_item['head']
                    g.response.body = body
                    g.response.code = cache_item['response_code']
                    g.response.time = 0
                    g.response.url = cache_item['url']
                    g.response.parse()
                    g.response.cookies = g._extract_cookies()

                grab.process_request_result(custom_prepare_response)

                yield {'ok': True, 'grab': grab, 'grab_original': grab_original,
                       'task': task, 'ecode': None, 'emsg': None}
                self.inc_count('request')

            while True:
                queued_messages, ok_list, fail_list = m.info_read()

                results = []
                for curl in ok_list:
                    results.append((True, curl, None, None))
                for curl, ecode, emsg in fail_list:
                    results.append((False, curl, ecode, emsg))

                for ok, curl, ecode, emsg in results:
                    res = self.process_multicurl_response(ok, curl,
                                                          ecode, emsg)
                    m.remove_handle(curl)
                    freelist.append(curl)
                    yield res
                    self.inc_count('request')

                if not queued_messages:
                    break

            m.select(0.5)

    def process_multicurl_response(self, ok, curl, ecode=None, emsg=None):
        """
        Process a response returned from the multicurl cycle.
        """

        task = curl.task
        # Note: curl.grab == task.grab if task.grab is not None
        grab = curl.grab
        grab_original = curl.grab_original

        url = task.url# or grab.config['url']
        grab.process_request_result()

        # Break links, free resources
        curl.grab.curl = None
        curl.grab = None
        curl.task = None

        if ok and self.use_cache and grab.request_method == 'GET' and not task.get('disable_cache'):
            if grab.response.code < 400 or grab.response.code == 404:
                body = grab.response.body
                if self.use_cache_compression:
                    body = zlib.compress(body)

                _hash = self.build_cache_hash(task.url)
                item = {
                    '_id': _hash,
                    'url': task.url,
                    'body': pymongo.binary.Binary(body),
                    'head': pymongo.binary.Binary(grab.response.head),
                    'response_code': grab.response.code,
                    'cookies': None,#grab.response.cookies,
                }
                #import pdb; pdb.set_trace()
                try:
                    #self.mongo.cache.save(item, safe=True)
                    self.cache.save(item, safe=True)
                except Exception, ex:
                    if 'document too large' in unicode(ex):
                        pass
                    #else:
                        #import pdb; pdb.set_trace()

        return {'ok': ok, 'grab': grab, 'grab_original': grab_original,
                'task': task,
                'ecode': ecode, 'emsg': emsg}

    def shutdown(self):
        """
        You can override this method to do some final actions
        after parsing has been done.
        """

        logging.debug('Job done!')
        #self.tracker.stats.print_summary()

    def inc_count(self, key, display=False, count=1):
        """
        You can call this method multiple times during parsing.

        self.inc_count('regurl')
        self.inc_count('captcha')

        and after parsing you can access all saved values:

        print 'Total: %(total)s, captcha: %(captcha)s' % spider_obj.counters
        """

        self.counters[key] += count
        if display:
            logging.debug(key)
        return self.counters[key]

    def setup_proxylist(self, *args, **kwargs):
        """
        Save proxylist config which will be later passed to Grab
        constructor.
        """

        self.proxylist_config = (args, kwargs)

    def add_item(self, list_name, item, display=False):
        """
        You can call this method multiple times during parsing.

        self.add_item('foo', 4)
        self.add_item('foo', 'bar')

        and after parsing you can access all saved values:

        spider_instance.items['foo']
        """

        lst = self.items.setdefault(list_name, [])
        lst.append(item)
        if display:
            logging.debug(list_name)

    def save_list(self, list_name, path):
        """
        Save the items of the list into a file.
        """

        with open(path, 'w') as out:
            lines = []
            for item in self.items.get(list_name, []):
                if isinstance(item, basestring):
                    lines.append(item)
                else:
                    lines.append(json.dumps(item))
            out.write('\n'.join(lines))

    def render_stats(self):
        out = []
        out.append('Counters:')
        # Sort counters by name
        items = sorted(self.counters.items(), key=lambda x: x[0], reverse=True)
        out.append('  %s' % '\n  '.join('%s: %s' % x for x in items))
        out.append('\nLists:')
        # Sort lists by number of items
        items = [(x, len(y)) for x, y in self.items.items()]
        items = sorted(items, key=lambda x: x[1], reverse=True)
        out.append('  %s' % '\n  '.join('%s: %s' % x for x in items))

        total_time = time.time() - self.start_time
        out.append('Queue size: %d' % self.taskq.qsize())
        out.append('Threads: %d' % self.thread_number)
        out.append('DOM build time: %.3f' % GLOBAL_STATE['dom_build_time'])
        out.append('Time: %.2f sec' % total_time)
        return '\n'.join(out)

    def save_all_lists(self, dir_path):
        """
        Save each list into a file in the specified directory.
        """

        for key, items in self.items.items():
            path = os.path.join(dir_path, '%s.txt' % key)
            self.save_list(key, path)

    def error_handler(self, func_name, ex, task, error_tb=None):
        self.inc_count('error-%s' % ex.__class__.__name__.lower())

        if error_tb:
            logging.error('Error in %s function' % func_name)
            logging.error(error_tb)
        else:
            logging.error('Error in %s function' % func_name,
                          exc_info=ex)

        try:
            ex_str = unicode(ex, 'utf-8', 'ignore')
        except TypeError:
            ex_str = str(ex)
        self.add_item('fatal', '%s|%s|%s' % (ex.__class__.__name__,
                                             ex_str, task.url))
        if self.debug_error:
            #import sys, traceback,  pdb
            #type, value, tb = sys.exc_info()
            #traceback.print_exc()
            #pdb.post_mortem(tb)
            import pdb; pdb.set_trace()
        if isinstance(ex, FatalError):
            raise

    # TODO: remove
    #def generate_tasks(self, init):
        #"""
        #Create new tasks.

        #This method is called on each step of main run cycle and
        #at Spider initialization.

        #initi is True only for call on Spider initialization stage
        #"""

        #pass

    def task_generator(self):
        """
        You can override this method to load new tasks smoothly.

        It is called each time the number of tasks in the task queue
        drops below the number of threads multiplied by 1.5.
        This allows you to avoid loading everything into memory
        when the total number of tasks is big.
        """

        if False:
            # Some magic to make this function a generator
            yield ':-)'
        return
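
    # A sketch of an override (the file name and task name are assumptions).
    # The generator is drained lazily by `process_task_generator`, so even a
    # huge list of URLs does not flood the task queue and memory:
    #
    #   def task_generator(self):
    #       with open('urls.txt') as inf:
    #           for line in inf:
    #               yield Task('page', url=line.strip())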

    def _preprocess_task(self, task):
        """
        Run a custom task preprocessor which can change task
        properties or cancel the task.

        This method is called *before* the network request.

        Return a true value to continue processing the task or a false value to cancel it.
        """

        handler_name = 'preprocess_%s' % task.name
        handler = getattr(self, handler_name, None)
        if handler:
            try:
                return handler(task)
            except Exception, ex:
                self.error_handler(handler_name, ex, task)
                return False
        else:
            return task
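
    # Preprocessors are looked up by the `preprocess_<task name>` convention.
    # A sketch with a hypothetical task name: skip URLs that were already seen,
    # keeping the state in the `meta` dictionary.
    #
    #   def preprocess_page(self, task):
    #       seen = self.meta.setdefault('seen', set())
    #       if task.url in seen:
    #           return False
    #       seen.add(task.url)
    #       return True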

    def process_task_generator(self):
        """
        Load new tasks from `self.task_generator_object`.

        If the task queue size drops below a minimum limit
        then new tasks are requested from the generator.
        """

        qsize = self.taskq.qsize()
        new_count = 0
        min_limit = int(self.thread_number * 1.5)
        if qsize < min_limit:
            try:
                for x in xrange(min_limit - qsize):
                    self.add_task(self.task_generator_object.next())
                    new_count += 1
            except StopIteration:
                self.task_generator_enabled = False

    def create_grab_instance(self):
        return Grab(**self.grab_config)

    def next_page_task(self, grab, task, xpath, **kwargs):
        """
        Return a new `Task` object if a link that matches the given `xpath`
        was found.

        This method is used by the `grab.spider.shortcuts.paginate` helper.
        """
        nav = grab.xpath(xpath, None)
        if nav is not None:
            url = grab.make_url_absolute(nav.get('href'))
            page = task.get('page', 1) + 1
            grab2 = grab.clone()
            grab2.setup(url=url)
            task2 = task.clone(task_try_count=0, grab=grab2, page=page, **kwargs)
            return task2


    def build_cache_hash(self, url):
        utf_url = url.encode('utf-8') if isinstance(url, unicode) else url
        if self.cache_key_hash:
            return sha1(utf_url).hexdigest()
        else:
            return utf_url

    def remove_cache_item(self, url):
        _hash = self.build_cache_hash(url)
        self.cache.remove({'_id': _hash})

    def stop(self):
        """
        Stop main loop.
        """

        self.should_stop = True

    @classmethod
    def init_with_config(cls, modname):
        """
        This method creates a spider instance and configures it
        with options found in the given config module.

        Args:
            :modname string: name of the module with settings
        """

        # Load key, value dict from config module
        config = __import__(modname, fromlist=[''])
        config_dict = {}
        for key in dir(config):
            config_dict[key.lower()] = getattr(config, key)

        # Find names of arguments of __init__ method
        arg_names = inspect.getargspec(getattr(cls, '__init__'))[0]
        arg_names = [x.lower() for x in arg_names]

        # Find __init__ arguments in config module
        kwargs = {}
        for name in arg_names:
            if name in config_dict:
                kwargs[name] = config_dict[name]

        # Create Spider instance
        obj = cls(**kwargs)

        # Configure proxy list
        if 'proxylist' in config_dict:
            obj.setup_proxylist(**config_dict['proxylist'])

        return obj
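
    # A sketch of a config module consumed by `init_with_config` (the module
    # and option names are assumptions): upper-case names are lower-cased and
    # matched against `__init__` arguments, and PROXYLIST is forwarded to
    # `setup_proxylist`.
    #
    #   # settings.py
    #   THREAD_NUMBER = 10
    #   USE_CACHE = True
    #   CACHE_DB = 'spider-cache'
    #   PROXYLIST = {'proxy_file': '/tmp/proxies.txt'}
    #
    #   bot = SomeSpider.init_with_config('settings')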

    def follow_links(self, grab, xpath, task_name, task=None):
        """
        Args:
            :xpath: xpath expression which evaluates to a list of URLs

        Example::

            self.follow_links('//div[@class="topic"]/a/@href', 'topic')
        """

        urls = []
        for url in grab.xpath_list(xpath):
            #if not url.startswith('http') and self.base_url is None:
            #    raise SpiderError('You should define `base_url` attribute to resolve relative urls')
            url = urljoin(grab.config['url'], url)
            if url not in urls:
                urls.append(url)
                g2 = grab.clone()
                g2.setup(url=url)
                self.add_task(Task(task_name, grab=g2))