1. Paul Tan
  2. pypixiv

Commits

Paul Tan  committed 8cf376f

Initial Commit

  • Participants
  • Branches default

Comments (0)

Files changed (2)

File pixiv.py

View file
+"""
+API for Pixiv via screen scraping
+
+Usage
+======
+(All actual values replaced by placeholders until
+I get an example illustration and manga up)
+
+First, get your Pixiv context,
+>>> import pixiv
+>>> context = pixiv.Pixiv("USERNAME", "PASSWORD")
+
+Get the Illust or Manga instance from a URL,
+>>> x = context.get_url("MANGA OR ILLUST URL")
+>>> x
+Illust(Pixiv("USERNAME"), ILLUST_ID)
+
+Download an Illust or Manga,
+>>> context.download(x)
+["FILENAME1", "FILENAME2", ...]
+
+And finally, logout,
+>>> context.logout()
+
+"""
+from collections import namedtuple
+from abc import ABCMeta
+from functools import total_ordering
+#TODO: Write tests
+
+__all__ = ["Illust", "Manga", "Pixiv", "Sizes"]
+
+"""
+Pixiv Notes
+=====================
+- If not logged in, the illust manga mode works and displays
+exactly the same view as when logged in,
+BUT without the permalink links.
+However, manga_big mode will not work,
+and will display the same content as manga mode without
+any redirection.
+"""
+
+"""
+Low-level API
+===============
+Kept closely in sync with the Pixiv Website.
+Can be used (pixivutils uses it), but API Compatibility between versions
+is not guaranteed.
+
+Low-level API only contains the generators and parsers.
+Functions which make HTTP Requests should be considered
+the Mid-Level API.
+"""
+
+"""
+API Annotation functions
+"""
+
class OneOf:
    """Annotation marker meaning "one of the given classes".

    Only stores the classes it is given and renders them in its repr;
    it performs no type checking.
    """

    def __init__(self, *classes):
        # Kept as the tuple of positional arguments, in call order.
        self.classes = classes

    def __repr__(self):
        rendered = ", ".join(map(repr, self.classes))
        return "OneOf({})".format(rendered)
+
+"""
+Base Definitions
+"""
+
# Plain-data description of an HTTP request. The field names match the
# keyword arguments the mid-level API forwards to requests.request().
#TODO: Should probably be moved into another library
Request = namedtuple('Request', 'method url data headers cookies')
+
+"""
+Session State Data Model
+"""
# A logged-in session; `id` is the value sent as the PHPSESSID cookie.
Session = namedtuple('Session', ['id'])
+
+"""
+Illust URL Parser / Generator
+"""
+
# Models the data carried in an illust URL's query string.
IllustUrl = namedtuple('IllustUrl', ('id', 'mode', 'page'))

def parse_illust_url(url) -> IllustUrl:
    """Parse a Pixiv illustration URL into its id, mode and page.

    Expected form:
    http://www.pixiv.net/member_illust.php?mode=medium&illust_id=ID

    Returns an IllustUrl whose `id` is an int, whose `mode` is the raw
    query value, and whose `page` is the raw query value (a str) or None
    when the URL has no `page` parameter.

    Raises ValueError if the URL is not a www.pixiv.net
    /member_illust.php URL carrying both `illust_id` and `mode`, or if
    `illust_id` is not an integer.
    """
    from urllib.parse import urlsplit, parse_qs
    parts = urlsplit(url)
    # parse_qs maps every key to a list of values; only the first is used.
    query = {k: v[0] for k, v in parse_qs(parts.query).items()}
    recognised = (
            parts.scheme in ('http', 'https') and
            parts.netloc == 'www.pixiv.net' and
            parts.path == '/member_illust.php' and
            'illust_id' in query and
            'mode' in query)
    if not recognised:
        raise ValueError('Not a Pixiv illust URL: {!r}'.format(url))
    return IllustUrl(int(query['illust_id']), query['mode'],
            query.get('page'))
