Jeffrey Goettsch avatar Jeffrey Goettsch committed 65a18e5 Merge

0.2

Comments (0)

Files changed (5)

 syntax: regexp
 (^|/)\..*
 ^tests/nmakeys.py
+^tests/pushoverkeys.py
 
 syntax: glob
 *.egg-info

pushnotify/nma.py

 
         return root
 
+    def _post(self, url, data):
+
+        request = urllib2.Request(url, data)
+        response_stream = self._browser.open(request)
+        response = response_stream.read()
+
+        return response
+
     def _raise_exception(self):
 
         if self._last_code == '400':
             raise exceptions.UnknownError(self._last_message,
                                           int(self._last_code))
 
-    def _post(self, url, data):
-
-        request = urllib2.Request(url, data)
-        response_stream = self._browser.open(request)
-        response = response_stream.read()
-
-        return response
-
     def notify(self, app, event, desc, kwargs=None):
         """Send a notification to each apikey in self.apikeys.
 

pushnotify/pushover.py

+#!/usr/bin/env python
+# vim: set fileencoding=utf-8
+
+"""Module for sending push notificiations to Android and iOS devices
+that have Pushover installed. See https://pushover.net/ for more
+information.
+
+copyright: Copyright (c) Jeffrey Goettsch and other contributors.
+license: BSD, see LICENSE for details.
+
+"""
+
+
+import json
+import time
+import urllib
+import urllib2
+
+from pushnotify import exceptions
+
+
+PUBLIC_API_URL = u'https://api.pushover.net/1'
+VERIFY_URL = u'/'.join([PUBLIC_API_URL, u'users/validate.json'])
+NOTIFY_URL = u'/'.join([PUBLIC_API_URL, u'messages.json'])
+
+
+class Client(object):
+    """Client for sending push notifications to Android and iOS devices
+    with the Pushover application installed.
+
+    Member Vars:
+        token: A string containing a valid API token.
+
+    """
+
+    def __init__(self, token, users=None):
+        """Initialize the Pushover client.
+
+        Args:
+            token: A string of 30 characters containing a valid API
+                token.
+            users: A list containing 1 or 2-item tuples, where the first
+                item is a string of 30 characters containing a user
+                token, and the second is an optional string of up to 25
+                characters containing a device name for the given user.
+                (default: None)
+
+        """
+
+        self._browser = urllib2.build_opener(urllib2.HTTPSHandler())
+        self._last_code = None
+        self._last_device = None
+        self._last_errors = None
+        self._last_status = None
+        self._last_token = None
+        self._last_user = None
+
+        self.token = token
+        self.users = [] if users is None else users
+
+    def _parse_response(self, stream, verify=False):
+
+        response = json.loads(stream.read())
+
+        self._last_code = stream.code
+        if 'device' in response.keys():
+            self._last_device = response['device']
+        else:
+            self._last_device = None
+        if 'errors' in response.keys():
+            self._last_errors = response['errors']
+        else:
+            self._last_errors = None
+        if 'status' in response.keys():
+            self._last_status = response['status']
+        else:
+            self._last_status = None
+        if 'token' in response.keys():
+            self._last_token = response['token']
+        if 'user' in response.keys():
+            self._last_user = response['user']
+        else:
+            self._last_user = None
+
+        return self._last_status
+
+    def _post(self, url, data):
+
+        request = urllib2.Request(url, data)
+        try:
+            response_stream = self._browser.open(request)
+        except urllib2.HTTPError, exc:
+            return exc
+        else:
+            return response_stream
+
+    def _raise_exception(self):
+
+        msg = ''
+        if self._last_errors:
+            messages = []
+            for key, value in self._last_errors.items():
+                messages.append('{0} {1}'.format(key, value[0]))
+            msg = '; '.join(messages)
+
+        if self._last_device and 'invalid' in self._last_device:
+            raise exceptions.ApiKeyError('device invalid', self._last_code)
+
+        elif self._last_token and 'invalid' in self._last_token:
+            raise exceptions.ApiKeyError('token invalid', self._last_code)
+
+        elif self._last_user and 'invalid' in self._last_user:
+            raise exceptions.ApiKeyError('user invalid', self._last_code)
+
+        elif self._last_code == 429:
+            # TODO: what is actually returned when the rate limit is hit?
+
+            msg = 'too many messages sent this month' if not msg else msg
+            raise exceptions.RateLimitExceeded(msg, self._last_code)
+
+        elif self._last_code >= 500 and self._last_code <= 599:
+            raise exceptions.ServerError(msg, self._last_code)
+
+        elif self._last_errors:
+            raise exceptions.FormatError(msg, self._last_code)
+
+        else:
+            raise exceptions.UnrecognizedResponseError(msg, self._last_code)
+
+    def notify(self, title, message, kwargs=None):
+        """Send a notification to each user/device in self.users.
+
+        Args:
+            title: A string of up to 100 characters containing the
+                title of the message (i.e. subject or brief description)
+            message: A string of up to 512 characters containing the
+                notification text.
+            kwargs: A dictionary with any of the following strings as
+                    keys:
+                priority: The integer 1, which will make the
+                    notification display in red and override any set
+                    quiet hours.
+                url: A string of up to 500 characters containing a URL
+                    to attach to the notification.
+                url_title: A string of up to 50 characters containing a
+                    title to give the attached URL.
+                (default: None)
+
+        Raises:
+            pushnotify.exceptions.ApiKeyError
+            pushnotify.exceptions.FormatError
+            pushnotify.exceptions.RateLimitExceeded
+            pushnotify.exceptions.ServerError
+            pushnotify.exceptions.UnrecognizedResponseError
+
+        """
+
+        # TODO: what to do if no users set?
+
+        """Here we match the behavior of Notify My Android and Prowl:
+        raise a single exception if and only if every notification
+        fails"""
+
+        raise_exception = False
+        for user in self.users:
+            data = {'token': self.token,
+                    'user': user[0],
+                    'title': title,
+                    'message': message,
+                    'timestamp': int(time.time())}
+
+            if user[1]:
+                data['device'] = user[1]
+
+            if kwargs:
+                data.update(kwargs)
+
+            data = urllib.urlencode(data)
+
+            response = self._post(NOTIFY_URL, data)
+            status = self._parse_response(response)
+            if not status:
+                raise_exception = not status
+
+        if raise_exception:
+            self._raise_exception()
+
+    def verify_user(self, user):
+        """Verify a user token.
+
+        Args:
+            user: A string containing a valid user token.
+
+        Returns:
+            A boolean containing True if the user token is valid, and
+            False if it is not.
+
+        """
+
+        data = {'token': self.token, 'user': user}
+
+        data = urllib.urlencode(data)
+        response_stream = self._post(VERIFY_URL, data)
+
+        self._parse_response(response_stream, True)
+
+        return self._last_status
+
+    def verify_device(self, user, device):
+        """Verify a device for a user.
+
+        Args:
+            user: A string containing a valid user token.
+            device: A string containing a device name.
+
+        Raises:
+            pushnotify.exceptions.ApiKeyError
+
+        Returns:
+            A boolean containing True if the device is valid, and
+            False if it is not.
+
+        """
+
+        data = {'token': self.token, 'user': user, 'device': device}
+
+        data = urllib.urlencode(data)
+        response_stream = self._post(VERIFY_URL, data)
+
+        self._parse_response(response_stream, True)
+
+        if self._last_user and 'invalid' in self._last_user.lower():
+            self._raise_exception()
+
+        return self._last_status
+
+
+if __name__ == '__main__':
+    pass
 from distutils.core import setup
 
 
-version = '0.1'
+version = '0.2'
 
 
 setup(
 
 from pushnotify import exceptions
 from pushnotify import nma
+from pushnotify import pushover
+
 try:
     imp.find_module('nmakeys', [os.path.dirname(__file__)])
 except ImportError:
-    API_KEYS = {}
-    DEVELOEPER_KEY = ''
+    NMA_API_KEYS = {}
+    NMA_DEVELOPER_KEY = ''
 else:
-    from nmakeys import API_KEYS
-    from nmakeys import DEVELOPER_KEY
+    from nmakeys import API_KEYS as NMA_API_KEYS
+    from nmakeys import DEVELOPER_KEY as NMA_DEVELOPER_KEY
+
+try:
+    imp.find_module('pushoverkeys', [os.path.dirname(__file__)])
+except ImportError:
+    PUSHOVER_TOKEN = ''
+    PUSHOVER_USER = ''
+    PUSHOVER_DEVICE = ''
+else:
+    from pushoverkeys import TOKEN as PUSHOVER_TOKEN
+    from pushoverkeys import USER as PUSHOVER_USER
+    from pushoverkeys import DEVICE as PUSHOVER_DEVICE
 
 
 class NMATest(unittest.TestCase):
 
     def setUp(self):
 
-        self.client = nma.Client(API_KEYS, DEVELOPER_KEY)
+        self.client = nma.Client(NMA_API_KEYS, NMA_DEVELOPER_KEY)
 
-    def test_notify(self):
+        self.app = 'pushnotify unit tests'
+        self.event = 'unit test: test_notify'
+        self.desc = 'valid notification test for pushnotify'
+
+    def test_notify_valid(self):
+        """Test notify with valid notifications.
+
+        """
 
         """valid notification"""
 
-        app = 'pushnotify unit tests'
-        event = 'unit test: test_notify'
-        desc = 'valid notification test for pushnotify'
-
-        self.client.notify(app, event, desc)
+        self.client.notify(self.app, self.event, self.desc)
 
         """valid notification, extra arguments, html"""
 
-        html_desc = '<h1>{0}</h1><p>{1}<br>{2}</p>'.format(app, event, desc)
+        html_desc = '<h1>{0}</h1><p>{1}<br>{2}</p>'.format(
+            self.app, self.event, self.desc)
         priority = 0
         url = nma.NOTIFY_URL
 
-        self.client.notify(app, event, html_desc,
+        self.client.notify(self.app, self.event, html_desc,
                            kwargs={'priority': priority, 'url': url,
                                    'content-type': 'text/html'})
 
+    def test_notify_invalid(self):
+        """Test notify with invalid notifications.
+
+        """
+
         """invalid API key"""
 
         char = self.client.apikeys[0][0]
         self.client.developerkey = ''
 
         self.assertRaises(exceptions.ApiKeyError,
-                          self.client.notify, app, event, desc)
+                          self.client.notify, self.app, self.event, self.desc)
 
-        self.client.apikeys = API_KEYS
-        self.client.developerkey = DEVELOPER_KEY
+        self.client.apikeys = NMA_API_KEYS
+        self.client.developerkey = NMA_DEVELOPER_KEY
 
-        """invalid argument length"""
+        """invalid argument lengths"""
 
         bad_app = 'a' * 257
+        self.assertRaises(exceptions.FormatError,
+                          self.client.notify, bad_app, self.event, self.desc)
 
-        self.assertRaises(exceptions.FormatError,
-                          self.client.notify, bad_app, event, desc)
+    def test_verify_valid(self):
+        """Test verify with a valid API key.
 
