Tarek Ziadé avatar Tarek Ziadé committed bcede51

using sql now to store the URLs

Comments (0)

Files changed (6)

 pyramid.default_locale_name = en
 
 keys = booba76
+sqluri = sqlite:////tmp/errli.db
+
 
 [server:main]
 use = egg:Paste#http

errli/__init__.py

 """Main entry point
 """
 from pyramid.config import Configurator
-from errli.db import MemoryStorage
+from errli.db import SQLStorage
 
 
 def main(global_config, **settings):
     config = Configurator(settings=settings)
     config.include("cornice")
     config.scan("errli.views")
-    config.registry['storage'] = MemoryStorage()
+    config.registry['storage'] =  SQLStorage(settings['sqluri'])
     return config.make_wsgi_app()
 """
 import binascii
 import os
+from sqlalchemy import create_engine
+from sqlalchemy.ext.declarative import declarative_base, Column
+from sqlalchemy import Integer, String
 
 
-class MemoryStorage(dict):
+_Base = declarative_base()
+
+
+class Short(_Base):
+    __tablename__ = 'short'
+    id = Column(Integer, primary_key=True)
+    key = Column(String(10), nullable=False)
+    url = Column(String(512), nullable=False)
+
+
+short = Short.__table__
+
+
+GET_URL = """\
+select
+    url
+from
+    short
+where
+    key = :key
+"""
+
+
+GET_KEY = """\
+select
+    key
+from
+    short
+where
+    url = :url
+"""
+
+
+ADD = """\
+insert into short
+    (url, key)
+values
+    (:url, :key)
+"""
+
+
+class SQLStorage(object):
+
+    def __init__(self, sqluri):
+        self.engine = create_engine(sqluri)
+        short.metadata.bind = self.engine
+        short.create(checkfirst=True)
+
     def add_short(self, long_url):
+        key = self.get_key(long_url)
+        if key is not None:
+            return key
         new_key = None
         tries = 0
 
         while new_key is None and tries < 100:
             key = binascii.b2a_hex(os.urandom(4))[:4]
-            if key not in self:
+            if self.get_url(key) is None:
                 new_key = key
             else:
                 tries += 1
         if new_key is None:
             raise ValueError("Cannot do this")
 
-        self[new_key] = long_url
+        self.engine.execute(ADD, key=new_key, url=long_url)
         return new_key
 
-    def get_redirect(self, key):
-        return self[key]
+    def get_url(self, key):
+        res = self.engine.execute(GET_URL, key=key)
+        res = res.fetchone()
+        if res is None:
+            return None
+        return res.url
+
+    def get_key(self, url):
+        res = self.engine.execute(GET_KEY, url=url)
+        res = res.fetchone()
+        if res is None:
+            return None
+        return res.key

errli/tests/test_errli.py

 
 from pyramid import testing
 from errli.views import new_url, get_url
-from errli.db import MemoryStorage
+from errli.db import SQLStorage
 
 
 class ViewTests(unittest.TestCase):
         self.config = testing.setUp()
         self.config.include("cornice")
         self.config.scan("errli.views")
-        self.config.registry['storage'] = MemoryStorage()
+        self.config.registry['storage'] = SQLStorage('sqlite:///:memory:')
 
     def tearDown(self):
         testing.tearDown()
 short = Service(name='shortener', path='/', description="URL Shortener")
 
 
+def get_storage(request):
+    return request.registry['storage']
+
+
 def auth(request):
     """Look for an authorized key.
     """
         raise exc.HTTPUnauthorized()
 
 
-def unique(request):
-    """Makes sure the url is not already listed.
-    """
-    url = request.body
-    try:
-        key = request.registry['storage'].get_short(url)
-    except KeyError:
-        pass
-    else:
-        request.errors.add('body', 'url', 'Already shortened at %s' % key)
-
-
-@short.put(validator=(auth, unique))
+@short.put(validator=auth)
 def new_url(request):
     """Adds a shortened URL."""
     url = request.body
 def get_url(request):
     """Adds a shortened URL."""
     key = request.matchdict['key']
-    try:
-        url = request.registry['storage'].get_redirect(key)
-    except KeyError:
+    url = get_storage(request).get_url(key)
+    if url is None:
         request.matchdict = None  # XXX o/wise cornice throws a 405
         raise exc.HTTPNotFound()
 
 def delete_url(request):
     """Removes a shortened URL."""
     key = request.matchdict['key']
-    try:
-        url = request.registry['storage'].get_redirect(key)
-    except KeyError:
+    if url is None:
+        url = get_storage(request).get_url(key)
+    else:
         raise exc.HTTPNotFound()
 
     return exc.HTTPRedirect(location=url)
     packages=find_packages(),
     include_package_data=True,
     zip_safe=False,
-    install_requires=['cornice', 'PasteScript', 'WebTest'],
+    install_requires=['cornice', 'PasteScript', 'WebTest', 'SQLAlchemy'],
     entry_points="""\
     [paste.app_factory]
     main = errli:main
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.