+
# URL Construction
def gen_illust_url(id, mode = 'medium', page = None):
    """Build a member_illust.php URL for illustration `id`.

    `illust_id`, `mode` and `page` are emitted in that order; any of
    them whose value is None is left out of the query string.
    """
    from urllib.parse import urlencode
    query = {}
    for key, value in (('illust_id', id), ('mode', mode), ('page', page)):
        if value is not None:
            query[key] = value
    return 'http://www.pixiv.net/member_illust.php?' + urlencode(query)
+
+"""
+Illust Content Parsers
+These Parsers expect the page content to be one viewed
+when a user is logged in, and will raise an Exception when
+this is not so.
+"""
+
def raise_for_illust_not_logged_in_content(parser):
    """Raise if the parsed page is the not-logged-in variant.

    `parser` must be a BeautifulSoup parser. The page counts as
    not-logged-in when it has a <body> with class "not-logged-in".
    """
    marker = parser.find("body", {"class": "not-logged-in"})
    if marker:
        raise Exception("User is not logged in or session expired.")
+
def raise_for_illust_manga_big_not_logged_in_content(parser):
    """Raise if a manga_big page was served without a valid login.

    When the user is not logged in, Pixiv answers a manga_big request
    with the manga-mode page instead. The check therefore looks for an
    element that exists in manga mode but not in manga_big mode: a
    <body> with class "scroll-view".
    """
    manga_mode_body = parser.find("body", {"class": "scroll-view"})
    if manga_mode_body:
        raise Exception("User is not logged in or session expired.")
+
def parse_illust_content(content, mode):
    """Parse `content` with the parser registered for `mode`.

    Parsers exist for 'manga', 'big' and 'manga_big'; any other mode
    (including 'medium') raises KeyError.
    """
    dispatch = dict(
            manga = parse_illust_manga_content,
            big = parse_illust_big_content,
            manga_big = parse_illust_manga_big_content)
    parse = dispatch[mode]
    return parse(content)
+
IllustBigContentParseResult = namedtuple('IllustBigContentParseResult',
        ('imgurl',))

def parse_illust_big_content(content) -> IllustBigContentParseResult:
    """Extract the image URL from a mode=big page.

    The URL is the `src` of the first <img> in the document.

    Raises Exception if the page is the not-logged-in variant, and
    ValueError if the page contains no <img> with a `src` attribute.
    """
    from bs4 import BeautifulSoup
    # Name the parser explicitly so results do not depend on which
    # optional parser libraries happen to be installed.
    soup = BeautifulSoup(content, 'html.parser')
    raise_for_illust_not_logged_in_content(soup)
    img = soup.img
    if img is None or not img.has_attr('src'):
        raise ValueError('No image found in mode=big page content')
    return IllustBigContentParseResult(img['src'])
+
IllustMangaPage = namedtuple('IllustMangaPage', ('n', 'imgurl'))

IllustMangaContentParseResult = namedtuple(
        'IllustMangaContentParseResult',
        ('pages',))

def parse_illust_manga_content(content) -> IllustMangaContentParseResult:
    """Extract the per-page image URLs from a mode=manga page.

    The page's script registers images with statements of the form
    pixiv.context.images[N].unshift('URL'); each one yields an
    IllustMangaPage(N, URL). `content` may be str or bytes.

    Returns an IllustMangaContentParseResult whose `pages` list is
    indexed by page number. If an index is registered more than once
    the last statement wins; an index that is never registered is left
    as None.

    Raises Exception if the page is the not-logged-in variant.
    """
    import re
    from bs4 import BeautifulSoup
    # NOTE(review): the original comment here said no login check is
    # needed because manga mode looks the same when logged out, yet the
    # check was (and still is) performed. Confirm which is intended.
    soup = BeautifulSoup(content, 'html.parser')
    raise_for_illust_not_logged_in_content(soup)
    # Dots are escaped so only the literal property path matches.
    pattern = r"pixiv\.context\.images\[(\d+?)\]\.unshift\('(.+?)'\)"
    if isinstance(content, bytes):
        found = [(n.decode(), u.decode()) for n, u in
                re.findall(pattern.encode('utf-8'), content)]
    else:
        found = re.findall(pattern, content)
    by_index = {}
    for n, imgurl in found:
        by_index[int(n)] = imgurl
    if not by_index:
        return IllustMangaContentParseResult([])
    # Size by the highest index rather than the number of matches, so
    # repeated or sparse indices can neither overflow the list nor pad
    # it with trailing None entries.
    pages = [None] * (max(by_index) + 1)
    for i, imgurl in by_index.items():
        pages[i] = IllustMangaPage(i, imgurl)
    return IllustMangaContentParseResult(pages)
