Commits

Benoît Allard committed b67632b

protocol: regroup bases and libusb into 'connection'

  • Participants
  • Parent commits 484b9a7

Comments (0)

Files changed (6)

python/antprotocol/bases.py

-from .libusb import ANTlibusb
-import usb
-import time
-
-def getBase(debug):
-    for base in [bc(debug=debug) for bc in BASES]:
-        for retries in (2,1,0):
-            try:
-                if base.open():
-                    print "Found %s base" % (base.NAME,)
-                    return base
-            except Exception, e: # We shouldn't except Exception
-                print e
-                if retries:
-                    print "retrying"
-                    time.sleep(5)
-                    continue
-                raise
-
-class DynastreamANT(ANTlibusb):
-    """Class that represents the Dynastream USB stick base, for
-    garmin/suunto equipment. Only needs to set VID/PID.
-
-    """
-    VID = 0x0fcf
-    PID = 0x1008
-    NAME = "Dynastream"
-
-class FitBitANT(ANTlibusb):
-    """Class that represents the fitbit base. Due to the extra
-    hardware to handle tracker connection and charging, has an extra
-    initialization sequence.
-
-    """
-
-    VID = 0x10c4
-    PID = 0x84c4
-    NAME = "FitBit"
-
-    def open(self, vid = None, pid = None):
-        if not super(FitBitANT, self).open(vid, pid):
-            return False
-        self.init()
-        return True
-    
-    def init(self):
-        # Device setup
-        # bmRequestType, bmRequest, wValue, wIndex, data
-        self._connection.ctrl_transfer(0x40, 0x00, 0xFFFF, 0x0, [])
-        self._connection.ctrl_transfer(0x40, 0x01, 0x2000, 0x0, [])
-        # At this point, we get a 4096 buffer, then start all over
-        # again? Apparently doesn't require an explicit receive
-        self._connection.ctrl_transfer(0x40, 0x00, 0x0, 0x0, [])
-        self._connection.ctrl_transfer(0x40, 0x00, 0xFFFF, 0x0, [])
-        self._connection.ctrl_transfer(0x40, 0x01, 0x2000, 0x0, [])
-        self._connection.ctrl_transfer(0x40, 0x01, 0x4A, 0x0, [])
-        # Receive 1 byte, should be 0x2
-        self._connection.ctrl_transfer(0xC0, 0xFF, 0x370B, 0x0, 1)
-        self._connection.ctrl_transfer(0x40, 0x03, 0x800, 0x0, [])
-        self._connection.ctrl_transfer(0x40, 0x13, 0x0, 0x0, \
-                                       [0x08, 0x00, 0x00, 0x00,
-                                        0x40, 0x00, 0x00, 0x00,
-                                        0x00, 0x00, 0x00, 0x00,
-                                        0x00, 0x00, 0x00, 0x00
-                                        ])
-        self._connection.ctrl_transfer(0x40, 0x12, 0x0C, 0x0, [])
-        try:
-            self._receive()
-        except usb.USBError:
-            pass
-
-BASES = [FitBitANT, DynastreamANT]

python/antprotocol/connection.py

