# arduino-comm-example / python-ui.py

#!/usr/bin/env python
"""Simple Arduino User Interface Example

This program creates a simple user interface using Tkinter in python. The
user interface is event driven. It also starts a communication manager
class which handles two-way communication with the Arduino over the serial
connection. This class handles two threads, one for output to the arduino
and one for input from the arduino. The threads perform simple tasks. The
input thread waits for input from the arduino, when it receives an entire
command (indicated by a newline) it adds that command to the input queue.
The output thread waits for commands on the output queue and then sends
them to the Arduino.

The main loop of the user interface periodically polls for input from the
input queue and then reacts to that. Items are added from the main loop of
the user interface directly to the output queue as well for later 
communication to the Arduino.

Note that under Tkinter (and Tk in general) you should not interact with the
GUI from outside the Tkinter event loops. Therefore this polling from within
the loop is necessary for any GUI interaction.

Copyright 2011 Christopher De Vries
This program is distributed under the Artistic License 2.0, a copy of which
is included in the file LICENSE.txt
"""

import sys
import Tkinter
import tkMessageBox
import threading
import Queue
import serial

def main(argv=None):
    """Build the queues, the serial manager and the GUI, then run the GUI.

    argv defaults to sys.argv; it is accepted for symmetry with the usual
    main() convention but is not otherwise consulted. Returns 0 once the
    GUI main loop has finished.
    """
    argv = sys.argv if argv is None else argv

    # One queue per direction: lines read from the Arduino, commands to send.
    from_arduino = Queue.Queue()
    to_arduino = Queue.Queue()

    serial_manager = CommunicationManager(from_arduino, to_arduino)
    window = GuiClass(from_arduino, to_arduino, serial_manager)
    window.go()

    return 0

class GuiClass(object):
    def __init__(self,inputQ,outputQ,commManager):
        self.inputQ=inputQ
        self.outputQ=outputQ
        self.manager=commManager

        self.root = Tkinter.Tk()
        
        text_frame = Tkinter.Frame(self.root)
        entry_frame = Tkinter.Frame(self.root)
        
        scrollbar = Tkinter.Scrollbar(text_frame)
        self.text = Tkinter.Text(text_frame,height=5,width=80,wrap=Tkinter.WORD,yscrollcommand=scrollbar.set)
        scrollbar.pack(side=Tkinter.RIGHT,fill=Tkinter.Y)
        self.text.pack(fill=Tkinter.BOTH,expand=True)
        scrollbar.config(command=self.text.yview)

        portLabel = Tkinter.Label(entry_frame, text="Arduino Port: ")
        self.arduinoFile = Tkinter.StringVar()
        portEntry = Tkinter.Entry(entry_frame, textvariable=self.arduinoFile)
        portButton = Tkinter.Button(entry_frame, text="Connect", command=self.connect)
        portLabel.pack(side=Tkinter.LEFT)
        portEntry.pack(side=Tkinter.LEFT,fill=Tkinter.X,expand=True)
        portButton.pack(side=Tkinter.LEFT)

        queryButton = Tkinter.Button(self.root,text="Query",command=self.query)
        quitButton = Tkinter.Button(self.root,text="Quit",command=self.root.quit)
        entry_frame.pack(fill=Tkinter.X)
        text_frame.pack(fill=Tkinter.BOTH,expand=True)
        queryButton.pack(fill=Tkinter.X)
        quitButton.pack(fill=Tkinter.X)
        self.text.configure(state=Tkinter.DISABLED)
        self.firstline=True

        # Menubar
        menubar = Tkinter.Menu(self.root)
        
        filemenu = Tkinter.Menu(menubar,tearoff=False)
        #filemenu.add_command(label="Open...",command=self.openport)
        filemenu.add_separator()
        filemenu.add_command(label="Quit",command=self.root.quit)

        helpmenu = Tkinter.Menu(menubar,tearoff=False)
        helpmenu.add_command(label="About",command=self.about)

        menubar.add_cascade(label="File",menu=filemenu)
        menubar.add_cascade(label="Help",menu=helpmenu)
        self.root.config(menu=menubar)
        self.root.title("Python UI")

    def go(self):
        self.root.after(100, self.periodic_check)
        self.root.mainloop()

    def connect(self):
        filename = self.arduinoFile.get()
        self.manager.connect(filename)
        self.root.after(2000,self.check_connection)

    def check_connection(self):
        self.outputQ.put("CHECK")

    def periodic_check(self):
        try:
            inp = self.inputQ.get_nowait()
            tokens = inp.split()
            if len(tokens)>=1:
                if tokens[0]=="CONNECT":
                    self.writeline("Arduino Connected")
                elif tokens[0]=="HIGH":
                    self.writeline("LED is On")
                elif tokens[0]=="LOW":
                    self.writeline("LED is Off")
                elif tokens[0]=="CYCLE":
                    self.writeline(tokens[1]+" LED Flashes Completed.")
                elif tokens[0]=="IOHALT":
                    self.writeline("IO threads halted")
                elif tokens[0]=="ERROR":
                    self.writeline("Error: "+' '.join(tokens[1:]))
                else:
                    self.writeline("Unrecognized response:")
                    self.writeline(inp)
        except Queue.Empty:
            pass
            # self.writeline("No Data")
        self.root.after(100, self.periodic_check)

    def writeline(self,text):
        self.text.configure(state=Tkinter.NORMAL)
        if self.firstline:
            self.firstline=False
        else:
            self.text.insert(Tkinter.END,"\n")

        self.text.insert(Tkinter.END,text)
        self.text.yview(Tkinter.END)
        self.text.configure(state=Tkinter.DISABLED)
    
    def query(self):
        self.outputQ.put("QUERY")

    def about(self):
        tkMessageBox.showinfo("About",__doc__,parent=self.root)
        return