+
IllustMangaBigContentParseResult = namedtuple(
        'IllustMangaBigContentParseResult',
        ('imgurl',))

def parse_illust_manga_big_content(content) -> IllustMangaBigContentParseResult:
    """Extract the image URL from a mode=manga_big page.

    The URL is the `src` of the first <img> in the document.

    Raises Exception if Pixiv served the manga-mode page instead (which
    it does when the user is not logged in), and ValueError if the page
    contains no <img> with a `src` attribute.
    """
    from bs4 import BeautifulSoup
    # Name the parser explicitly so results do not depend on which
    # optional parser libraries happen to be installed.
    soup = BeautifulSoup(content, 'html.parser')
    raise_for_illust_manga_big_not_logged_in_content(soup)
    img = soup.img
    if img is None or not img.has_attr('src'):
        raise ValueError('No image found in mode=manga_big page content')
    return IllustMangaBigContentParseResult(img['src'])
+
+
+"""
+Illust Referer URL Generators
+"""
+
def gen_illust_referer(id, mode = 'medium', page = None):
    """Return the Referer URL to send when requesting an illust page.

    Referers are defined for 'big', 'manga' and 'manga_big'; for any
    other mode (including the default 'medium') the result is None.
    """
    generators = {
            'big': gen_illust_referer_big,
            'manga': gen_illust_referer_manga,
            'manga_big': gen_illust_referer_manga_big,
            }
    generator = generators.get(mode)
    if generator is None:
        return None
    return generator(id, page)

def gen_illust_referer_big(id, *args):
    """Referer for the big page: the medium page of the same illust."""
    return gen_illust_url(id, 'medium')

def gen_illust_referer_manga(id, *args):
    """Referer for the manga page: the medium page of the same illust."""
    return gen_illust_url(id, 'medium')

def gen_illust_referer_manga_big(id, page):
    """Referer for a manga_big page: the manga page with the same `page`."""
    return gen_illust_url(id, 'manga', page)
+
+"""
+Illust Image Referer URL Generators
+"""
+
def gen_illust_img_referer(illust_id, mode, page = None):
    """Return the Referer URL to send when requesting an image file.

    `mode` is the page mode the image URL was taken from. Returns None
    for a mode other than 'medium', 'big', 'manga' or 'manga_big'.
    """
    generators = {
            'medium': gen_illust_img_referer_medium,
            'big': gen_illust_img_referer_big,
            'manga': gen_illust_img_referer_manga,
            'manga_big': gen_illust_img_referer_manga_big,
            }
    generator = generators.get(mode)
    if generator is None:
        return None
    return generator(illust_id, page)

def gen_illust_img_referer_medium(illust_id, *args):
    """Image referer for medium mode: the medium page."""
    return gen_illust_url(illust_id, mode = 'medium')

def gen_illust_img_referer_big(illust_id, *args):
    """Image referer for big mode: the big page."""
    return gen_illust_url(illust_id, mode = 'big')

def gen_illust_img_referer_manga(illust_id, *args):
    """Image referer for manga mode: the medium page."""
    return gen_illust_url(illust_id, mode = 'medium')

def gen_illust_img_referer_manga_big(illust_id, *args):
    """Image referer for manga_big mode: the manga page (page ignored)."""
    return gen_illust_url(illust_id, mode = 'manga')
+
+
+"""
+Illust HTTP Request Generators
+"""
def gen_illust_request(session, id, mode, page = None):
    """Describe the GET request for an illust page as a Request.

    The request carries the mode's Referer header and the session id as
    the PHPSESSID cookie. Raises ValueError for an unknown `mode`.
    """
    modes = ("medium", "big", "manga", "manga_big")
    if mode not in modes:
        raise ValueError("modes must be one of {}. Got: {}".format(modes,
            mode))
    return Request(
            method = 'get',
            url = gen_illust_url(id, mode, page),
            data = None,
            headers = {'Referer': gen_illust_referer(id, mode, page)},
            cookies = {'PHPSESSID': session.id})