+import usb
+
+class ANTConnection(object):
+    """ An abstract class that represents a connection """
+
+    def open(self):
+        """ Open the connection """
+        raise NotImplementedError()
+
+    def close(self):
+        """ Close the connection """
+        raise NotImplementedError()
+
+    def send(self, bytes):
+        """ Send some bytes away """
+        raise NotImplementedError()
+
+    def receive(self, amount):
+        """ Get some bytes """
+        raise NotImplementedError()
+
+class ANTUSBConnection(ANTConnection):
+    ep = {
+        'in'  : 0x81,
+        'out' : 0x01
+    }
+
+    def __init__(self):
+        self._connection = False
+        self.timeout = 1000
+
+    def open(self):
+        self._connection = usb.core.find(idVendor = self.VID,
+                                         idProduct = self.PID)
+        if self._connection is None:
+            return False
+
+        # For some reason, we have to set config, THEN reset,
+        # otherwise we segfault back in the ctypes (on linux, at
+        # least).
+        self._connection.set_configuration()
+        self._connection.reset()
+        # The we have to set our configuration again
+        self._connection.set_configuration()
+
+        # Then we should get back a reset check, with 0x80
+        # (SUSPEND_RESET) as our status
+        #
+        # I've commented this out because -- though it should just work
+        # it does seem to be causing some odd problems for me and does
+        # work with out it. Reed Wade - 31 Dec 2011
+        ##self._check_reset_response(0x80)
+        return True
+
+    def close(self):
+        if self._connection is not None:
+            self._connection = None
+
+    def send(self, command):
+        # libusb expects ordinals, it'll redo the conversion itself.
+        c = command
+        self._connection.write(self.ep['out'], map(ord, c), 0, 100)
+
+    def receive(self, amount):
+        return self._connection.read(self.ep['in'], amount, 0, self.timeout)
+
+class DynastreamANT(ANTUSBConnection):
+    """Class that represents the Dynastream USB stick base, for
+    garmin/suunto equipment. Only needs to set VID/PID.
+
+    """
+    VID = 0x0fcf
+    PID = 0x1008
+    NAME = "Dynastream"
+
+class FitBitANT(ANTUSBConnection):
+    """Class that represents the fitbit base. Due to the extra
+    hardware to handle tracker connection and charging, has an extra
+    initialization sequence.
+
+    """
+
+    VID = 0x10c4
+    PID = 0x84c4
+    NAME = "FitBit"
+
+    def open(self):
+        if not super(FitBitANT, self).open():
+            return False
+        self.init()
+        return True
+
+    def init(self):
+        # Device setup
+        # bmRequestType, bmRequest, wValue, wIndex, data
+        self._connection.ctrl_transfer(0x40, 0x00, 0xFFFF, 0x0, [])
+        self._connection.ctrl_transfer(0x40, 0x01, 0x2000, 0x0, [])
+        # At this point, we get a 4096 buffer, then start all over
+        # again? Apparently doesn't require an explicit receive
+        self._connection.ctrl_transfer(0x40, 0x00, 0x0, 0x0, [])
+        self._connection.ctrl_transfer(0x40, 0x00, 0xFFFF, 0x0, [])
+        self._connection.ctrl_transfer(0x40, 0x01, 0x2000, 0x0, [])
+        self._connection.ctrl_transfer(0x40, 0x01, 0x4A, 0x0, [])
+        # Receive 1 byte, should be 0x2
+        self._connection.ctrl_transfer(0xC0, 0xFF, 0x370B, 0x0, 1)
+        self._connection.ctrl_transfer(0x40, 0x03, 0x800, 0x0, [])
+        self._connection.ctrl_transfer(0x40, 0x13, 0x0, 0x0, \
+                                       [0x08, 0x00, 0x00, 0x00,
+                                        0x40, 0x00, 0x00, 0x00,
+                                        0x00, 0x00, 0x00, 0x00,
+                                        0x00, 0x00, 0x00, 0x00
+                                        ])
+        self._connection.ctrl_transfer(0x40, 0x12, 0x0C, 0x0, [])
+        try:
+            self.receive()
+        except usb.USBError:
+            pass
+
+CONNS = [FitBitANT, DynastreamANT]
+
+
+def getConn():
+    for conn in [bc() for bc in CONNS]:
+        if conn.open():
+            print "Found %s base" % (conn.NAME,)
+            return conn
+    print "Failed to find a base"
+    return None

python/antprotocol/libusb.py

-#################################################################
-# pyusb access for ant devices
-# By Kyle Machulis <kyle@nonpolynomial.com>
-# http://www.nonpolynomial.com
-#
-# Licensed under the BSD License, as follows
-#
-# Copyright (c) 2011, Kyle Machulis/Nonpolynomial Labs
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, 
-# with or without modification, are permitted provided 
-# that the following conditions are met:
-#
-#    * Redistributions of source code must retain the 
-#      above copyright notice, this list of conditions 
-#      and the following disclaimer.
-#    * Redistributions in binary form must reproduce the 
-#      above copyright notice, this list of conditions and 
-#      the following disclaimer in the documentation and/or 
-#      other materials provided with the distribution.
-#    * Neither the name of the Nonpolynomial Labs nor the names 
-#      of its contributors may be used to endorse or promote 
-#      products derived from this software without specific 
-#      prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND 
-# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, 
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 
-# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
-# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 
-# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
-# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 
-# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, 
-# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-#################################################################
-#
-
-from protocol import ANT
-import usb
-
-class ANTlibusb(ANT):
-    ep = { 'in'  : 0x81, \
-           'out' : 0x01
-           }
-
-    def __init__(self, chan=0x0, debug=False):
-        super(ANTlibusb, self).__init__(chan, debug)
-        self._connection = False
-        self.timeout = 1000
-
-    def open(self, vid = None, pid = None):
-        if vid is None:
-            vid = self.VID
-        if pid is None:
-            pid = self.PID
-        self._connection = usb.core.find(idVendor = vid,
-                                         idProduct = pid)
-        if self._connection is None:
-            return False
-
-        # For some reason, we have to set config, THEN reset,
-        # otherwise we segfault back in the ctypes (on linux, at
-        # least). 
-        self._connection.set_configuration()
-        self._connection.reset()
-        # The we have to set our configuration again
-        self._connection.set_configuration()
-
-        # Then we should get back a reset check, with 0x80
-        # (SUSPEND_RESET) as our status
-        #
-        # I've commented this out because -- though it should just work
-        # it does seem to be causing some odd problems for me and does
-        # work with out it. Reed Wade - 31 Dec 2011
-        ##self._check_reset_response(0x80)
-        return True
-
-    def close(self):
-        if self._connection is not None:
-            self._connection = None
-
-    def _send(self, command):
-        # libusb expects ordinals, it'll redo the conversion itself.
-        c = command
-        self._connection.write(self.ep['out'], map(ord, c), 0, 100)
-
-    def _receive(self, size=4096):
-        return self._connection.read(self.ep['in'], size, 0, self.timeout)

