Jeffrey Goettsch avatar Jeffrey Goettsch committed aefb9d2

Added pushnotify/prowl.py. Can now verify API keys for and send notifications through the Prowl notification system.

Comments (0)

Files changed (5)

 syntax: regexp
 (^|/)\..*
 ^tests/nmakeys.py
+^tests/prowlkeys.py
 ^tests/pushoverkeys.py
 
 syntax: glob

pushnotify/exceptions.py

     pass
 
 
+class PermissionDenied(PushNotifyError):
+    """Raised when a request had not been approved.
+
+    Args:
+        args[0]: A string containing a message from the server.
+        args[1]: An integer containing an error code from the server.
+
+    """
+
+    pass
+
+
 class RateLimitExceeded(PushNotifyError):
     """Raised when too many requests are submitted in too small a time
     frame.

pushnotify/prowl.py

+#!/usr/bin/env python
+# vim: set fileencoding=utf-8
+
+"""Module for sending push notifications to iOS devices that have
+Prowl installed. See http://www.prowlapp.com/ for more
+information.
+
+copyright: Copyright (c) Jeffrey Goettsch and other contributors.
+license: BSD, see LICENSE for details.
+
+"""
+
+
+import urllib
+import urllib2
+try:
+    from xml.etree import cElementTree
+    ElementTree = cElementTree
+except ImportError:
+    from xml.etree import ElementTree
+
+from pushnotify import exceptions
+
+
+PUBLIC_API_URL = u'https://api.prowlapp.com/publicapi'
+VERIFY_URL = u'/'.join([PUBLIC_API_URL, 'verify'])
+NOTIFY_URL = u'/'.join([PUBLIC_API_URL, 'add'])
+
+
+class Client(object):
+    """Client for sending push notificiations to iOS devices with
+    the Prowl application installed.
+
+    Member Vars:
+        apikeys: A list of strings, each containing a 40 character api
+            key.
+        providerkey: A string containing a 40 character provider key.
+
+    """
+
+    def __init__(self, apikeys=None, providerkey=None):
+        """Initialize the Prowl client.
+
+        Args:
+            apikeys:  A list of strings of 40 characters each, each
+                containing a valid api key.
+            providerkey: A string of 40 characters containing a valid
+                provider key.
+
+        """
+
+        self._browser = urllib2.build_opener(urllib2.HTTPSHandler())
+        self._last_type = None
+        self._last_code = None
+        self._last_message = None
+        self._last_remaining = None
+        self._last_resetdate = None
+
+        self.apikeys = [] if apikeys is None else apikeys
+        self.providerkey = providerkey
+
+    def _get(self, url):
+
+        request = urllib2.Request(url)
+        try:
+            response_stream = self._browser.open(request)
+        except urllib2.HTTPError, exc:
+            return exc
+        else:
+            return response_stream
+
+    def _parse_response(self, response, verify=False):
+        xmlresp = response.read()
+        print xmlresp
+        root = ElementTree.fromstring(xmlresp)
+
+        self._last_type = root[0].tag.lower()
+        self._last_code = root[0].attrib['code']
+
+        if self._last_type == 'success':
+            self._last_message = None
+            self._last_remaining = root[0].attrib['remaining']
+            self._last_resetdate = root[0].attrib['resetdate']
+        elif self._last_type == 'error':
+            self._last_message = root[0].text
+            self._last_remaining = None
+            self._last_resetdate = None
+
+            if (not verify or
+                    (self._last_code != '400' and self._last_code != '401')):
+                self._raise_exception()
+        else:
+            raise exceptions.UnrecognizedResponseError(xmlresp, -1)
+
+        return root
+
+    def _post(self, url, data):
+
+        request = urllib2.Request(url, data)
+        try:
+            response_stream = self._browser.open(request)
+        except urllib2.HTTPError, exc:
+            return exc
+        else:
+            return response_stream
+
+    def _raise_exception(self):
+
+        if self._last_code == '400':
+            raise exceptions.FormatError(self._last_message,
+                                         int(self._last_code))
+        elif self._last_code == '401':
+            raise exceptions.ApiKeyError(self._last_message,
+                                         int(self._last_code))
+        elif self._last_code == '406':
+            raise exceptions.RateLimitExceeded(self._last_message,
+                                               int(self._last_code))
+        elif self._last_code == '409':
+            raise exceptions.PermissionDenied(self._last_message,
+                                              int(self._last_code))
+        elif self._last_code == '500':
+            raise exceptions.ServerError(self._last_message,
+                                         int(self._last_code))
+        else:
+            raise exceptions.UnknownError(self._last_message,
+                                          int(self._last_code))
+
+    def notify(self, app, event, desc, kwargs=None):
+        """Send a notification to each apikey in self.apikeys.
+
+        Args:
+            app: A string of up to 256 characters containing the name
+                of the application sending the notification.
+            event: A string of up to 1024 characters containing the
+                event that is being notified (i.e. subject or brief
+                description.)
+            desc: A string of up to 10000 characters containing the
+                notification text.
+            kwargs: A dictionary with any of the following strings as
+                    keys:
+                priority: An integer between -2 and 2, indicating the
+                    priority of the notification. -2 is the lowest, 2 is
+                    the highest, and 0 is normal.
+                url: A string of up to 512 characters containing a URL
+                    to attach to the notification.
+                (default: None)
+
+        Raises:
+            pushnotify.exceptions.FormatError
+            pushnotify.exceptions.ApiKeyError
+            pushnotify.exceptions.RateLimitExceeded
+            pushnotify.exceptions.ServerError
+            pushnotify.exceptions.UnknownError
+            pushnotify.exceptions.UnrecognizedResponseError
+
+        """
+
+        data = {'apikey': ','.join(self.apikeys),
+                'application': app,
+                'event': event,
+                'description': desc}
+
+        if self.providerkey:
+            data['providerkey'] = self.providerkey
+
+        if kwargs:
+            data.update(kwargs)
+
+        data = urllib.urlencode(data)
+
+        response = self._post(NOTIFY_URL, data)
+        self._parse_response(response)
+
+    def verify_user(self, apikey):
+        """Verify an API key for a user.
+
+        Args:
+            apikey: A string of 40 characters containing an API key.
+
+        Raises:
+            pushnotify.exceptions.RateLimitExceeded
+            pushnotify.exceptions.ServerError
+            pushnotify.exceptions.UnknownError
+            pushnotify.exceptions.UnrecognizedResponseError
+
+        Returns:
+            A boolean containing True if the API key is valid, and False
+            if it is not.
+
+        """
+
+        data = {'apikey': apikey}
+
+        if self.providerkey:
+            data['providerkey'] = self.providerkey
+
+        querystring = urllib.urlencode(data)
+        url = '?'.join([VERIFY_URL, querystring])
+
+        response = self._get(url)
+        self._parse_response(response, True)
+
+        return self._last_code == '200'
+
+if __name__ == '__main__':
+    pass
                  'and iOS devices.'),
     long_description=('A package for sending push notifications to '
                       'Android and iOS devices. It requires Notify My '
-                      'Android or Pushover be installed on each device.'),
+                      'Android, Prowl, or Pushover to be installed on '
+                      'each device.'),
     download_url=('https://bitbucket.org/jgoettsch/py-pushnotify/get/'
                   '{0}.tar.gz').format(version),
     classifiers=[
 
 from pushnotify import exceptions
 from pushnotify import nma
+from pushnotify import prowl
 from pushnotify import pushover
 
 try:
     imp.find_module('nmakeys', [os.path.dirname(__file__)])
 except ImportError:
-    NMA_API_KEYS = {}
+    NMA_API_KEYS = []
     NMA_DEVELOPER_KEY = ''
 else:
     from nmakeys import API_KEYS as NMA_API_KEYS
     from nmakeys import DEVELOPER_KEY as NMA_DEVELOPER_KEY
-
+try:
+    imp.find_module('prowlkeys', [os.path.dirname(__file__)])
+except ImportError:
+    PROWL_API_KEYS = []
+    PROWL_PROVIDER_KEY = ''
+else:
+    from prowlkeys import API_KEYS as PROWL_API_KEYS
+    from prowlkeys import PROVIDER_KEY as PROWL_PROVIDER_KEY
 try:
     imp.find_module('pushoverkeys', [os.path.dirname(__file__)])
 except ImportError:
         self.assertFalse(self.client.verify(apikey))
 
 
+class ProwlTest(unittest.TestCase):
+
+    def setUp(self):
+
+        self.client = prowl.Client(PROWL_API_KEYS, PROWL_PROVIDER_KEY)
+
+        self.app = 'pushnotify unit tests'
+        self.event = 'unit test: test_notify'
+        self.desc = 'valid notification test for pushnotify'
+
+    def test_notify_valid(self):
+        """Test notify with a valid notification.
+
+        """
+
+        self.client.notify(self.app, self.event, self.desc,
+                           kwargs={'priority': 0, 'url': 'http://google.com/'})
+
+    def test_notify_invalid(self):
+        """Test notify with invalid notifications.
+
+        """
+
+        """invalid API key"""
+
+        char = self.client.apikeys[0][0]
+        apikey = self.client.apikeys[0].replace(char, '_')
+        self.client.apikeys = [apikey, ]
+        self.client.developerkey = ''
+
+        self.assertRaises(exceptions.ApiKeyError,
+                          self.client.notify, self.app, self.event, self.desc)
+
+        self.client.apikeys = NMA_API_KEYS
+        self.client.developerkey = NMA_DEVELOPER_KEY
+
+        """invalid argument lengths"""
+
+        bad_app = 'a' * 257
+        self.assertRaises(exceptions.FormatError,
+                          self.client.notify, bad_app, self.event, self.desc)
+
+    def test_verify_user_valid(self):
+        """Test verify_user with a valid API key.
+
+        """
+
+        self.assertTrue(self.client.verify_user(self.client.apikeys[0]))
+
+    def test_verify_user_invalid(self):
+        """Test verify_user with invalid API keys.
+
+        """
+
+        """invalid API key of incorrect length"""
+
+        apikey = u'{0}{1}'.format(self.client.apikeys[0], '1')
+
+        self.assertFalse(self.client.verify_user(apikey))
+
+        """invalid API key of correct length"""
+
+        char = self.client.apikeys[0][0]
+        apikey = self.client.apikeys[0].replace(char, '_')
+
+        self.assertFalse(self.client.verify_user(apikey))
+
+
 class PushoverTest(unittest.TestCase):
 
     def setUp(self):
         self.message = 'valid notification test for pushnotify'
 
     def test_notify_valid(self):
+        """Test notify with a valid notification.
 
-        """valid notification"""
+        """
 
         self.client.notify(self.title, self.message,
                            kwargs={'priority': 1, 'url': 'http://google.com/',
                                    'url_title': 'Google'})
 
     def test_notify_invalid_token(self):
+        """Test notify with an invalid token.
 
-        """invalid token"""
+        """
 
         char = self.client.token[0]
         bad_token = self.client.token.replace(char, '_')
                           self.title, self.message)
 
     def test_notify_invalid_user(self):
+        """Test notify with an invalid user.
 
-        """invalid user"""
+        """
 
         char = self.client.users[0][0][0]
         bad_users = (self.client.users[0][0].replace(char, '_'),
                           self.title, self.message)
 
     def test_notify_invalid_device(self):
+        """Test notify with an invalid device.
 
-        """invalid device"""
+        """
 
         char = self.client.users[0][1][0]
         bad_users = (PUSHOVER_USER, self.client.users[0][1].replace(char, '_'))
                           self.title, self.message)
 
     def test_notify_invalid_args(self):
+        """Test notify with invalid argument lengths.
 
-        """invalid argument lengths"""
+        """
 
         msg = 'a' * 513
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.