class CommunicationManager(object):
    """Owns the serial port and the two worker threads that service it.

    runInput() copies lines read from the port onto inputQ; runOutput()
    writes commands taken from outputQ to the port. Status and error
    messages for the user interface are also reported through inputQ
    ("IOHALT", "ERROR ...").
    """

    # How long runOutput() waits on the output queue before re-checking
    # keepRunning, in seconds. This replaces the old trick of pushing a
    # dummy "TERMINATE" command to wake the thread, which could be left in
    # the queue and sent to the Arduino after a reconnect.
    OUTPUT_POLL_SECONDS = 0.1

    def __init__(self,inputQ,outputQ):
        """Store the queues; no port is opened until connect() is called."""
        self.inputQ=inputQ
        self.outputQ=outputQ
        self.serialPort=None
        self.inputThread = None
        self.outputThread = None
        self.keepRunning = True
        self.activeConnection = False

    def runInput(self):
        """Thread body: forward each line read from the port to inputQ.

        A read error while keepRunning is still set is reported once as an
        "ERROR" message and ends the thread, instead of being swallowed and
        retried in a tight loop. A read error after close() has cleared
        keepRunning is the expected way out and is not reported.
        """
        while self.keepRunning:
            try:
                inputline = self.serialPort.readline()
            except Exception:
                # Broad on purpose: which exception a closed or failed port
                # raises depends on the serial backend.
                if self.keepRunning:
                    self.inputQ.put("ERROR Lost connection while reading from Arduino")
                return
            self.inputQ.put(inputline.rstrip())

    def runOutput(self):
        """Thread body: write each command from outputQ to the port.

        Each command is sent followed by a newline. The queue is polled
        with a timeout so the loop notices keepRunning being cleared without
        needing a wake-up item.
        """
        while self.keepRunning:
            try:
                outputline = self.outputQ.get(timeout=self.OUTPUT_POLL_SECONDS)
            except Queue.Empty:
                continue
            if not self.keepRunning:
                # The connection was closed while waiting; the command is
                # dropped, as it was before when the write hit a closed port.
                break
            try:
                self.serialPort.write("%s\n"%outputline)
            except Exception:
                if self.keepRunning:
                    self.inputQ.put("ERROR Unable to send command to Arduino")

    def connect(self,filename):
        """Open the serial port at 9600 baud and start both worker threads.

        Any existing connection is closed first. If the port cannot be
        opened, an "ERROR" message is queued on inputQ and activeConnection
        stays False.
        """
        if self.activeConnection:
            self.close()
            self.activeConnection = False
        try:
            self.serialPort = serial.Serial(filename,9600)
        except serial.SerialException:
            self.inputQ.put("ERROR Unable to Connect to Serial Port")
            return
        self.keepRunning=True
        self.inputThread = self._startThread(self.runInput)
        self.outputThread = self._startThread(self.runOutput)
        self.activeConnection = True

    def _startThread(self,target):
        """Start and return a daemon thread running target."""
        worker = threading.Thread(target=target)
        worker.daemon=True
        worker.start()
        return worker

    def close(self):
        """Stop the worker threads and close the port.

        Safe to call when nothing is connected. "IOHALT" is queued on
        inputQ only when worker threads were actually stopped.
        """
        # Clear the flag before closing the port so the read error that the
        # close provokes in runInput() is treated as a normal shutdown.
        self.keepRunning=False
        if self.serialPort is not None:
            self.serialPort.close()
            self.serialPort = None

        halted = False
        for worker in (self.inputThread, self.outputThread):
            if worker is not None:
                # NOTE(review): this assumes closing the port unblocks a
                # pending readline(); confirm on the target platform.
                worker.join()
                halted = True
        self.inputThread = None
        self.outputThread = None
        self.activeConnection = False

        if halted:
            self.inputQ.put("IOHALT")

# Run the user interface only when this file is executed as a script; the
# value returned by main() is passed to sys.exit().
if __name__ == "__main__":
    sys.exit(main())
# Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
# Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
# Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
# Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
# Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
# Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
# Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.