Parse ads from ingatlan.com

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import importlib
import sys
import bs4
import socket
import urllib.request
import urllib.error
import http.client
import gzip
from io import BytesIO
import json
import itertools
import time

try:
    import brotli
except ImportError:
    sys.stdout.write(
        'Module `brotli` not available.\n'
        'Won\'t be able to extract Brotli-compressed responses.\n'
        'You can install brotli by `pip install brotli`.\n'
    )

# author Dénes Türei
# turei.denes@gmail.com

# 2018-09-24:
#   -- added Brotli compression handler
# 2018-07-29:
#   -- also catching URLError and ConnectionResetError
# 2018-07-25:
#   -- error handling for failed downloads: 3 attempts & giving up
#   -- list of all towns added to sections
# 2018-03-02:
#   -- created

class Ingatlan(object):
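    """
    Scrapes rental flat ads from ingatlan.com: collects ad URLs from the
    section listing pages, downloads and parses each individual ad, and
    exports the results to a TSV file.
    """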
    
    baseurl = 'https://ingatlan.com/'
    phoneurl = 'https://ingatlan.com/detailspage/api/%u'
    sections = [
        'budapest',
        'nyiregyhaza',
        'debrecen',
        'miskolc',
        'szeged',
        'gyor',
        'pecs',
        'szombathely',
        'sopron',
        'vac',
        'budaors',
        'szentendre',
        'szekesfehervar',
        'veszprem',
        'szolnok',
        'szekszard',
        'tatabanya',
        'zalaegerszeg',
        'salgotarjan',
        'kaposvar',
        'kecskemet'
    ]
    subsections = ['kiado+lakas'] # kiadó lakás = 'flat for rent'
    
    http_headers = {
        'Host': 'ingatlan.com',
        'User-Agent': 'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:54.0) Gecko/20110304 Firefox/54.0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Referer': 'https://ingatlan.com/',
        'Cookie': 'ICSESSIDfrontend=nk313j4alqbjsvaj8dp84fm7t4; m1x_switch=off; G_ENABLED_IDPS=google; nps_hidden=1; bbid=9036898247414400',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Pragma': 'no-cache',
        'Cache-Control': 'no-cache'
    }
    
    # if debug messages needed:
    #opener = urllib.request.build_opener(
        #urllib.request.HTTPHandler(debuglevel=1),
        #urllib.request.HTTPSHandler(debuglevel=1)
    #)
    #urllib.request.install_opener(opener)
    
    def __init__(self):
        
        self.ad_urls = set()
        self.data = {}
    
    def reload(self, children = False):
        modname = self.__class__.__module__
        mod = __import__(modname, fromlist=[modname.split('.')[0]])
        importlib.reload(mod)
        new = getattr(mod, self.__class__.__name__)
        setattr(self, '__class__', new)
    
    def main(self):
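        """
        Runs the full pipeline: collect ad URLs from all sections,
        export them, then download, parse and export all ads.
        """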
        
        self.iter_sections()
        self.export_urls()
        self.iter_ads()
        self.export_ads()
    
    def iter_sections(self):
        
        # sections are defined manually above
        
        for sec, subsec in itertools.product(self.sections, self.subsections):
            
            # page counter, incremented to 1 at the top of the loop
            page = 0
            
            # iterating pages:
            while True:
                
                # next page
                page += 1
                # constructing the URL
                self.url = '%s%s/%s?page=%u' % (
                    self.baseurl, sec, subsec, page
                )
                # generic method to download the page
                # and extract its HTML
                self.load_page()
                # if the download failed:
                if not self.html:
                    continue
                # create a BeautifulSoup object
                self.soup = bs4.BeautifulSoup(self.html, 'lxml')
                # parse HTML of advertisement list
                self.parse_page()
                
                # stop if we have no more results:
                if self.resn == '0':
                    
                    sys.stdout.write(
                        '[ INFO ] No more URLs, '
                        'section `%s/%s` finished\n' % (
                            sec, subsec
                        )
                    )
                    break
        
    def load_page(self, method = 'GET', data = None):
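        """
        Downloads `self.url` with up to 3 attempts, decompresses the
        response (gzip, Brotli or uncompressed) and stores the HTML in
        `self.html`; on failure `self.html` is set to None.
        """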
        
        response = None
        
        # creating request
        req = urllib.request.Request(self.url, method = method, data = data)
        
        # setting headers
        for key, val in self.http_headers.items():
            
            req.add_header(key, val)
        
        sys.stdout.write('[ INFO ] Attempting to load `%s`\n' % self.url)
        
        for i in range(3):
            
            try:
                # opening connection
                try:
                    
                    response = urllib.request.urlopen(req, timeout = 90)
                    
                except UnicodeEncodeError:
                    
                    sys.stdout.write(
                        '[ WARN ] Could not ascii encode: `%s`\n' % self.url
                    )
                    self.html = None
                    break
                
                break
            
            except (
                urllib.error.HTTPError,
                ConnectionResetError,
                urllib.error.URLError,
                socket.timeout
            ) as e:
                
                sys.stdout.write(
                    '%s`--> Attempt #%u failed with `%s`\n' % (
                        ' ' * 13,
                        i + 1,
                        e.__class__.__name__
                    )
                )
                time.sleep(2)
        
        if not response:
            
            sys.stdout.write(
                '[ WARN ] Failed to download `%s`. Giving up.\n' % self.url
            )
            self.html = None
            return
        
        resp_encoding = response.info()['Content-Encoding']
        
        # loading response into BytesIO
        response_file = BytesIO(response.read())
        # read the first 2 bytes to check for the gzip magic number
        magic = response_file.read(2)
        response_file.seek(0)
        
        # extracting HTML from response
        if magic == b'\x1f\x8b' or resp_encoding == 'gzip':
            
            # gzip extraction
            self.html = gzip.GzipFile(
                fileobj = response_file
            ).read().decode('utf-8') # decoding bytes to UTF-8
            
        elif resp_encoding == 'br':
            
            # brotli extraction
            self.html = brotli.decompress(response_file.read()).decode('utf-8')
            
        else:
            
            # assume no compression
            self.html = response_file.read().decode('utf-8')
        
    
    def parse_page(self):
        
        # check whether the page reports more than zero results
        self.resn = self.soup.find(
            'span',
            {'class': 'results__number__count'}
        ).text.strip()
        
        # these are the individual ads on one page
        for card in self.soup.find_all('div', {'class': 'listing__card'}):
            
            # the very first anchor is the ad URL
            # we collect these in the ad_urls set:
            self.ad_urls.add(card.find('a').get('href'))
    
    def iter_ads(self):
        
        # iterate advertisement URLs
        for ad_url in self.ad_urls:
            
            self.url = '%s%s' % (self.baseurl, ad_url)
            self.load_page()
            # if the download failed:
            if not self.html:
                continue
            
            self.soup = bs4.BeautifulSoup(self.html, 'lxml')
            self.parse_ad()
    
    def export_urls(self, fname = 'urls.txt'):
        
        # writing URLs to file
        with open(fname, 'w') as fp:
            
            _ = fp.write('\n'.join(
                '%s%s' % (self.baseurl, url)
                for url in self.ad_urls
            ))
    
    def get_phone(self, adid):
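        """
        Retrieves the phone number(s) of one ad by sending a PUT request
        with a JSON payload to the details API; returns them as a single
        `;` separated string, or an empty string on failure.
        """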
        
        self.url = self.phoneurl % int(adid)
        data = ((
            '{"id":%u,"is_favourite":false,"is_hidden":false,'
            '"is_phone_number_visible":true,"phone_numbers":[]}'
        ) % int(adid)).encode('ascii')
        self.http_headers['Content-type'] = 'application/json'
        self.load_page(method = 'PUT', data = data)
        del self.http_headers['Content-type']
        
        return (
            ''
                if not self.html else
            ';'.join(json.loads(self.html)['phone_numbers'])
        )
    
    def parse_ad(self):
        
        def get_param(div, sub = 'parameter-value'):
            
            try:
                
                param = self.soup.find('div', {'class': div}).find(
                    'span', {'class': sub}
                ).text
                
            except AttributeError:
                
                param = 'none'
            
            return param
        
        # extract the fields of an individual ad with BeautifulSoup
        ad_url = self.url
        try:
            title   = self.soup.find('h1', {'class': 'js-listing-title'}).text
        except AttributeError:
            title   = 'none'
        try:
            subtype = self.soup.find('div', {'class': 'listing-subtype'}).text
        except AttributeError:
            subtype = 'none'
        price   = get_param('parameter-price')
        area    = get_param('parameter-area-size')
        rooms   = get_param('parameter-room')
        try:
            desc    = self.soup.find('div', {'class': 'long-description'}).text
        except AttributeError:
            desc    = 'none'
        try:
            agent   = self.soup.find('div', {'class': 'agent-name'}).text
        except AttributeError:
            agent   = 'none'
        adid     = self.soup.find('b', {'class': 'listing-id'}).text
        phone    = self.get_phone(adid)
        # note: `paramterers` is a typo in the webpage's own markup
        dparam  = self.soup.find('div', {'class': 'paramterers'})
        param   = {}
        
        for tab in dparam.find_all('table'):
            
            for tr in tab.find_all('tr'):
                
                td0 = tr.find('td')
                key = td0.text
                val = td0.find_next_sibling().text
                param[key] = val
        
        self.data[ad_url] = {
            'title':   title,
            'id':      adid,
            'subtype': subtype,
            'price':   price,
            'area':    area,
            'rooms':   rooms,
            'agent':   agent,
            'phone':   phone,
            'desc':    desc,
            'param':   param
        }
    
    def export_ads(self, fname = 'ads.tsv'):
        
        with open(fname, 'w') as fp:
            
            hdr = [
                'url',
                'id',
                'title',
                'subtype',
                'price',
                'area',
                'rooms',
                'agent',
                'phone',
                'desc',
                'param'
            ]
            
            fp.write('%s\n' % ('\t'.join(hdr)))
            
            for url, data in self.data.items():
                
                fp.write('%s\t%s\t%s\n' % (
                    url,
                    '\t'.join(
                        # except url and param
                        data[h] for h in hdr[1:-1]
                    ),
                    ';'.join('%s|%s' % i for i in data['param'].items())
                ))
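
# minimal usage sketch: running the whole pipeline when executed as a script
# (assumes the session cookie in `http_headers` is still accepted by the site)
if __name__ == '__main__':
    
    ingatlan = Ingatlan()
    ingatlan.main()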
