Source

Pypaste / friendpaste / models.py

The default branch has multiple heads

Full commit
# -*- coding: utf-8 -
# Copyright 2008 by Benoît Chesneau <benoitc@e-engura.com>
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#@
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
from uuid import UUID

from couchdb.client import ResourceNotFound, ResourceConflict
from couchdb.schema import *

from friendpaste.utils import local, make_hash, short
from friendpaste.utils.diff import unified_diff, diff_blocks
from friendpaste.utils.base62 import b62encode

__all__ = ['Paste']

class PasteExist(Exception):
    """ exception raised when paste is found """
    
class ArrayField(Field):
    _to_python = list


class Review(Document):
    pasteid = TextField()
    revision = TextField()
    nb_line = IntegerField()
    reviews = ArrayField(default=[])

    itemType = TextField(default='review')

    @classmethod
    def get_line(cls, db, snippet, nb_line):
        if snippet.itemType == "revision":
            pasteid = snippet.parent
        else:
            pasteid = snippet.id

        rows = db.view("_view/reviews/line_for_revision", 
                key=[pasteid, int(nb_line), snippet.revid])
        reviews = list(rows)
        results = []
        if reviews:
            results = reviews[0].value['reviews']
            results.sort(lambda a,b: cmp(a['created'], b['created']))
            results.reverse()
        
        return results

    @classmethod
    def review_for_line(cls, db, snippet, nb_line):
        if snippet.itemType == "revision":
            pasteid = snippet.parent
        else:
            pasteid = snippet.id

        ireviews = cls.view(db, "_view/reviews/line_for_revision", 
                key=[pasteid, int(nb_line), snippet.revid])
        results = list(ireviews)
        if results:
            return results[0]
        return None

    @classmethod
    def get_lines_reviews(cls, db, pasteid, revision):
        results = cls.view(db, '_view/reviews/lines_for_revision', key=[pasteid, revision])
        return list(results)

    @classmethod
    def get_reviews_counts(cls, db, pasteid, revision):
        results = db.view('_view/reviews/reviews_count', key=[pasteid, revision])
        return dict([row.value for row in list(results)])

    @classmethod
    def update_positions(cls, db, pasteid, revision, old_revision, unchanged):
        lines_reviews = cls.get_lines_reviews(db, pasteid, old_revision)
        docs = []
        for review in lines_reviews:
            if review.nb_line in unchanged:
                docs.append({
                    'pasteid': pasteid,
                    'revision': revision,
                    'nb_line': unchanged[review.nb_line],
                    'reviews': review.reviews,
                    'itemType': 'review'
                })
        db.resource.post('_bulk_docs', { "docs": docs })

    
class Fork(Document):
    fork_parent = TextField(default='')
    forked_atrevison = TextField(default='')
    fork_id = TextField()
    fork_date = DateTimeField(default=datetime.utcnow())

    itemType = TextField(default='fork')

class Privacy(Document):
    pasteid = TextField()
    privacy = TextField(default='open')
    password = TextField(default='')
    last_changed = DateTimeField()

    itemType = TextField(default='privacy')

    def store(self, db):
        self.last_changed = datetime.utcnow()
        super(Privacy, self).store(db)

    @classmethod
    def for_paste(cls, db, pasteid):
        rows = cls.view(db, '_view/auth/pasteid', key=pasteid)
        results = list(rows)
        if results:
            return results[0]
        return None 