+
+"""
+Illust Image HTTP Request Generators
+"""
+
def gen_illust_img_request(session, imgurl, id, mode, page = None):
    """Describe the GET request for an image file as a Request.

    `imgurl` is the image to fetch; `id`, `mode` and `page` identify the
    page it was found on and determine the Referer header. The session
    id is sent as the PHPSESSID cookie.

    Raises ValueError for an unknown `mode`; the message names the
    offending value, matching gen_illust_request.
    """
    modes = ("medium", "big", "manga", "manga_big")
    if mode not in modes:
        raise ValueError("modes must be one of {}. Got: {}".format(modes,
            mode))
    return Request('get', imgurl, None,
            headers = {'Referer': gen_illust_img_referer(id, mode, page)},
            cookies = {'PHPSESSID': session.id})
+
+
+"""
+Illust Manga Permalink Generator
+(to derive manga_big URL from manga URL)
+"""
+
def gen_illust_manga_permalink(id, page):
    """Build the manga_big URL for one page of a manga.

    `page` is 0-indexed. `id` and `page` are both coerced with int(),
    so numeric strings are accepted and anything else raises
    ValueError or TypeError.
    """
    from urllib.parse import urlencode
    # int() never yields None, so no None-filtering is needed here.
    query = urlencode([
            ('mode', 'manga_big'),
            ('illust_id', int(id)),
            ('page', int(page))])
    return 'http://www.pixiv.net/member_illust.php?{}'.format(query)
+
+"""
+Session Authentication: Login
+"""
+
def gen_login_url(ssl = True):
    """Return the login endpoint: the https one if `ssl` is truthy,
    otherwise the plain-http one."""
    return ('https://ssl.pixiv.net/login.php' if ssl
            else 'http://www.pixiv.net/login.php')
+
def gen_login_params(username_or_email, password, remember_me = False):
    """Build the form fields posted to the login endpoint.

    `skip` is sent as '1' only when `remember_me` is truthy. Any field
    whose value is None is omitted.
    """
    fields = (
            ('mode', 'login'),
            ('pixiv_id', username_or_email),
            ('pass', password),
            ('skip', '1' if remember_me else None),
            )
    return {key: value for key, value in fields if value is not None}
+
def gen_login_request(username_or_email, password, remember_me = False, ssl = True):
    """Describe the login POST as a Request (no headers, no cookies)."""
    url = gen_login_url(ssl)
    form = gen_login_params(username_or_email, password, remember_me)
    return Request(method = 'post', url = url, data = form,
            headers = None, cookies = None)
+
+"""
+Session Authentication: Logout
+"""
def gen_logout_url():
    """Return the logout endpoint URL."""
    logout_url = 'http://www.pixiv.net/logout.php'
    return logout_url

def gen_logout_request(session):
    """Describe the logout GET as a Request carrying the session's
    PHPSESSID cookie (no body, no headers)."""
    url = gen_logout_url()
    cookies = {'PHPSESSID': session.id}
    return Request('get', url, None, None, cookies)
+
+"""
+Internal Data Structures for High Level API
+"""
+
class IllustModeParseCache:
    """Fetches and parses illust pages on demand, memoising the results.

    Index with a mode string (`cache['manga']`) or with a
    `(mode, page)` pair (`cache['manga_big', 3]`). The first lookup
    performs the HTTP request and parses the response; later lookups of
    the same mode and page return the stored result.

    Results are kept in a per-instance dict, so instances do not compete
    for cache slots and the cache is released with its instance.
    """

    def __init__(self, session, id):
        self.id = int(id)
        self.session = session
        # (mode, page) -> parse result
        self._parsed = {}

    def __getitem__(self, key):
        #If key is str, treat as mode. Else treat as (mode, page)
        if isinstance(key, str):
            mode, page = key, None
        else:
            mode, page = key[0], key[1]
        cache_key = (mode, page)
        if cache_key in self._parsed:
            return self._parsed[cache_key]
        import requests
        req = gen_illust_request(self.session, self.id, mode, page)
        # A redirect is returned to the caller's parser as-is rather
        # than followed.
        r = requests.request(allow_redirects = False, **req._asdict())
        r.raise_for_status()
        result = parse_illust_content(r.content, mode)
        self._parsed[cache_key] = result
        return result
