puma / Puma.py

#!/usr/bin/python
#
# Puma robot arm class.
# Talks to the MDC network socket to get the state of the
# controllers.  Maps to FK position, commands via IK, etc.
# 274 318 -190
# 305 -13 -190
#
# 200 405 -150
# 250 -200 -150
#
import socket
import threading
import time
import os
import re
import sys
from PumaFK import PumaFK
from PumaIK import PumaIK
from math import sqrt, pi

class Puma:
	def __init__(self, con, joints):
		self.live = True
		self.joints = joints
		self.counts = [0,0,0,0,0]
		self.last_xyz = False
		self.volts = 0
		self.fk = PumaFK(joints)
		self.ik = PumaIK(False, True, joints)
		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.sock.connect(con)
		self.con = self.sock.makefile()
		self.thread = threading.Thread(target = self.rx_loop)
		self.thread.daemon = True
		self.thread.start()

	def rx_loop(self):
		while True:
			line = self.con.readline()
			if not line:
				break
			line = line.strip()
			counts = line.split(",")
			if len(counts) != 7:
				print "Parse error: " + line
				continue
			volts = int(counts[0]) / 10.0
			self.counts = [int(x) for x in counts[1:]]
			#print "rx: " + line
			self.fk.update(self.counts)
			#print [int(x) for x in r.fk.p]

			if volts < 12.0 and self.volts > 12.0:
				print "NO VOLTAGE"
			elif volts > 12.0 and self.volts < 12.0:
				print "DANGER: Motors are live: " + str(round(volts,2)) + " volts"
			self.volts = volts

		print "RX THREAD EXITED"
		self.live = False

	# Move immediately to a set of counts
	def move_raw(self, ms, counts):
		cmd = "go," + str(int(ms)) + "," + ",".join([str(x) for x in counts])
		#print cmd
		self.sock.send(cmd + "\r\n")

	# Move one axis to a set of counts
	def moveto(self, axis, count, vel):
		cmd = "move," + str(int(axis)) + "," + str(int(count)) + "," + str(int(vel))
		self.sock.send(cmd + "\r\n")

	def home(self):
		self.move_raw(10000, [0,0,0,0,0,0])
	def start(self):
		self.move_raw(20000, [-417, 10904, -8125, -8109, 295, -6651])

	def hold(self):
		counts = self.counts[:]
		for i in range(0,6):
			self.moveto(i+1, counts[i], 1000)
		self.last_xyz = False
		print "Holding " + str(self.counts) + ": xyz " + str([int(x) for x in self.fk.p])

	def swing(self, speed):
		cur = self.counts[5]
		self.moveto(6, cur + 6000, 1000)
		time.sleep(0.5)
		self.moveto(6, cur + 12000, 1000)
		time.sleep(0.5)
		self.moveto(6, cur + 18000, 1000)
		time.sleep(0.5)

		self.moveto(6, cur + 12000, speed)
		time.sleep(0.125)
		self.moveto(6, cur + 6000, speed)
		time.sleep(0.125)
		self.moveto(6, cur, speed)
		time.sleep(0.125)
		self.moveto(6, cur - 6000, speed)
		time.sleep(0.125)
		self.moveto(6, cur - 12000, speed)
		time.sleep(0.125)
		self.moveto(6, cur - 18000, speed)

		time.sleep(5)
		self.moveto(6, cur - 12000, 1000)
		time.sleep(1)
		self.moveto(6, cur - 6000, 1000)
		time.sleep(1)
		self.moveto(6, cur, 1000)

	# Move immediately to an explicit IK computed position
	def move_ik(self, ms, xyz, a, s, n):
		self.ik.ik(xyz, a, s, n)

		counts = self.ik.get_counts(self.fk.theta)
		self.move_raw(ms, counts)

		print "counts:  ", [int(x) for x in self.counts]
		print "command: ", [int(x) for x in counts]
		print "current: ", [round(x* 180/pi, 1) for x in self.fk.theta]
		print "desired: ", [round(x* 180/pi, 1) for x in self.ik.theta]

	# Move linearly to an IK computed position at mm/s speed
	# This will hold the current tool configuration
	# The current xyz position is the last xyz position sent
	# or if we have had an intermediate save command, the last
	# computed FK position.  Otherwise we have a "step" when
	# the new xyz does not match the current xyz.
	def move_to(self, vel, dst_xyz):
		if self.last_xyz:
			xyz = self.last_xyz[:]
		else:
			xyz = self.fk.p[:]

		# For the swinging tool on the left side of the board,
		# the position is facing (approach) due X,
		# sliding in Z and the normal to Y
		cur_a = [1,0,0]
		cur_s = [0,0,1]
		cur_n = [0,1,0]

		self.move_ik(100000 / vel, dst_xyz, cur_a, cur_s, cur_n)
		self.last_xyz = dst_xyz[:]

	# Move linearly to an IK computed position at mm/s speed
	# This will hold the current tool configuration
	# The current xyz position is the last xyz position sent
	# or if we have had an intermediate save command, the last
	# computed FK position.  Otherwise we have a "step" when
	# the new xyz does not match the current xyz.
	def move_linear(self, vel, dst_xyz):
		if self.last_xyz:
			xyz = self.last_xyz[:]
		else:
			xyz = self.fk.p[:]

		cur_a = self.fk.a[:]
		cur_s = self.fk.s[:]
		cur_n = self.fk.n[:]

		# For the swinging tool on the left side of the board,
		# the position is facing (approach) due X,
		# sliding in Z and the normal to Y
		cur_a = [1,0,0]
		cur_s = [0,0,1]
		cur_n = [0,1,0]

		dx = dst_xyz[0] - xyz[0]
		dy = dst_xyz[1] - xyz[1]
		dz = dst_xyz[2] - xyz[2]
		dist = sqrt(dx*dx + dy*dy + dz*dz)

		steps_per_sec = 10
		steps = int(dist * steps_per_sec / vel)
		if steps == 0:
			steps = 1
		dt = 1.0 / steps_per_sec

		print "dist=" + str(int(dist))
		print "steps=" + str(steps)
		print "dt=" + str(round(dt, 4))

		dx /= steps
		dy /= steps
		dz /= steps

		print "dx=" + str(round(dx, 2))
		print "dy=" + str(round(dy, 2))
		print "dz=" + str(round(dz, 2))

		#dt = 0.

		for i in range(0,steps):
			xyz[0] += dx
			xyz[1] += dy
			xyz[2] += dz
			self.move_ik(dt*1000, xyz, cur_a, cur_s, cur_n)
			time.sleep(dt * 0.4)
		self.last_xyz = xyz[:]