-        bad_event = 'e' * 1001
-
-        self.assertRaises(exceptions.FormatError,
-                          self.client.notify, app, bad_event, desc)
-
-        bad_desc = 'd' * 10001
-
-        self.assertRaises(exceptions.FormatError,
-                          self.client.notify, app, event, bad_desc)
-
-    def test_verify(self):
-
-        """valid API key"""
+        """
 
         self.assertTrue(self.client.verify(self.client.apikeys[0]))
 
+    def test_verify_invalid(self):
+        """Test verify with invalid API keys.
+
+        """
+
         """invalid API key of incorrect length"""
 
         apikey = u'{0}{1}'.format(self.client.apikeys[0], '1')
         self.assertFalse(self.client.verify(apikey))
 
 
+class PushoverTest(unittest.TestCase):
+
+    def setUp(self):
+
+        self.client = pushover.Client(PUSHOVER_TOKEN,
+                                      [(PUSHOVER_USER, PUSHOVER_DEVICE)])
+
+        self.title = 'pushnotify unit tests'
+        self.message = 'valid notification test for pushnotify'
+
+    def test_notify_valid(self):
+
+        """valid notification"""
+
+        self.client.notify(self.title, self.message,
+                           kwargs={'priority': 1, 'url': 'http://google.com/',
+                                   'url_title': 'Google'})
+
+    def test_notify_invalid_token(self):
+
+        """invalid token"""
+
+        char = self.client.token[0]
+        bad_token = self.client.token.replace(char, '_')
+        self.client.token = bad_token
+
+        self.assertRaises(exceptions.ApiKeyError, self.client.notify,
+                          self.title, self.message)
+
+    def test_notify_invalid_user(self):
+
+        """invalid user"""
+
+        char = self.client.users[0][0][0]
+        bad_users = (self.client.users[0][0].replace(char, '_'),
+                     PUSHOVER_DEVICE)
+        self.client.users = bad_users
+
+        self.assertRaises(exceptions.ApiKeyError, self.client.notify,
+                          self.title, self.message)
+
+    def test_notify_invalid_device(self):
+
+        """invalid device"""
+
+        char = self.client.users[0][1][0]
+        bad_users = (PUSHOVER_USER, self.client.users[0][1].replace(char, '_'))
+        self.client.users = bad_users
+
+        self.assertRaises(exceptions.ApiKeyError, self.client.notify,
+                          self.title, self.message)
+
+    def test_notify_invalid_args(self):
+
+        """invalid argument lengths"""
+
+        msg = 'a' * 513
+
+        self.assertRaises(exceptions.FormatError, self.client.notify,
+                          self.title, msg)
+
+    def test_verify_user_valid(self):
+        """Test veriy_user with a valid user token.
+
+        """
+
+        self.assertTrue(self.client.verify_user(PUSHOVER_USER))
+
+    def test_verify_user_invalid(self):
+        """Test verify_user with an invalid user token.
+
+        """
+
+        self.assertFalse(self.client.verify_user('foo'))
+
+    def test_verify_device_valid(self):
+        """Test verify_device with a valid device string.
+
+        """
+
+        self.assertTrue(self.client.verify_device(PUSHOVER_USER,
+                                                  PUSHOVER_DEVICE))
+
+    def test_verify_device_invalid(self):
+        """Test verify_device with an invalid device string.
+
+        """
+
+        self.assertFalse(self.client.verify_device(PUSHOVER_USER, 'foo'))
+
+    def test_verify_device_invalid_user(self):
+        """Test verify_device with an invalid user token.
+
+        """
+
+        self.assertRaises(exceptions.ApiKeyError, self.client.verify_device,
+                          'foo', PUSHOVER_DEVICE)
+
+
 if __name__ == '__main__':
     pass
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.