class Paste(Document):
    title = TextField()
    parent = TextField(default='')
    previous  = TextField(default='')
    next = TextField(default='')
    revid = TextField()
    content = TextField(default='')
    language = TextField(default='text')
    changes = ArrayField(default=[])
    old_id = TextField(default='')
    fork = BooleanField(default=False)
    forked = BooleanField(default=False)
    fork_parent = TextField(default='')
    forked_atrevision = TextField(default='')
    edit_code = TextField(default='')
    locked = BooleanField(default=False)
    created = DateTimeField()
    updated = DateTimeField()
    
    itemType = TextField(default='paste')

    def store(self, db):
        """
        override store procedure to generate pasteid
        if not provided and manage revisions
        """
        self.updated = datetime.utcnow()
        self.content = self.content.replace("\r\n", "\n");
        if self.id is None:
            self.created = datetime.utcnow()
            node = make_hash(self.content, self.title, self.language,'')
            self.revid = short(node)
            docid = db.create(self._data)
            self._data = db.get(docid)
        else:
            original_data =  self._data

            if self.itemType == "revision":
                old_data = db.get(self.parent)
                self._data['_id'] = old_data['_id']
                self._data['_rev'] = old_data['_rev']
                self.itemType = 'paste'
            else:
                old_data = db.get(self.id)

            self.created = datetime.utcnow()
            old_hash = make_hash(old_data['content'],old_data['title'],old_data['language'], old_data['forked'])
            new_hash = make_hash(self.content,self.title,self.language, self.forked)
            if old_hash != new_hash:
                del old_data['_id']
                del old_data['_rev']
                old_data['parent'] = self.id
                old_data['itemType'] = 'revision'  
                _previous = db.create(old_data)
                
                # get new revid
                node = make_hash(self.content,self.title,self.language, old_data['revid'])
                self.revid = short(node)
                
                # save previous revision id, could be usefull
                self.previous = _previous
                
                # get changes 
                changes = diff_blocks(old_data['content'].splitlines(),
                    self.content.splitlines(), 3, 8, 1, 0, 1)
                _changes = []
                for row in changes:
                    for change in row:
                        _changes.append(change)
                self.changes = _changes
            
                # save changes
                try:
                    db[self.id] = self._data
                except ResourceConflict:
                    # if conflict, remove our changes first
                    # then raise.
                    self._data = original_data
                    if _previous:
                        del db[_previous]

                    raise ResourceConflict

            elif old_data['edit_code'] != self.edit_code or \
                    old_data['locked'] != self.locked:
                db[self.id] = self._data

        return self
       
    @classmethod
    def get_paste(cls, db, pasteid, revid=None):
        if revid is not None:
            return cls.for_revision(db, pasteid, revid)

        return cls.load(db, pasteid)
    
    @classmethod
    def is_exist(cls, db, pasteid):   
        if pasteid in db:
            return True
        return False
    
    def revision(self, db, revid):
        if not self.id:
            return None
        rows = self.view(db, '_view/paste/by_nbrevisions',
                key=[self.pasteid, revid])
        rows = list(iter(rows))
        if rows:
            return rows[0]

        return None
  
    @classmethod
    def for_revision(cls, db, pasteid, revid):
        rows = cls.view(db, '_view/paste/by_nbrevisions',
                key=[pasteid, revid])
        rows = list(rows)
        if rows:
            return rows[0]
        return None

    def revisions(self, db):
        if not self.id or not self.previous:
            return []
        rows = self.view(db, '_view/paste/revisions', key=self.id)
        revisions = list(iter(rows))
        if revisions:
            revisions.sort(lambda a,b: cmp(a.updated, b.updated))
            revisions.reverse()
        return revisions   
        
    def get_changeset(self, db):
        previous = Paste.load(db, self.previous)
        unidiff = '--- Revision %s\n+++ Revision %s\n' % (previous.revid, self.revid) + \
                '\n'.join(unified_diff (previous.content.splitlines(), self.content.splitlines(), 3))
        tabular = diff_blocks(previous.content.splitlines(), self.content.splitlines(), 3, 8, 1, 0, 1)
        return unidiff, tabular, previous
        
    @classmethod
    def with_revisions(cls, db, pasteid, revid):
        rows = cls.view(db, '_view/paste/with_revisions', startkey=[pasteid,0], endkey=[pasteid, 1])
        results = list(rows)

        encoded_id = b62encode(UUID(pasteid).int)
        snippet = None
        if results:
            results.sort(lambda a,b: cmp(a.updated, b.updated))
            results.reverse()
            revisions = []
            for paste in results:
                paste.pasteid = encoded_id
                if revid is not None and paste.revid == revid:
                    snippet = paste
                else:
                    revisions.append(paste)
            return snippet, revisions
        return None, []

    @classmethod
    def lock(cls, db, pasteid, locked):
        """ lock paste and all revisions """
        rows = cls.view(db, '_view/paste/with_revisions', startkey=[pasteid,0], endkey=[pasteid, 1])
        results = list(rows)

        docs = []
        if results:
            for paste in results:
                paste.locked = locked
                docs.append(paste._data)

            db.resource.post('_bulk_docs', { "docs": docs })


    @classmethod
    def delete(cls, db, pasteid):
        """ delete paste with all its revisions """
        rows = cls.view(db, '_view/paste/with_revisions', startkey=[pasteid,0], endkey=[pasteid, 1])
        results = list(rows)
        docs = []
        if results:
            for paste in results:
                data = paste._data
                data['_deleted'] = True
                docs.append(data)
            db.resource.post('_bulk_docs', { "docs": docs })