# PyTMEX / tmex / session.py

# -*- coding: utf-8 -*-
# Copyright (c) 2011 Erik Svensson <erik.public@gmail.com>
# Licensed under the MIT license.

import time
import ctypes
import binascii

from collections import namedtuple
from datetime import datetime
from .tmex import *

# Lightweight record types used throughout this module.
#   DevInfo: static description of a device family (family code, model name,
#            lower/upper temperature limits or None, free-text description).
#   Device:  one enumerated device (hex ROM id string + its DevInfo).
#   T:       one reading (device, value, timestamp, humidity or None).
_DEVINFO = namedtuple('DevInfo', ['kind', 'model', 'mint', 'maxt', 'info'])
_DEVICE = namedtuple('Device', ['rom', 'devinfo'])
_T = namedtuple('T', ['device', 'amount', 'stamp', 'humidity'])

# Known device families, keyed by family code (the first ROM byte).  The
# ``None`` key holds a placeholder entry for unknown devices.  The model
# string is also used to look up the matching ``_read_<model>`` method.
DEVICEINFO = dict(
    (entry.kind, entry)
    for entry in (
        _DEVINFO(0x01, 'DS1990A', None, None, 'Serial Number iButton'),
        _DEVINFO(0x10, 'DS18S2', None, None, 'High-Precision 1-Wire Digital Thermometer'),
        _DEVINFO(0x26, 'DS2438', None, None, 'Smart Battery Monitor'),
        _DEVINFO(0x28, 'DS18B2', -55, 125, 'Programmable Resolution 1-Wire Digital Thermometer'),
        _DEVINFO(0x81, 'DS1420', None, None, 'Serial ID Button'),
        _DEVINFO(None, '???', None, None, 'Unknown device'),
    )
)

# Command bytes written to the bus with TMTouchByte.
ROM_CMD_TCONV = 0x44            # start a temperature conversion
ROM_CMD_MATCH = 0x55            # address one device by its 8-byte ROM id
ROM_CMD_READPOWER = 0xB4        # sent by the DS2438 reader before polling
ROM_CMD_RECALL = 0xB8           # sent by the DS2438 reader before the scratchpad read
ROM_CMD_READSCRATCHPAD = 0xBE   # read back the device scratchpad
ROM_CMD_SKIPROM = 0xCC          # address every device on the bus at once

# Seconds to sleep after broadcasting ROM_CMD_TCONV before polling the bus.
DELAY_GET_TEMP = 0.750