+
+
def enum(classname, names):
    """Create a small ordered enumeration.

    Returns an enum object with one attribute per entry of `names`.
    Values are ordered by their position in `names`. The enum object
    supports iteration, len(), integer indexing and `value in enum`.

    Each value exposes `Value` (its position) and `EnumType` (the enum
    it belongs to), is hashable, supports the full set of ordering
    comparisons against values of the same enum, and `~value` returns
    the value mirrored from the other end of the enum.

    A value never compares equal to anything outside its own enum.
    Ordering comparisons against anything else raise TypeError.
    """
    from functools import total_ordering

    @total_ordering
    class EnumValue:
        __slots__ = ('_value',)

        def __init__(self, value):
            self._value = value

        Value = property(lambda self: self._value)
        EnumType = property(lambda self: EnumType)

        def __hash__(self):
            return hash(self._value)

        def __eq__(self, other):
            # EnumValue is created per enum() call, so isinstance also
            # means "belongs to this same enum".
            if not isinstance(other, EnumValue):
                return NotImplemented
            return self._value == other._value

        def __lt__(self, other):
            if not isinstance(other, EnumValue):
                return NotImplemented
            return self._value < other._value

        def __invert__(self):
            return constants[maximum - self._value]

        def __nonzero__(self):
            # Python 2 truth hook; Python 3 does not call it, so every
            # value stays truthy. Deliberately not aliased to __bool__,
            # which would make the first value falsy for callers.
            return bool(self._value)

        def __repr__(self):
            return '{}.{}'.format(classname, names[self._value])

    class EnumClass:
        __slots__ = names

        def __iter__(self):
            return iter(constants)

        def __len__(self):
            return len(constants)

        def __getitem__(self, i):
            return constants[i]

        def __repr__(self):
            return '{}({})'.format(classname, names)

        def __contains__(self, key):
            return isinstance(key, EnumValue)

    maximum = len(names) - 1
    constants = []
    for i, each in enumerate(names):
        val = EnumValue(i)
        # Replaces the slot descriptor of the same name on the class.
        setattr(EnumClass, each, val)
        constants.append(val)
    constants = tuple(constants)
    EnumType = EnumClass()
    return EnumType
+
+"""
+Mid-Level API
+===============
+These methods perform HTTP Requests directly using the
+python requests library
+"""
+
# Medium-level methods which perform the actual HTTP Requests
def login(username_or_email, password, remember_me = False, ssl = True) -> Session:
    """Log in using requests and return the resulting Session.

    The Session holds the PHPSESSID cookie from the response. A
    successful login is recognised by a Location header on the
    (unfollowed) response; without one, ValueError is raised.
    """
    import requests
    req = gen_login_request(username_or_email, password,
            remember_me, ssl)
    response = requests.request(allow_redirects = False, **req._asdict())
    response.raise_for_status()
    if 'Location' not in response.headers:
        raise ValueError('Invalid username or password')
    return Session(response.cookies['PHPSESSID'])
+
def logout(session):
    """Log `session` out using requests.

    A successful logout is recognised by a Location header on the
    (unfollowed) response; without one, Exception is raised.
    """
    import requests
    req = gen_logout_request(session)
    response = requests.request(allow_redirects = False, **req._asdict())
    response.raise_for_status()
    if 'Location' not in response.headers:
        raise Exception('Could not detect successful logout')
+
def req_download(request:dict):
    """Download `request` into the current directory using requests.

    `request` holds the keyword arguments for requests.request(); it
    must contain 'url'. The file is named after the last path segment
    of that URL. Returns the filename.

    Raises requests' HTTPError for an error status. The output file is
    only created once the response status has been checked, and is
    closed even if writing fails.
    """
    from contextlib import closing
    from os.path import basename
    from urllib.parse import urlsplit
    import requests
    filename = basename(urlsplit(request['url']).path)
    # stream=True defers reading the body so it can be written out in
    # chunks; without it requests consumes the body up front and
    # reading the raw stream afterwards yields nothing.
    with closing(requests.request(**dict(request, stream = True))) as r:
        r.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size = 8192):
                f.write(chunk)
    return filename