if __name__ == "__main__":
	import PumaConfig
	r = Puma(("localhost", 31415), PumaConfig.joints)
	vel = 20

	while r.live:
		line = sys.stdin.readline()
		if not line:
			break
		line = line.strip()

		if line == "home":
			r.home()
			continue
		if line == "start":
			r.start()
			continue
		g = re.match("^swing ([0-9]+)", line)
		if g is not None:
			r.swing(int(g.group(1)))
			continue
		if line == "hold":
			r.hold()
			continue
		if line == "r":
			print r.counts
			continue
		if line == "a":
			print [round(x * 180 / pi, 1) for x in r.fk.theta]
			continue
		g = re.match("^a (-?[0-9]+) (-?[0-9]+) (-?[0-9]+) (-?[0-9]+) (-?[0-9]+) (-?[0-9]+)", line)
		if g is not None:
			theta = [int(x) * pi / 180 for x in g.group(1,2,3,4,5,6)]
			counts = r.ik.angle2count(theta, [])
			print counts
			r.move_raw(10000, counts)
			continue
		if line == "p":
			print [int(x) for x in r.fk.p], [round(x,2) for x in r.fk.a], [round(x,2) for x in r.fk.s], [round(x,2) for x in r.fk.n]
			continue
		g = re.match("^v ([0-9]+)", line)
		if g is not None:
			vel = int(g.group(1))
			if (vel == 0):
				vel = 1
			continue

		g = re.match("^(-?[0-9]+ -?[0-9]+ -?[0-9]+)$", line)
		if g is not None:
			xyz = [int(x) for x in g.group(1).split(" ")]
			r.move_to(vel, xyz)
			continue;

		print "?"
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.