class Session(object):
    """
    Session is a class that encapsulates a 1-Wire session.

    A session owns one TMEX session handle.  Devices are discovered with
    enumrate() (or lazily with enumrate2()) and read with readDevice() or
    readDevices().  The handle is released by closeSession(), which is also
    called when the object is garbage collected.
    """

    def __init__(self, port=0, **opts):
        """
        Initializes a 1-Wire session.

        port:
            A optional port number that will be used to start the session.

        filter:
            A list of integers or strings where integers are the device family code and strings is the device family
            name (the ``model`` field of DEVICEINFO). Defaults to every known family.

        enumrate:
            When true, the bus is enumerated immediately after the session has been started.
        """

        self._handle = 0
        self._devices = {}
        # Allocate by size: passing a str initializer only works on Python 2,
        # while a size yields a zero-filled buffer on both Python 2 and 3.
        self._context = ctypes.create_string_buffer(15360)

        families = opts.get('filter')
        if families is None:
            families = [code for code in DEVICEINFO if code is not None]
        self.familyFilter = set(families)

        self.initialize(port)

        if opts.get('enumrate', False):
            self.enumrate()

    def __del__(self):
        self.closeSession()

    def __len__(self):
        """
        Returns the number of devices found by the last call to enumrate().
        """

        return len(self._devices)

    def initialize(self, port):
        """
        Initializes a 1-Wire session.

        port:
            A optional port number that will be used to start the session.

        Raises TMEXException when the driver reports that setup failed. Any session that was already open on this
        object is closed first so its handle is not leaked.
        """

        self.closeSession()

        portNumber = ctypes.c_short(port)
        portType = ctypes.c_short(0)

        # NOTE(review): TMReadDefaultPort presumably overwrites portNumber and
        # portType with the configured default port - confirm against tmex.py.
        TMReadDefaultPort(portNumber, portType)
        self._handle = TMExtendedStartSession(portNumber, portType, None)
        result = TMSetup(self._handle)

        if result != 1:
            # Release the handle and forget it, so that a later closeSession()
            # (for instance from __del__) does not end the session twice.
            self.closeSession()

            if result in TMSetupMessages:
                raise TMEXException(TMSetupMessages[result])
            raise TMEXException('Unknown setup error, %r' % result)

    def valid(self):
        """
        Check if the 1-Wire session is valid.
        """

        return self._handle != 0 and TMValidSession(self._handle) == 1

    def closeSession(self):
        """
        Ends the 1-Wire session, if one is open. Safe to call repeatedly.
        """

        # getattr keeps this safe on a partially constructed object, where
        # __del__ may run before __init__ has assigned the handle.
        handle = getattr(self, '_handle', 0)
        if handle != 0:
            self._handle = 0
            TMEndSession(handle)

    def enumrate(self):
        """
        Enumerates the devices on the 1-Wire bus.

        Replaces the set of known devices and returns it as a dictionary that maps the hexadecimal ROM id to the
        device. Raises TMEXException when the session is not initialized or not valid.
        """

        self._devices = {}

        for device in self._scanBus():
            self._devices[device.rom] = device
        return self._devices

    def enumrate2(self, *args, **kwargs):
        """
        The generator of devices on the 1-Wire bus

        Unlike enumrate() the yielded devices are not recorded in the session, so readDevice() rejects them until
        enumrate() has been called.
        """

        for device in self._scanBus():
            yield device

    def _scanBus(self):
        """
        Generator that walks the bus and yields every known, CRC-valid device whose family passes the filter.
        """

        if self._handle == 0:
            raise TMEXException('Bus not initialized')

        if not self.valid():
            raise TMEXException('Bus not valid')

        found = TMFirst(self._handle, self._context)

        # Always advance with TMNext and stop on any non-positive result, so
        # that an unreadable ROM or a driver error cannot stall the search.
        while found > 0:
            device = self._currentDevice()
            if device is not None and self._acceptsFamily(device.devinfo):
                yield device
            found = TMNext(self._handle, self._context)

    def _currentDevice(self):
        """
        Returns the device currently selected by the search, or None when its ROM cannot be read, fails the CRC
        check or belongs to a family that is not listed in DEVICEINFO.
        """

        rom = (ctypes.c_short * 8)()
        if TMRom(self._handle, self._context, rom) != 1:
            return None

        romBytes = list(rom)
        if TMCRC(8, (ctypes.c_ubyte * 8)(*romBytes), 0, 0) != 0:
            return None

        # The first ROM byte is the family code.
        devinfo = DEVICEINFO.get(int(romBytes[0]))
        if devinfo is None:
            return None

        devId = ''.join('%02X' % b for b in romBytes)
        return _DEVICE(devId, devinfo)

    def _acceptsFamily(self, devinfo):
        """
        Tells whether a device family passes the filter, given either as family code or as model name.
        """

        return devinfo.kind in self.familyFilter or devinfo.model in self.familyFilter

    def readDevice(self, device, enableWireLeveling=True):
        """
        Reads the value from a device on the 1-Wire bus.

        device:
            The device to read. Must have been enumerated.

        enableWireLeveling:
            Enables the reader to use wire leveling to read from certain devices.

        Returns the reading, or None when there is no reader for the device model. Raises ValueError when the
        argument is not a device or has not been enumerated.
        """

        if not isinstance(device, _DEVICE):
            raise ValueError('Not a device: %r' % (device,))

        if device.rom not in self._devices:
            raise ValueError('Device %s has not been enumerated' % device.rom)

        # Readers are looked up by model name, e.g. _read_DS18B2.
        reader = getattr(self, '_read_%s' % device.devinfo.model, None)
        if reader is None:
            return None
        return reader(device, enableWireLeveling)

    def _addressDevice(self, device):
        """
        Addresses a device on the 1-Wire bus directly.

        device:
            The device of the device to read. Must have been enumerated.

        Returns 1 once the bus has been reset and the ROM id has been sent. Raises ValueError when the device has
        not been enumerated.
        """

        if device.rom not in self._devices:
            raise ValueError('Device %s has not been enumerated' % device.rom)

        TMTouchReset(self._handle)
        TMTouchByte(self._handle, ROM_CMD_MATCH)

        # bytearray yields integers on both Python 2 and 3; iterating the
        # unhexlify() result directly yields str on 2 but int on 3.
        for b in bytearray(binascii.unhexlify(device.rom)):
            TMTouchByte(self._handle, b)

        return 1

    def _waitWhileBusy(self):
        """
        Polls the bus until a read returns something other than zero.
        """

        while TMTouchByte(self._handle, 0xFF) == 0:
            pass

    def readDevices(self, devices=None):
        """
        Reads the value from a list of devices from the 1-Wire bus.

        devices:
            A list of device/deviceId of the devices to read. Must have been enumerated. Defaults to every
            enumerated device.

        A single conversion is started for all devices at once; the devices are then read one by one. Returns the
        list of readings in the same order as the devices.
        """

        if not devices:
            devices = list(self._devices.values())

        if len(devices) < 1:
            return []

        TMTouchReset(self._handle)
        # NOTE(review): (0, 1, 2) presumably arms the strong pull-up and
        # (0, 0, 0) restores the normal level - confirm against the TMEX API.
        TMOneWireLevel(self._handle, 0, 1, 2)

        TMTouchByte(self._handle, ROM_CMD_SKIPROM)
        TMTouchByte(self._handle, ROM_CMD_TCONV)

        time.sleep(DELAY_GET_TEMP)
        self._waitWhileBusy()

        TMOneWireLevel(self._handle, 0, 0, 0)

        result = []
        for device in devices:
            if isinstance(device, str):
                device = self._devices[device]
            # The conversion has already been done for every device above.
            result.append(self.readDevice(device, enableWireLeveling=False))
        return result

    @staticmethod
    def _clampToRange(value, devinfo):
        """
        Limits a value to the mint/maxt range of the device family. A limit that is None is not applied.
        """

        if devinfo.maxt is not None and float(value) > float(devinfo.maxt):
            return devinfo.maxt
        if devinfo.mint is not None and float(value) < float(devinfo.mint):
            return devinfo.mint
        return value

    @staticmethod
    def _decodeDS18B2(data):
        """
        Converts the first two scratchpad bytes (LSB, MSB) of a DS18B2 into degrees Celsius.
        """

        # 12 significant bits in units of 1/16 degree; bit 11 is the sign.
        raw = ((data[1] & 0x0F) << 8) | (data[0] & 0xFF)
        if raw & 0x800:
            raw -= 0x1000
        return raw / 16.0

    @staticmethod
    def _decodeDS2438(data):
        """
        Converts a DS2438 scratchpad page into a (temperature, humidity) tuple.

        data[1] and data[2] hold the temperature (LSB, MSB) and data[3] and data[4] the voltage (LSB, MSB).
        """

        # 16-bit two's complement whose upper 13 bits count 0.03125 degree
        # steps; negating the magnitude is wrong for negative readings.
        raw = ((data[2] & 0xFF) << 8) | (data[1] & 0xFF)
        if raw & 0x8000:
            raw -= 0x10000
        temp = (raw >> 3) * 0.03125

        voltage = (((data[4] & 0x03) << 8) + (data[3])) * 0.01
        # conversion from voltage to humidity, see HIH-4021 datasheet
        humidity = ((voltage / 5.0) - 0.16) / 0.0062
        humidity = humidity / (1.0546 - (0.00216 * temp))
        return temp, humidity

    def _read_DS18B2(self, device, enableWireLeveling=False):
        """
        Reads the value from a DS18B2 device on the 1-Wire bus.

        device:
            The device of the device to read. Must have been enumerated.

        enableWireLeveling:
            Enables the reader to use wire leveling to read from certain devices. When set, a conversion is started
            on the device before it is read; otherwise the result of an earlier conversion is read.
        """

        self._addressDevice(device)
        if enableWireLeveling:
            TMOneWireLevel(self._handle, 0, 1, 2)
            TMTouchByte(self._handle, ROM_CMD_TCONV)
            time.sleep(0.6)
            self._waitWhileBusy()
            TMOneWireLevel(self._handle, 0, 0, 0)

        self._addressDevice(device)
        TMTouchByte(self._handle, ROM_CMD_READSCRATCHPAD)
        data = [TMTouchByte(self._handle, 0xFF) for _ in range(9)]

        temp = self._clampToRange(self._decodeDS18B2(data), device.devinfo)
        return _T(device, float(temp), datetime.now(), None)

    def _read_DS2438(self, device, enableWireLeveling=False):
        """
        Reads the value from a DS2438 device connected to a HIH-4021 family device on the 1-Wire bus.

        device:
            The device of the device to read. Must have been enumerated.

        enableWireLeveling:
            Enables the reader to use wire leveling to read from certain devices. Not used for DS2438.
        """

        # Every command needs the device to be addressed again first.
        self._addressDevice(device)
        TMTouchByte(self._handle, ROM_CMD_TCONV)
        self._waitWhileBusy()

        self._addressDevice(device)
        TMTouchByte(self._handle, ROM_CMD_READPOWER)
        self._waitWhileBusy()

        self._addressDevice(device)
        TMTouchByte(self._handle, ROM_CMD_RECALL)
        TMTouchByte(self._handle, 0x00)

        self._addressDevice(device)
        TMTouchByte(self._handle, ROM_CMD_READSCRATCHPAD)
        TMTouchByte(self._handle, 0x00)
        data = [TMTouchByte(self._handle, 0xFF) for _ in range(9)]

        temp, humidity = self._decodeDS2438(data)
        # DEVICEINFO has no limits for the DS2438, so the clamp must cope
        # with None instead of failing on float(None).
        temp = self._clampToRange(temp, device.devinfo)
        return _T(device, float(temp), datetime.now(), humidity)
# NOTE: the lines below are code-browser help text captured together with the
# file; they are not part of the module.
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.