+    
+
+"""
+High Level API
+=================
+(Will try not to break compatibility)
+"""
+
# Image sizes offered by the high-level API, ordered smallest to largest.
Sizes = enum(classname = 'Sizes', names = ('medium', 'large'))
+
#High level classes
class Illust:
    """A single-image Pixiv illustration.

    `sizes` maps each available Sizes value to an Illust.Size, the same
    keying Manga uses and Pixiv.download expects. The page-mode strings
    'medium' and 'big' are still accepted as lookup keys for existing
    callers, but are not listed among the mapping's keys.
    """

    class Size:
        """One rendition of the illustration, fetched via page `mode`."""

        def __init__(self, mode, parsecache):
            self.mode = mode
            self.pc = parsecache

        @property
        def imgurl(self):
            return self.pc[self.mode].imgurl

        @property
        def imgurls(self):
            return [self.imgurl]

        @property
        def imgrequests(self):
            return [gen_illust_img_request(self.pc.session, self.imgurl,
                self.pc.id, self.mode)]

    class _SizeMap(dict):
        """Dict that resolves a missing key through an alias table."""

        def __init__(self, items, aliases):
            super().__init__(items)
            self._aliases = aliases

        def __missing__(self, key):
            if key in self._aliases:
                return self[self._aliases[key]]
            raise KeyError(key)

    def __init__(self, context, id:int):
        self.context = context
        self.id = id
        self.pc = IllustModeParseCache(self.context.session, self.id)
        # NOTE(review): parse_illust_content has no parser for 'medium',
        # so resolving the medium size's image URL raises KeyError until
        # one is added.
        self.sizes = Illust._SizeMap(
                {Sizes.medium: Illust.Size('medium', self.pc),
                    Sizes.large: Illust.Size('big', self.pc)},
                aliases = {'medium': Sizes.medium, 'big': Sizes.large})

    @staticmethod
    def from_url(context, url):
        """Returns an Illust instance from a URL
        using Pixiv instance `context`.
        Raises ValueError if URL is invalid."""
        p = parse_illust_url(url)
        if p.mode not in ('medium', 'big'):
            raise ValueError('Not a Illust URL')
        return Illust(context, p.id)

    def __str__(self):
        return 'Illust({})'.format(self.id)

    def __repr__(self):
        return 'Illust({}, {})'.format(repr(self.context),
                repr(self.id))
+
class Manga:
    """A multi-page Pixiv manga.

    `sizes` maps Sizes.medium and Sizes.large to objects exposing
    `imgurls` (one URL per page) and `imgrequests` (one Request per
    page).
    """
    #NOTE: Even though internally (in Pixiv) illustrations and manga
    #are the same, they are treated differently in the high-level
    #API

    class MediumSize:
        """Page images as embedded in the mode=manga page."""

        def __init__(self, parsecache):
            self.pc = parsecache

        @property
        def imgurls(self):
            """Returns the medium imgurl for each page"""
            return [page.imgurl for page in self.pc['manga'].pages]

        @property
        def imgrequests(self):
            # The request needs the session held by the parse cache,
            # not the cache itself: its `id` becomes the PHPSESSID
            # cookie.
            return [gen_illust_img_request(self.pc.session, url,
                self.pc.id, 'manga') for url in self.imgurls]

    class LargeSize:
        """Page images as served by the per-page mode=manga_big pages."""

        def __init__(self, parsecache):
            self.pc = parsecache

        @property
        def imgurls(self):
            """Returns the big imgurl for each page"""
            page_count = len(self.pc['manga'].pages)
            return [self.pc['manga_big', i].imgurl
                    for i in range(page_count)]

        @property
        def imgrequests(self):
            return [gen_illust_img_request(self.pc.session, url,
                self.pc.id, 'manga_big') for url in self.imgurls]

    def __init__(self, context, id):
        self.context = context
        self.id = id
        #Cached mode=manga request
        self._manga_content = None
        self.pc = IllustModeParseCache(self.context.session, self.id)
        self.sizes = {Sizes.medium: Manga.MediumSize(self.pc),
                Sizes.large: Manga.LargeSize(self.pc)}

    @property
    def pages(self):
        return self.pc['manga'].pages

    @staticmethod
    def from_url(context, url):
        """Returns a Manga instance from a URL using
        the Pixiv instance `context`.
        Raises ValueError if URL is invalid."""
        p = parse_illust_url(url)
        if p.mode not in ('manga', 'manga_big'):
            raise ValueError('Not a manga URL')
        return Manga(context, p.id)

    def __str__(self):
        return 'Manga({})'.format(self.id)

    def __repr__(self):
        return 'Manga({}, {})'.format(repr(self.context),
                repr(self.id))
