Source

python-specbar / specbar.py

Full commit
#!/usr/bin/python

import re
import sys
import time
import urllib.request as req
import xml.etree.ElementTree as ET

from collections import defaultdict, deque
from datetime import datetime
from functools import partial
from subprocess import check_output


__author__ = 'Thorsten Weimann'
__version__ = '0.1'
__license__ = 'MIT'

SLEEP_SECS = 5
WLAN_INTERFACE = 'wlan0'
DEFAULT_FORMAT = (
    'CPU: {model}@{speed:<5} ESSID: {essid} [{quality}] '
    'Gmail: {gmail_count:<3} {date} {time}'
)
GMAIL_REALM = 'New mail feed'
GMAIL_URI = 'https://mail.google.com/mail/feed/atom'
ENC = sys.getfilesystemencoding()

# re's
net = {
    'essid': re.compile(r'ESSID:"(?P<essid>.+)"', re.I),
    'tx': re.compile(r'Tx\-Power=(?P<tx>\d+)', re.I),
    'quality': re.compile(r'Link Quality=(?P<quality>\d+/\d+)', re.I),
    'rate': re.compile(r'Bit Rate=(?P<rate>\d+ [a-zA-Z]+)', re.I),
    'level': re.compile(r'Signal Level=(?P<level>[+-]\d+ [a-zA-Z]+)', re.I),
}

cpu = {
    'vendor': re.compile(r'vendor_id\s+:\s+(?P<vendor>.+)', re.I),
    'model': re.compile(r'model name\s+:\s+(?P<model>.+)', re.I),
    'speed': re.compile(r'cpu MHz\s+: (?P<speed>\d+)', re.I),
}

actions = deque()


def get_net_info(interface, encoding=None):
    d = {}
    out = check_output(['iwconfig', interface])
    out = out.decode(encoding or ENC)
    for name, regex in net.items():
        match = regex.search(out)
        d[name] = match.group(name)
    return d


def get_cpu_info():
    d = {}
    with open('/proc/cpuinfo') as fp:
        data = fp.read()
    for name, regex in cpu.items():
        match = regex.search(data)
        d[name] = match.group(name)
    return d


def get_gmail_count(user, passwd):
    gmail = dict(realm=GMAIL_REALM, uri=GMAIL_URI, user=user, passwd=passwd)
    auth_handler = req.HTTPBasicAuthHandler()
    auth_handler.add_password(**gmail)
    opener = req.build_opener(auth_handler)
    with opener.open(gmail['uri']) as response:
        root = ET.fromstring(response.read())
    count = root.find('{http://purl.org/atom/ns#}fullcount').text
    return dict(gmail_count=count)


def get_date_time(date_format='%d.%m.%Y', time_format='%H:%M'):
    dt = datetime.now()
    return dict(date=dt.strftime(date_format), time=dt.strftime(time_format))


def info_collector(*args, **kwargs):
    def wrapped(f):
        actions.append(partial(f, *args, **kwargs))
        return f


def register(func, *args, **kwargs):
    actions.append(partial(func, *args, **kwargs))


def register_builtins():
    actions.extend([
        get_cpu_info,
        partial(get_net_info, WLAN_INTERFACE),
        get_date_time,
    ])


def loop(format_str=DEFAULT_FORMAT, sleep_secs=SLEEP_SECS):
    try:
        while True:
            d = defaultdict(lambda: '-')
            for func in actions:
                try:
                    ret = func()
                    d.update(ret)
                except Exception as e:
                    sys.stderr.write(str(e) + '\n')
            sys.stdout.write(format_str.format_map(d) + '\n')
            sys.stdout.flush()
            time.sleep(sleep_secs)
    except KeyboardInterrupt:
        sys.stderr.write('\rShutting down specbar...\n')


if __name__ == '__main__':
    register_builtins()
    loop()