python/antprotocol/protocol.py

 
 class ANT(object):
 
-    def __init__(self, chan=0x00, debug=False):
+    def __init__(self, connection, chan=0x00, debug=False):
+        self.connection = connection
         self._debug = debug
         self._chan = chan
 
         msg = MessageOUT(msgid, *args)
         if self._debug:
             print '  '*self._loglevel, msg
-        return self._send(msg.toBytes())
+        return self.connection.send(msg.toBytes())
 
     def _find_sync(self, buf, start=0):
         i = 0;
                 # data[] too small, try to read some more
                 from usb.core import USBError
                 try:
-                    data += self._receive(size).tolist()
+                    data += self.connection.receive(size).tolist()
                     timeouts = 0
                 except USBError:
                     timeouts = timeouts+1
                 print '  '*self._loglevel, msg
             return msg
 
-    def _receive(self, size=4096):
-        raise NotImplementedError("Need to define _receive function for ANT child class!")
-
-    def _send(self):
-        raise NotImplementedError("Need to define _send function for ANT child class!")
-

python/fitbit_client.py

 import argparse
 import xml.etree.ElementTree as et
 from fitbit import FitBit
-from antprotocol.bases import getBase
-from antprotocol.protocol import ANTException, FitBitBeaconTimeout
+from antprotocol.connection import getConn
+from antprotocol.protocol import ANT, ANTException, FitBitBeaconTimeout
 
 class FitBitRequest(object):
 
         self.log_info = {}
         self.time = time.time()
         self.data = []
-        base = getBase(debug)
-        if base is None:
+        conn = getConn()
+        if conn is None:
             print "No base found!"
             exit(1)
+        base = ANT(conn)
         self.fitbit = FitBit(base)
         if not self.fitbit:
             print "No devices connected!"
         self.dump_connection()
         print 'Closing USB device'
         try:
-            self.fitbit.base.close()
+            self.fitbit.base.connection.close()
         except AttributeError:
             pass
         self.fitbit.base = None

python/ifitbit.py

     for cmd in sorted(helps.keys()):
         print '%s\t%s' % (cmd, helps[cmd])
 
-from antprotocol.bases import getBase
+from antprotocol.connection import getConn
+from antprotocol.protocol import ANT
 from fitbit import FitBit
 import time
 
     if len(args) >= 1:
         debug = bool(int(args[0]))
         if debug: print "Debug ON"
-    base = getBase(debug)
-    if base is None:
+    conn = getConn()
+    if conn is None:
         print "No device connected."
         return
+    base = ANT(conn)
     tracker = FitBit(base)
     tracker.init_tracker_for_transfer()
 
     global base, tracker
     if base is not None:
         print "Closing connection"
-        base.close()
+        base.connection.close()
     base = None
     tracker = None
 
 def test():
     global base
     if base is None:
-        base = getBase(True)
-        if base is None:
+        conn = getConn()
+        if conn is None:
             print "No devices connected!"
             return 1
-
+        base = ANT(conn)
     device = FitBit(base)
 
     device.init_tracker_for_transfer()
 
     # for i in range(0, len(d), 14):
     #     print ["%02x" % x for x in d[i:i+14]]
-    base.close()
+    base.connection.close()
     base = None
 
 @command('>', 'Run opcode')