+
+
class Pixiv:
    """High level API for Pixiv.

    Constructing an instance logs in immediately; the password is used
    for that login and not stored on the instance.
    """

    def __init__(self, username_or_email:str, password:str,
            remember_me:bool = False, ssl:bool = True) -> None:
        self.username_or_email = username_or_email
        self.remember_me = remember_me
        self.ssl = ssl
        self.session = login(username_or_email, password,
                remember_me, ssl)

    def get_url(self, url:str) -> OneOf(Illust, Manga):
        """Returns one of the following:
        - Illust
        - Manga
        based on the URL. Raises ValueError if the URL
        is invalid.
        """
        for factory in (Illust.from_url, Manga.from_url):
            try:
                return factory(self, url)
            except ValueError:
                # Not this kind of URL; try the next factory.
                pass
        raise ValueError('Invalid URL')

    def download(self, x, size = None) -> [str]:
        """Downloads an Illust or Manga `x` to the
        current directory. `size` can be used to select
        the sizes of the images.
        If `size` is None, the largest available size is selected.
        If `size` is a Sizes enum, then images of only that size
        will be selected.
        If `size` is a function, it will be called with a
        list of Sizes. The function should return the
        Sizes value to be selected, or None if no image
        should be downloaded.
        Returns the list of filenames of files downloaded
        (empty if no size was selected)."""
        # Mapping lives in collections.abc; the alias in `collections`
        # no longer exists on current Python versions.
        from collections.abc import Mapping
        if size is None:
            size_func = lambda sizes: sorted(sizes, reverse = True)[0]
        elif size in Sizes:
            size_func = lambda sizes: size if size in sizes else None
        else:
            size_func = size

        def get_imgrequests(item):
            if not isinstance(getattr(item, 'sizes', None), Mapping):
                return item.imgrequests
            selected_size = size_func(list(item.sizes))
            if selected_size is None:
                # Nothing selected means nothing to download.
                return []
            return get_imgrequests(item.sizes[selected_size])

        return [req_download(req._asdict()) for req in get_imgrequests(x)]

    def download_url(self, url:str, size = None) -> [str]:
        """
        Parses the URL, and if it is an Illust or Manga,
        downloads it to the current directory.
        `size` can be used to select
        the sizes of the images.
        If `size` is a Sizes enum, then images of only that size
        will be selected.
        If `size` is a function, it will be called with a
        list of Sizes. The function should return the
        Sizes value to be selected, or None if no image
        should be downloaded.
        Returns the list of filenames of files downloaded."""
        return self.download(self.get_url(url), size)

    def logout(self) -> None:
        """Logs out the session. This instance is invalid
        once this function is called."""
        return logout(self.session)

    def __str__(self):
        return 'Pixiv({})'.format(self.username_or_email)

    def __repr__(self):
        return 'Pixiv({}, ..., {}, {})'.format(
                repr(self.username_or_email),
                repr(self.remember_me),
                repr(self.ssl))

File setup.py

View file
#! /usr/bin/env python3
# Packaging script: installs the single top-level module `pixiv`.
# NOTE(review): distutils was removed from the standard library in
# Python 3.12 (PEP 632); this script needs an interpreter that still
# provides it.
from distutils.core import setup

setup(
    name = 'pixiv',
    version = '0.1.1',
    py_modules = ['pixiv'],
)
+
+