Snippets

HauertLab Stochastic behaviours for retrieval of storage items using simulated robot swarms

Created by Emma Milner

README

These files run a simulation of a 5mx5m warehouse space in 2D. A stcohastic swarm of robots move around the warehouse picking up and delivering boxes to the delivery area on the far right hand side of the warehouse. The picking up/dropping off of a box by the robot is not simulated and assumed to be instantaneous for the purposes of this project. There are 2 tasks and 2 controllers:

Task 1: Unordered retrieval. The user requests all the boxes to be delivered asap. Task 2: Ordered retrieval. The user requests the boxes to all be delivered in a specified order (1 to N (number of boxes)). The robots put down boxes that are the wrong box for the sequence with probabilistic behaviours (0.03 prob every simulated second)

Random walk: the robots move with constant speed and a noisy heading. They avoid collisions which they can sense within their local sensory range. Biased heading behaviour: When the robot has a box/the correct box (depending on the task), they have a bias added to their heading which tends to move them towards the delivery area. This is achieved by adding a vector of magnitude 1 and direction entirely Easterly (the delivery area is on the East wall as seen from above via the simulation animation).

The simulation will run for the time limit given in run_sim.py. In the terminal it will print the total time taken to collect all the boxes OR if it has not collected all in the time limit then it will display a success rate and the total time taken. Success rate is e.g. 1.0 of 11 collected = 9.09% in 201 seconds

Run the file run_sim.py in the Terminal. Edit the following parameters within the run_sim.py files, depending on the simulation you want to run: - Task = 1 (unordered retrieval) or 2 (ordered retrieval) - Bias = 0 (no) or 1 (yes) - Swarm size = (number of agents, recommended 10-50) - num_items = (number of boxes to collect, recommend 10-50) - time_limit = (amount of time (s) in simulation, not actual time taken to run code) - ani = True (run a figure with an animation of the simulation) or False (don't run animation)

import task_1_RW
import task_1_bias
import task_2_RW
import task_2_bias
'To run this code in the Terminal use the command python run_sim.py'

'Task 1: Unordered retrieval, collect all the items in the time limit'
'Task 2: Ordered retrieval, collect the items in order of their ascending ID number'

'Inputs'
task = 2 # Task 1 or 2?
bias = 0 # Should the behaviour have the Biased Heading Behaviour or not?
swarm_size =10#number of agents in the swarm 
num_items = 10# number of items to be collected
time_limit = 5000 # how long to give the robots to complete the task
ani = True # Do you want to generate an animation of the behaviour?

##################################################
total_units = swarm_size + num_items
if total_units > 201:
	print("Error: Too many units in the warehouse. Max num box + num rob = 200")
	exit()
	
	
if task == 1:
	if bias == 0:
		sim = task_1_RW.data(swarm_size,num_items,ani,time_limit)
	if bias == 1:
		sim = task_1_bias.data(swarm_size,num_items,ani,time_limit)
		
if task == 2:
	if bias == 0:
		sim = task_2_RW.data(swarm_size,num_items,ani,time_limit)
	if bias == 1:
		sim = task_2_bias.data(swarm_size,num_items,ani,time_limit)
'''
Swarm Warehouse with Boxes Code:
Displays a bird's eye view of a warehouse with robots moving around, avoiding the walls and each other. Boxes are picked up and moved to exit zone by robots. 

Code authored by Emma Milner 

The actual specification for the Toshiba robots is as follows: 
agent speed = 100 cm/s
agent acceleration 200 cm/s/s (not used in this code)
diameter of agent is 25 cm 
width of testbed is 500 cm
height (depth) of warehouse is 500 cm 
'''

import numpy as np 
import math 
import random 
from matplotlib import pyplot as plt
from matplotlib import animation  
import scipy
from scipy.spatial.distance import cdist, pdist, euclidean
import pickle
import sys
import os

### INPUTS ###
radius = 12.5 # Radius of single agent (half of 25)
width = 500 # Width of warehouse
height = 500 # Height (depth) of warehouse 
speed = 100 # Agent speed
repulsion_distance =radius*2 + 10  # Distance at which repulsion is first felt 
box_radius = radius # radius of the box is the same as the robots
box_range = 2*box_radius # range at which a box can be picked up 
exit_width = int(0.2*width) # if it is too small then it will avoid the wall and be less likely to reach the exit zone 
time_step = 0.02 # 0.02 of 1 second

sf = 500/width # scale factor for the animation 
marker_size = sf*sf*width*0.5/20 #diameter of a robot for the animation 
	
class boxes():
	def __init__(self,number_of_boxes,robots):
		self.num_boxes = number_of_boxes 
		self.radius = box_radius # physical radius of the box (approximated to a circle even though square in animation)
		self.free_b = np.ones(self.num_boxes) # Box states set to 1 = Free (not on a robot), if = 0 = Not free (on a robot)
		self.delivered = 0 # Number of boxes that have been delivered 
		self.box_c = [] # centre coorindates of boxes starts as an empty list 
		self.box_d = np.zeros((self.num_boxes,2)) # deviation of the box centre coordinate (how far the box moves in one time step)
		self.robot_carrier = np.full((self.num_boxes),-1) # Value at index = box number is the robot number that is currently moving that box
		self.beyond_b = np.zeros(self.num_boxes) # Boolean list of boxes that are over the delivery boundary (1 is in the delivery area)
		
	# Initialising positions of boxes AND robots
		## No boxes or robots can start in the exit area
		possible_x = int((width-exit_width)/(radius*2)) # Number of positions possible on the x axis
		possible_y = int((height)/(radius*2)) # Number of positions possible on the y axis
		list_n = [] # empty list of possible positions 
		for x in range(possible_x):
			for y in range(possible_y):
				list_n.append([x,y]) # list of possible positions in the warehouse
		N = self.num_boxes+robots.num_agents # total number of units to assign positions to
		XY = random.sample(list_n,N) # select N unique coordinates at random from the list of possible positions
		c_select = [] # central coordinates (empty list) 
		for j in range(N): #for the total number of units (boxes + robots)
			c_select.append([radius + ((radius*2))*XY[j][0],radius + ((radius*2))*XY[j][1]]) # assign a central coordinate to unit j (can be a box or an agent) based on the unique randomly selected list, XY
		for b in range(self.num_boxes):
			self.box_c.append(c_select[b]) # assign initial box positions
		for r in range(robots.num_agents):
			robots.rob_c.append(c_select[r+self.num_boxes]) # assign initial robot positions
		self.box_c = np.array(self.box_c) # convert list to array
		robots.rob_c = np.array(robots.rob_c) # convert list to array 		
		###
		
class swarm():
	def __init__(self,num_agents):
		self.speed = speed # Agent speed 
		self.counter = 0.0 # time starts at 0s 
		self.num_agents = num_agents # Swarm size
		self.free_r = np.ones(self.num_agents) # Robot states set to 1 = Free (no box), if = 0 = Not free (has a box)
		self.heading = 0.0314*np.random.randint(-100,100,self.num_agents) # initial heading for all robots is randomly chosen
		self.rob_c = [] # centre coordinates of the robots starts as an empty list and is assigned in the boxes class
		self.rob_d = np.zeros((self.num_agents,2)) # robot centre cooridinate deviation (how much the robot moves in one time step)
		
	def iterate(self,boxes): # moves the robot and box positions forward in one time step
		dist = cdist(boxes.box_c, self.rob_c) # calculates the euclidean distance from every robot to every box (centres)
		qu_close_box = np.min(dist,1) < box_range # if the minimum distance box-robot is less than the pick up sensory range, then qu_close_box = 1 which means a box is nearby a robot
		mins = np.argmin(dist,1) # robot IDS with the minimum robot-box distances 
		checkb = boxes.free_b*qu_close_box #only include boxes that are free to be picked up (i.e. not already on a robot)
		box_n = np.argwhere(checkb==1) #the box numbers that are nearby and available
	# needs to be a loop (rather than vectorised) in case two robots are close to the same box
		for b in box_n:		# loop through the free, nearby boxes
			if self.free_r[mins[b]] == 1: # if the box is close to a robot and free AND the robot that is closest to box b is also free:
				self.free_r[mins[b]] = 0 # change robot state to 0 (not free, has a box)
				boxes.free_b[b] = 0 # change box state to 0 (not free, on a robot)
				boxes.box_c[b] = self.rob_c[mins[b]] # change the box centre so it is aligned with its robot carrier's centre
				boxes.robot_carrier[b] = mins[b] # set the robot_carrier for box b to that robot ID

		random_walk(self,boxes) # the robots move using the random walk function which generates a new deviation (rob_d) based on collision avoidance and the robot controller algorithm 
		
		self.rob_c = self.rob_c + self.rob_d # robots centre coordinates change as they move
		anti_free_b = boxes.free_b == 0 # Need to know which boxes should be moved by the robot that is carrying them. Here, if box is being carried then anti_free_b = 1 and therefore box_d(below) is not 0 
		boxes.box_d = np.array((anti_free_b,anti_free_b)).T*self.rob_d[boxes.robot_carrier] # move the boxes by the amount equal to the robot carrying them 
		boxes.box_c = boxes.box_c + boxes.box_d
		boxes.beyond_b = boxes.box_c.T[0] > width - exit_width - radius # beyond_b = 1 if box centre is in the delivery area (including those already delivered)
		boxes.beyond_b = boxes.beyond_b*anti_free_b # do NOT include boxes that have NOT been found by a robot as being delivered
		if any(boxes.beyond_b) == 1: # if any boxes have been delivered
			boxes.delivered = boxes.delivered + np.sum(boxes.beyond_b) # add to the number of deliveries made
			box_n = np.argwhere(boxes.beyond_b == 1) # box IDs that have been delivered
			rob_n = boxes.robot_carrier[box_n] # robot IDs that have delivered a box just now
			boxes.free_b[box_n] = 1 # mark boxes as free again
			self.free_r[rob_n] = 1 # mark robots as free again
			boxes.box_c.T[0,box_n] = boxes.box_c.T[0,box_n] - 600 # remove boxes from the warehouse

## Movement function for collision avoidance and the controller algorithm ## 
def random_walk(swarm,boxes):
	swarm.counter += time_step # time step forward by one time step  

	## Random Walk ##
	# Add noise to the heading 
	noise = 0.01*np.random.randint(-50,50,(swarm.num_agents))
	swarm.heading += noise
	# New chosen heading 
	heading_x = np.cos(swarm.heading) # new heading with noise in x 
	heading_y = np.sin(swarm.heading) # in y
	F_heading = -np.array([[heading_x[n], heading_y[n]] for n in range(0, swarm.num_agents)]) # influence on the robot's movement based on the noise added to the heading
	
	## Collision avoidance between agents ##
	# Compute euclidean (cdist) distance between agents and other agents
	agent_distance = cdist(swarm.rob_c, swarm.rob_c)	# distance between all the agents to all the agents
	too_close = agent_distance < repulsion_distance # TRUE if agent is too close to another agent such that it can be detected (enable collision avoidance)
	proximity_to_robots = swarm.rob_c[:,:,np.newaxis]-swarm.rob_c.T[np.newaxis,:,:] 	# Compute vectors between agents
	F_agent = too_close[:,np.newaxis,:]*proximity_to_robots # only include repulsion from agents that are too close (detectable)
	F_agent = np.sum(F_agent, axis =0).T # sum the repulsion vectors
	
	## Collision avoidance between agents and boxes ##
	# Compute euclidean (cdist) distance between boxes and agents
	box_dist = cdist(boxes.box_c,swarm.rob_c) # distance between all the boxes and all the agents
	too_close_boxes = box_dist < repulsion_distance # TRUE if agent is too close to a box (enable collision avoidance). Does not avoid box if agent does not have a box but this is considered later in the code (not_free*F_box)
	proximity_to_boxes = boxes.box_c[:,:,np.newaxis] - swarm.rob_c.T[np.newaxis,:,:]	# Calc repulsion vector on agents due to proximity to boxes
	F_box = too_close_boxes[:,np.newaxis,:]*proximity_to_boxes # only include repulsion from agents that are too close (detectable)
	F_box = np.sum(F_box,axis=0) # sum the repulsion vectors due to boxes on the agents
	not_free = swarm.free_r == 0 # list of robots that do not have a box. These robots do not want to avoid boxes at all so remove box repulsion from their calcs with the following lines 
	F_box[0] = not_free*F_box[0].T 
	F_box[1] = not_free*F_box[1].T
	
	## Collision avoidance between agents and walls ##
	# Force on agent due to proximity to walls
	F_wall = np.zeros((swarm.num_agents,2)) # assign a list of zeros for the collision avoidance of walls
	too_close_to_south_wall = swarm.rob_c[:,1] < radius # if too close to the South (y = 0) wall then TRUE
	too_close_to_east_wall = swarm.rob_c[:,0] > (width-radius) # East is x = width
	too_close_to_north_wall = swarm.rob_c[:,1] > (height-radius) # North is y = height
	too_close_to_west_wall = swarm.rob_c[:,0] < radius # West is x = 0 
	F_wall[too_close_to_south_wall,1] = -100 # Any agent too close to the South wall gets a repulsion force equal to 100 in the entirely y direction to move it away from the wall.
	F_wall[too_close_to_east_wall,0] = 100
	F_wall[too_close_to_north_wall,1] = 100 
	F_wall[too_close_to_west_wall,0] = -100 

	# Movement vectors added together
	F_agent += F_wall + F_heading + F_box.T
	F_x = F_agent.T[0] # vector in x
	F_y = F_agent.T[1] # in y 

	# Movement due to vectors
	new_heading = np.arctan2(F_y, F_x) # new heading due to repulsions
	move_x = swarm.speed*np.cos(new_heading)*time_step # Movement in x 
	move_y = swarm.speed*np.sin(new_heading)*time_step # Movement in y 
	# Total change in movement of agent (robot deviation)
	swarm.rob_d = -np.array([[move_x[n], move_y[n]] for n in range(0, swarm.num_agents)])
	return swarm.rob_d

####################################################################################
'This is the end of the algorithm. The following are used to run the simulation and record data.' 
####################################################################################
class data:
	def __init__(self,num_agents,num_boxes,anim,limit):
		self.num_agents = num_agents
		self.num_boxes = num_boxes
		self.robots = swarm(self.num_agents)
		self.items = boxes(self.num_boxes,self.robots)
		self.time = limit
		self.anim = anim
		self.data_counter = 0.0
		self.data_collect()

	def data_collect(self):
		self.robots.iterate(self.items)
		done = False
		if self.anim == False:
			while self.robots.counter <= self.time:
				self.robots.iterate(self.items)
				if self.items.delivered == self.items.num_boxes: # if it completes the task in the time limit
					print("in",self.robots.counter,"seconds") # print the time taken to complete the task 
					done = True
					self.data_counter = self.robots.counter
					self.robots.counter = self.time+1
			sr = self.items.delivered # success rate is generated if the time limit runs out before the task is complete
			if sr > 0:
				sr = float(sr/self.items.num_boxes)
			if done == False:
				print("in",self.time,"seconds") # time limit is effectively printed
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%") # success rate within the time limit is printed as a percentage of boxes collected 
		if self.anim == True:
			self.ani()
		
	def ani(self):
		fig = plt.figure()
		ax = plt.axes(xlim=(0, width), ylim=(0, height))
		dot, = ax.plot([self.robots.rob_c[i,0] for i in range(self.num_agents)],[self.robots.rob_c[i,1] for i in range(self.num_agents)], 'ko', markersize = marker_size, fillstyle = 'none') # dots represent robots which appear as black circles
		box, = ax.plot([self.items.box_c[i,0] for i in range(self.num_boxes)],[self.items.box_c[i,1] for i in range(self.num_boxes)], 'rs', markersize = marker_size-5) # these are the boxes which appear as red squares 
		plt.axis('square') # the warehouse is square 
		plt.axis([0,width,0,height])
		
		def animate(i):
			self.robots.iterate(self.items)
			dot.set_data([self.robots.rob_c[n,0] for n in range(self.num_agents)],[self.robots.rob_c[n,1] for n in range(self.num_agents)])
			box.set_data([self.items.box_c[n,0] for n in range(self.num_boxes)],[self.items.box_c[n,1] for n in range(self.num_boxes)])

			plt.title("Time is "+str(round(self.robots.counter))+"s") # print the current time to the visualisation 
			if self.items.delivered == self.num_boxes or self.robots.counter > self.time: # the animation will print the success rate as a percentage of boxes delivered 
				sr = self.items.delivered
				if sr > 0:
					sr = float(sr/self.items.num_boxes)
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%")
				print("in",self.robots.counter,"seconds")
				exit()
		anim = animation.FuncAnimation(fig, animate, frames=50, interval=10,blit=False)
		plt.xlabel("Warehouse width (cm)")
		plt.ylabel("Warehouse height (cm)")
		ex = [width-exit_width, width-exit_width]
		ey = [0, height]
		plt.plot(ex,ey,':')
		plt.show()
'''
Swarm Warehouse with Boxes Code:
Displays a bird's eye view of a warehouse with robots moving around, avoiding the walls and each other. Boxes are picked up and moved to exit zone by robots. 

This includes the Biased Heading Behaviour

Code authored by Emma Milner

The actual specification for the Toshiba robots is as follows: 
agent speed = 100 cm/s
agent acceleration 200 cm/s/s (not used in this code)
diameter of agent is 25 cm 
width of testbed is 500 cm
height (depth) of warehouse is 500 cm 
'''

import numpy as np 
import math 
import random 
from matplotlib import pyplot as plt
from matplotlib import animation  
import scipy
from scipy.spatial.distance import cdist, pdist, euclidean
import pickle
import sys
import os

### INPUTS ###
radius = 12.5 # Radius of single agent (half of 25)
width = 500 # Width of warehouse
height = 500 # Height (depth) of warehouse 
speed = 100 # Agent speed
repulsion_distance =radius*2 + 10  # Distance at which repulsion is first felt 
box_radius = radius # radius of the box is the same as the robots
box_range = 2*box_radius # range at which a box can be picked up 
exit_width = int(0.2*width) # if it is too small then it will avoid the wall and be less likely to reach the exit zone 
time_step = 0.02 # 0.02 of 1 second

sf = 500/width # scale factor for the animation 
marker_size = sf*sf*width*0.5/20 #diameter of a robot for the animation 
	
class boxes():
	def __init__(self,number_of_boxes,robots):
		self.num_boxes = number_of_boxes 
		self.radius = box_radius # physical radius of the box (approximated to a circle even though square in animation)
		self.free_b = np.ones(self.num_boxes) # Box states set to 1 = Free (not on a robot), if = 0 = Not free (on a robot)
		self.delivered = 0 # Number of boxes that have been delivered 
		self.box_c = [] # centre coorindates of boxes starts as an empty list 
		self.box_d = np.zeros((self.num_boxes,2)) # deviation of the box centre coordinate (how far the box moves in one time step)
		self.robot_carrier = np.full((self.num_boxes),-1) # Value at index = box number is the robot number that is currently moving that box
		self.beyond_b = np.zeros(self.num_boxes) # Boolean list of boxes that are over the delivery boundary (1 is in the delivery area)
		
	# Initialising positions of boxes AND robots
		## No boxes or robots can start in the exit area
		possible_x = int((width-exit_width)/(radius*2)) # Number of positions possible on the x axis
		possible_y = int((height)/(radius*2)) # Number of positions possible on the y axis
		list_n = [] # empty list of possible positions 
		for x in range(possible_x):
			for y in range(possible_y):
				list_n.append([x,y]) # list of possible positions in the warehouse
		N = self.num_boxes+robots.num_agents # total number of units to assign positions to
		XY = random.sample(list_n,N) # select N unique coordinates at random from the list of possible positions
		c_select = [] # central coordinates (empty list) 
		for j in range(N): #for the total number of units (boxes + robots)
			c_select.append([radius + ((radius*2))*XY[j][0],radius + ((radius*2))*XY[j][1]]) # assign a central coordinate to unit j (can be a box or an agent) based on the unique randomly selected list, XY
		for b in range(self.num_boxes):
			self.box_c.append(c_select[b]) # assign initial box positions
		for r in range(robots.num_agents):
			robots.rob_c.append(c_select[r+self.num_boxes]) # assign initial robot positions
		self.box_c = np.array(self.box_c) # convert list to array
		robots.rob_c = np.array(robots.rob_c) # convert list to array 		
		###
		
class swarm():
	def __init__(self,num_agents):
		self.speed = speed # Agent speed 
		self.counter = 0.0 # time starts at 0s 
		self.num_agents = num_agents # Swarm size
		self.free_r = np.ones(self.num_agents) # Robot states set to 1 = Free (no box), if = 0 = Not free (has a box)
		self.heading = 0.0314*np.random.randint(-100,100,self.num_agents) # initial heading for all robots is randomly chosen
		self.rob_c = [] # centre coordinates of the robots starts as an empty list and is assigned in the boxes class
		self.rob_d = np.zeros((self.num_agents,2)) # robot centre cooridinate deviation (how much the robot moves in one time step)
		
	def iterate(self,boxes): # moves the robot and box positions forward in one time step
		dist = cdist(boxes.box_c, self.rob_c) # calculates the euclidean distance from every robot to every box (centres)
		qu_close_box = np.min(dist,1) < box_range # if the minimum distance box-robot is less than the pick up sensory range, then qu_close_box = 1 which means a box is nearby a robot
		mins = np.argmin(dist,1) # robot IDS with the minimum robot-box distances 
		checkb = boxes.free_b*qu_close_box #only include boxes that are free to be picked up (i.e. not already on a robot)
		box_n = np.argwhere(checkb==1) #the box numbers that are nearby and available
	# needs to be a loop (rather than vectorised) in case two robots are close to the same box
		for b in box_n:		# loop through the free, nearby boxes
			if self.free_r[mins[b]] == 1: # if the box is close to a robot and free AND the robot that is closest to box b is also free:
				self.free_r[mins[b]] = 0 # change robot state to 0 (not free, has a box)
				boxes.free_b[b] = 0 # change box state to 0 (not free, on a robot)
				boxes.box_c[b] = self.rob_c[mins[b]] # change the box centre so it is aligned with its robot carrier's centre
				boxes.robot_carrier[b] = mins[b] # set the robot_carrier for box b to that robot ID

		random_walk(self,boxes) # the robots move using the random walk function which generates a new deviation (rob_d) based on collision avoidance and the robot controller algorithm 
		
		self.rob_c = self.rob_c + self.rob_d # robots centre coordinates change as they move
		anti_free_b = boxes.free_b == 0 # Need to know which boxes should be moved by the robot that is carrying them. Here, if box is being carried then anti_free_b = 1 and therefore box_d(below) is not 0 
		boxes.box_d = np.array((anti_free_b,anti_free_b)).T*self.rob_d[boxes.robot_carrier] # move the boxes by the amount equal to the robot carrying them 
		boxes.box_c = boxes.box_c + boxes.box_d
		boxes.beyond_b = boxes.box_c.T[0] > width - exit_width - radius # beyond_b = 1 if box centre is in the delivery area (including those already delivered)
		boxes.beyond_b = boxes.beyond_b*anti_free_b # do NOT include boxes that have NOT been found by a robot as being delivered
		if any(boxes.beyond_b) == 1: # if any boxes have been delivered
			boxes.delivered = boxes.delivered + np.sum(boxes.beyond_b) # add to the number of deliveries made
			box_n = np.argwhere(boxes.beyond_b == 1) # box IDs that have been delivered
			rob_n = boxes.robot_carrier[box_n] # robot IDs that have delivered a box just now
			boxes.free_b[box_n] = 1 # mark boxes as free again
			self.free_r[rob_n] = 1 # mark robots as free again
			boxes.box_c.T[0,box_n] = boxes.box_c.T[0,box_n] - 600 # remove boxes from the warehouse

## Movement function for collision avoidance and the controller algorithm ## 
def random_walk(swarm,boxes):
	swarm.counter += time_step # time step forward by one time step  

	## Random Walk ##
	# Add noise to the heading 
	noise = 0.01*np.random.randint(-50,50,(swarm.num_agents))
	swarm.heading += noise
	# New chosen heading 
	heading_x = np.cos(swarm.heading) # new heading with noise in x 
	heading_y = np.sin(swarm.heading) # in y
	
	## Biased Heading Behaviour ##
	anti_free_r = swarm.free_r == 0 # This equals 1 if the robot has a box
	heading_x = heading_x + anti_free_r # Add 1 to the x component of the robot's heading if it has a box
	F_heading = -np.array([[heading_x[n], heading_y[n]] for n in range(0, swarm.num_agents)]) # influence on the robot's movement based on the noise added to the heading
	
	## Collision avoidance between agents ##
	# Compute euclidean (cdist) distance between agents and other agents
	agent_distance = cdist(swarm.rob_c, swarm.rob_c)	# distance between all the agents to all the agents
	too_close = agent_distance < repulsion_distance # TRUE if agent is too close to another agent such that it can be detected (enable collision avoidance)
	proximity_to_robots = swarm.rob_c[:,:,np.newaxis]-swarm.rob_c.T[np.newaxis,:,:] 	# Compute vectors between agents
	F_agent = too_close[:,np.newaxis,:]*proximity_to_robots # only include repulsion from agents that are too close (detectable)
	F_agent = np.sum(F_agent, axis =0).T # sum the repulsion vectors
	
	## Collision avoidance between agents and boxes ##
	# Compute euclidean (cdist) distance between boxes and agents
	box_dist = cdist(boxes.box_c,swarm.rob_c) # distance between all the boxes and all the agents
	too_close_boxes = box_dist < repulsion_distance # TRUE if agent is too close to a box (enable collision avoidance). Does not avoid box if agent does not have a box but this is considered later in the code (not_free*F_box)
	proximity_to_boxes = boxes.box_c[:,:,np.newaxis] - swarm.rob_c.T[np.newaxis,:,:]	# Calc repulsion vector on agents due to proximity to boxes
	F_box = too_close_boxes[:,np.newaxis,:]*proximity_to_boxes # only include repulsion from agents that are too close (detectable)
	F_box = np.sum(F_box,axis=0) # sum the repulsion vectors due to boxes on the agents
	not_free = swarm.free_r == 0 # list of robots that do not have a box. These robots do not want to avoid boxes at all so remove box repulsion from their calcs with the following lines 
	F_box[0] = not_free*F_box[0].T 
	F_box[1] = not_free*F_box[1].T
	
	## Collision avoidance between agents and walls ##
	# Force on agent due to proximity to walls
	F_wall = np.zeros((swarm.num_agents,2)) # assign a list of zeros for the collision avoidance of walls
	too_close_to_south_wall = swarm.rob_c[:,1] < radius # if too close to the South (y = 0) wall then TRUE
	too_close_to_east_wall = swarm.rob_c[:,0] > (width-radius) # East is x = width
	too_close_to_north_wall = swarm.rob_c[:,1] > (height-radius) # North is y = height
	too_close_to_west_wall = swarm.rob_c[:,0] < radius # West is x = 0 
	F_wall[too_close_to_south_wall,1] = -100 # Any agent too close to the South wall gets a repulsion force equal to 100 in the entirely y direction to move it away from the wall.
	F_wall[too_close_to_east_wall,0] = 100
	F_wall[too_close_to_north_wall,1] = 100 
	F_wall[too_close_to_west_wall,0] = -100 

	# Movement vectors added together
	F_agent += F_wall + F_heading + F_box.T
	F_x = F_agent.T[0] # vector in x
	F_y = F_agent.T[1] # in y 

	# Movement due to vectors
	new_heading = np.arctan2(F_y, F_x) # new heading due to repulsions
	move_x = swarm.speed*np.cos(new_heading)*time_step # Movement in x 
	move_y = swarm.speed*np.sin(new_heading)*time_step # Movement in y 
	# Total change in movement of agent (robot deviation)
	swarm.rob_d = -np.array([[move_x[n], move_y[n]] for n in range(0, swarm.num_agents)])
	return swarm.rob_d

####################################################################################
'This is the end of the algorithm. The following are used to run the simulation and record data.'
####################################################################################
class data:
	def __init__(self,num_agents,num_boxes,anim,limit):
		self.num_agents = num_agents
		self.num_boxes = num_boxes
		self.robots = swarm(self.num_agents)
		self.items = boxes(self.num_boxes,self.robots)
		self.time = limit
		self.anim = anim
		self.data_counter = 0.0
		self.data_collect()


	def data_collect(self):
		self.robots.iterate(self.items)
		done = False
		if self.anim == False:
			while self.robots.counter <= self.time:
				self.robots.iterate(self.items)
				if self.items.delivered == self.items.num_boxes: # if it completes the task in the time limit
					print("in",self.robots.counter,"seconds") # print the time taken to complete the task 
					done = True
					self.data_counter = self.robots.counter
					self.robots.counter = self.time+1
			sr = self.items.delivered # success rate is generated if the time limit runs out before the task is complete
			if sr > 0:
				sr = float(sr/self.items.num_boxes)
			if done == False:
				print("in",self.time,"seconds") # time limit is effectively printed
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%") # success rate within the time limit is printed as a percentage of boxes collected 
		if self.anim == True:
			self.ani()
		
	def ani(self):
		fig = plt.figure()
		ax = plt.axes(xlim=(0, width), ylim=(0, height))
		dot, = ax.plot([self.robots.rob_c[i,0] for i in range(self.num_agents)],[self.robots.rob_c[i,1] for i in range(self.num_agents)], 'ko', markersize = marker_size, fillstyle = 'none') # dots represent robots which appear as black circles
		box, = ax.plot([self.items.box_c[i,0] for i in range(self.num_boxes)],[self.items.box_c[i,1] for i in range(self.num_boxes)], 'rs', markersize = marker_size-5) # these are the boxes which appear as red squares 
		plt.axis('square') # the warehouse is square 
		plt.axis([0,width,0,height])
		
		def animate(i):
			self.robots.iterate(self.items)
			dot.set_data([self.robots.rob_c[n,0] for n in range(self.num_agents)],[self.robots.rob_c[n,1] for n in range(self.num_agents)])
			box.set_data([self.items.box_c[n,0] for n in range(self.num_boxes)],[self.items.box_c[n,1] for n in range(self.num_boxes)])

			plt.title("Time is "+str(round(self.robots.counter))+"s") # print the current time to the visualisation 
			if self.items.delivered == self.num_boxes or self.robots.counter > self.time: # the animation will print the success rate as a percentage of boxes delivered 
				sr = self.items.delivered
				if sr > 0:
					sr = float(sr/self.items.num_boxes)
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%")
				print("in",self.robots.counter,"seconds")
				exit()
		anim = animation.FuncAnimation(fig, animate, frames=50, interval=10,blit=False)
		plt.xlabel("Warehouse width (cm)")
		plt.ylabel("Warehouse height (cm)")
		ex = [width-exit_width, width-exit_width]
		ey = [0, height]
		plt.plot(ex,ey,':')
		plt.show()
'''
Swarm Warehouse with Boxes Code:
Displays a bird's eye view of a warehouse with robots moving around, avoiding the walls and each other. Boxes are picked up and moved to exit zone by robots. Only one particular box can be delivered at a time. This box is chosen at random and no other boxes can be delivered until this one has. 

Code authored by Emma Milner 

The actual specification for the Toshiba robots is as follows: 
agent speed = 100 cm/s
agent acceleration 2 m/s/s (not used in this code)
diameter of agent is 25 cm
width of testbed is 500 cm
height (depth) of warehouse is 500 cm 
'''

import numpy as np 
import math 
import random 
from matplotlib import pyplot as plt
from matplotlib import animation  
import scipy
from scipy.spatial.distance import cdist, pdist, euclidean
import pickle
import sys
import os

### INPUTS ###
radius = 12.5 # Radius of single agent (half of 25)
width = 500 # Width of warehouse
height = 500 # Height (depth) of warehouse 
speed = 100 # Agent speed
repulsion_distance =radius*2 + 10  # Distance at which repulsion is first felt 
box_radius = radius # radius of the box is the same as the robots
box_range = 2*box_radius # range at which a box can be picked up 
exit_width = int(0.2*width) # if it is too small then it will avoid the wall and be less likely to reach the exit zone 
time_step = 0.02 # 0.1 of 1 second
drop_prob = 3 # likelihood out of 100, for a robot to drop its box every time step
	
sf = 500/width # scale factor for the animation 
marker_size = sf*sf*width*0.5/20 #diameter of a robot for the animation 
	
class boxes():
	def __init__(self,number_of_boxes,robots):
		self.num_boxes = number_of_boxes 
		self.radius = box_radius # physical radius of the box (approximated to a circle even though square in animation)
		self.free_b = np.ones(self.num_boxes) # Box states set to 1 = Free (not on a robot), if = 0 = Not free (on a robot)
		self.delivered = 0 # Number of boxes that have been delivered 
		self.box_c = [] # centre coorindates of boxes starts as an empty list 
		self.box_d = np.zeros((self.num_boxes,2)) # deviation of the box centre coordinate (how far the box moves in one time step)
		self.robot_carrier = np.full((self.num_boxes),-1) # Value at index = box number is the robot number that is currently moving that box
		self.beyond_b = np.zeros(self.num_boxes) # Boolean list of boxes that are over the delivery boundary (1 is in the delivery area)
		self.seq = 0 # sequence number: the box id to be delivered, starts at 0 
		
	# Initialising positions of boxes AND robots
		## No boxes or robots can start in the exit area
		possible_x = int((width-exit_width)/(radius*2)) # Number of positions possible on the x axis
		possible_y = int((height)/(radius*2)) # Number of positions possible on the y axis
		list_n = [] # empty list of possible positions 
		for x in range(possible_x):
			for y in range(possible_y):
				list_n.append([x,y]) # list of possible positions in the warehouse
		N = self.num_boxes+robots.num_agents # total number of units to assign positions to
		XY = random.sample(list_n,N) # select N unique coordinates at random from the list of possible positions
		c_select = [] # central coordinates (empty list) 
		for j in range(N): #for the total number of units (boxes + robots)
			c_select.append([radius + ((radius*2))*XY[j][0],radius + ((radius*2))*XY[j][1]]) # assign a central coordinate to unit j (can be a box or an agent) based on the unique randomly selected list, XY
		for b in range(self.num_boxes):
			self.box_c.append(c_select[b]) # assign initial box positions
		for r in range(robots.num_agents):
			robots.rob_c.append(c_select[r+self.num_boxes]) # assign initial robot positions
		self.box_c = np.array(self.box_c) # convert list to array
		robots.rob_c = np.array(robots.rob_c) # convert list to array 		
		###
		
class swarm():
	def __init__(self,num_agents):
		self.speed = speed # Agent speed 
		self.counter = 0.0 # time starts at 0s
		self.num_agents = num_agents # Swarm size
		self.free_r = np.ones(self.num_agents) # Robot states set to 1 = Free (no box), if = 0 = Not free (has a box)
		self.heading = 0.0314*np.random.randint(-100,100,self.num_agents) 
		# initial heading for all robots is randomly chosen
		self.rob_c = [] # centre coordinates of the robots starts as an empty list and is assigned in the boxes class
		self.rob_d = np.zeros((self.num_agents,2)) # robot centre cooridinate deviation (how much the robot moves in one time step)
		self.last_box = np.full((self.num_agents,2),-1) # collection of the last two boxes that the robot has carried. Starts as a list of '-1' because no robots have carried any boxes yet.
		
	def iterate(self,boxes): # moves the robot and box positions forward in one time step
		dist = cdist(boxes.box_c, self.rob_c) # calculates the euclidean distance from every robot to every box (centres)
		qu_close_box = np.min(dist,1) < box_range # if the minimum distance box-robot is less than the pick up sensory range, then qu_close_box = 1 which means a box is nearby a robot
		mins = np.argmin(dist,1) # robot IDS with the minimum robot-box distances 
		checkb = boxes.free_b*qu_close_box  #only include boxes that are free to be picked up (i.e. not already on a robot)
		box_n = np.argwhere(checkb==1) #the box numbers that are nearby and available
	# needs to be a loop (rather than vectorised) in case two robots are close to the same box
		for b in box_n:		# loop through the free boxes
			if self.free_r[mins[b]] == 1 and self.last_box[mins[b],0] != b and self.last_box[mins[b],1] != b: # if the box is close to a robot and free AND the robot that is closest to box b is also free AND that box has is not one of the last two boxes that that robot picked up:
				self.free_r[mins[b]] = 0 # change robot state to 0 (not free, has a box)
				boxes.free_b[b] = 0 # change box state to 0 (not free, on a robot)
				boxes.box_c[b] = self.rob_c[mins[b]] # change the box centre so it is aligned with its robot carrier's centre
				boxes.robot_carrier[b] = mins[b] # set the robot_carrier for box b to that robot ID

		random_walk(self,boxes) # the robots move using the random walk function which generates a new deviation (rob_d) based on collision avoidance and the robot controller algorithm 
		
		self.rob_c = self.rob_c + self.rob_d # robots centres change as they move
		anti_free_b = boxes.free_b == 0 # Need to know which boxes should be moved by the robot that is carrying them. Here, if box is being carried then anti_free_b = 1 and therefore box_d(below) is not 0 
		boxes.box_d = np.array((anti_free_b,anti_free_b)).T*self.rob_d[boxes.robot_carrier] # move the boxes by the amount equal to the robot carrying them 
		boxes.box_c = boxes.box_c + boxes.box_d
		boxes.beyond_b = boxes.box_c.T[0,boxes.seq] > width - exit_width - radius # beyond_b = 1 if box centre is in the delivery area (including those already delivered)
		if boxes.beyond_b == 1: # if any boxes have been delivered
			boxes.delivered = boxes.seq+1 # add to the number of deliveries made
			box_n = boxes.seq # box IDs that have been delivered
			rob_n = boxes.robot_carrier[box_n] # robot IDs that have delivered a box just now
			boxes.free_b[box_n] = 1 # mark boxes as free again
			self.free_r[rob_n] = 1 # mark robots as free again
			boxes.seq+=1 # move the sequence on by one so now there is a new box id to be delivered 
			boxes.box_c.T[0,box_n] = boxes.box_c.T[0,box_n] - 600 # remove boxes from the warehouse
			
		## Probabilistic reshuffling behaviour ##
		if boxes.seq < boxes.num_boxes: # if there are still boxes to be delivered
			box_drop = np.random.randint(0,100,boxes.num_boxes) # For each box, generate a random number between 0 and 100
			prob = box_drop < drop_prob # Select the boxes that are below the designated threshold 
			prob[boxes.seq] = 0 # Never drop the box that is the one that should be delivered next in the sequence
			prob_free_b = boxes.free_b == 0 # Only do this for boxes which are being carried
			P = prob_free_b*prob == 1 # Which boxes are being carried that should be dropped?
			box_n = np.argwhere(P==1) # Which box IDs should be dropped?
			rob_n = boxes.robot_carrier[box_n] # Robot IDs that are carrying those boxes 
			self.last_box[rob_n,1] = self.last_box[rob_n,0] # assign the last box to the second last box ID that that robot has carried
			self.last_box[rob_n,0] = box_n # assign a new last box Id that that robot has carried
			self.free_r[rob_n] = 1 # Free up the robot (drop the box)
			boxes.robot_carrier[box_n] = -1 # Assign no robot to the dropped box
			boxes.free_b = boxes.free_b + (prob*prob_free_b) # Drop the box
		##

## Movement function with agent-agent avoidance behaviours ## 
def random_walk(swarm,boxes):
	swarm.counter += time_step # time step forward by one time step  

	## Random Walk ##
	# Add noise to the heading 
	noise = 0.01*np.random.randint(-50,50,(swarm.num_agents))
	swarm.heading += noise
	# New chosen heading 
	heading_x = np.cos(swarm.heading) # new heading with noise in x 
	heading_y = np.sin(swarm.heading) # in y
	F_heading = -np.array([[heading_x[n], heading_y[n]] for n in range(0, swarm.num_agents)]) # influence on the robot's movement based on the noise added to the heading
	
	## Collision avoidance between agents ##
	# Compute euclidean (cdist) distance between agents and other agents
	agent_distance = cdist(swarm.rob_c, swarm.rob_c)	# distance between all the agents to all the agents
	too_close = agent_distance < repulsion_distance # TRUE if agent is too close to another agent such that it can be detected (enable collision avoidance)
	proximity_to_robots = swarm.rob_c[:,:,np.newaxis]-swarm.rob_c.T[np.newaxis,:,:] 	# Compute vectors between agents
	F_agent = too_close[:,np.newaxis,:]*proximity_to_robots # only include repulsion from agents that are too close (detectable)
	F_agent = np.sum(F_agent, axis =0).T # sum the repulsion vectors
	
	## Collision avoidance between agents and boxes ##
	# Compute euclidean (cdist) distance between boxes and agents
	box_dist = cdist(boxes.box_c,swarm.rob_c) # distance between all the boxes and all the agents
	too_close_boxes = box_dist < repulsion_distance # TRUE if agent is too close to a box (enable collision avoidance). Does not avoid box if agent does not have a box but this is considered later in the code (not_free*F_box)
	proximity_to_boxes = boxes.box_c[:,:,np.newaxis] - swarm.rob_c.T[np.newaxis,:,:]	# Calc repulsion vector on agents due to proximity to boxes
	F_box = too_close_boxes[:,np.newaxis,:]*proximity_to_boxes # only include repulsion from agents that are too close (detectable)
	F_box = np.sum(F_box,axis=0) # sum the repulsion vectors due to boxes on the agents
	not_free = swarm.free_r == 0 # list of robots that do not have a box. These robots do not want to avoid boxes at all so remove box repulsion from their calcs with the following lines 
	F_box[0] = not_free*F_box[0].T 
	F_box[1] = not_free*F_box[1].T
	
	## Collision avoidance between agents and walls ##
	# Force on agent due to proximity to walls
	F_wall = np.zeros((swarm.num_agents,2)) # assign a list of zeros for the collision avoidance of walls
	too_close_to_south_wall = swarm.rob_c[:,1] < radius # if too close to the South (y = 0) wall then TRUE
	too_close_to_east_wall = swarm.rob_c[:,0] > (width-radius) # East is x = width
	too_close_to_north_wall = swarm.rob_c[:,1] > (height-radius) # North is y = height
	too_close_to_west_wall = swarm.rob_c[:,0] < radius # West is x = 0 
	F_wall[too_close_to_south_wall,1] = -100 # Any agent too close to the South wall gets a repulsion force equal to 100 in the entirely y direction to move it away from the wall.
	F_wall[too_close_to_east_wall,0] = 100
	F_wall[too_close_to_north_wall,1] = 100 
	F_wall[too_close_to_west_wall,0] = -100 

	# Movement vectors added together
	F_agent += F_wall + F_heading + F_box.T
	F_x = F_agent.T[0] # vector in x
	F_y = F_agent.T[1] # in y 

	# Movement due to vectors
	new_heading = np.arctan2(F_y, F_x) # new heading due to repulsions
	move_x = swarm.speed*np.cos(new_heading)*time_step # Movement in x 
	move_y = swarm.speed*np.sin(new_heading)*time_step # Movement in y 
	# Total change in movement of agent (robot deviation)
	swarm.rob_d = -np.array([[move_x[n], move_y[n]] for n in range(0, swarm.num_agents)])
	return swarm.rob_d

####################################################################################
'This is the end of the algorithm. The following are used to run the simulation and record data.'
####################################################################################
class data:
	def __init__(self,num_agents,num_boxes,anim,limit):
		self.num_agents = num_agents
		self.num_boxes = num_boxes
		self.robots = swarm(self.num_agents)
		self.items = boxes(self.num_boxes,self.robots)
		self.time = limit
		self.anim = anim
		self.data_counter = 0.0
		self.data_collect()


	def data_collect(self):
		self.robots.iterate(self.items)
		done = False
		if self.anim == False:
			while self.robots.counter <= self.time:
				self.robots.iterate(self.items)
				if self.items.delivered == self.items.num_boxes: # if it completes the task in the time limit
					print("in",self.robots.counter,"seconds") # print the time taken to complete the task 
					done = True
					self.data_counter = self.robots.counter
					self.robots.counter = self.time+1
			sr = self.items.delivered # success rate is generated if the time limit runs out before the task is complete
			if sr > 0:
				sr = float(sr/self.items.num_boxes)
			if done == False:
				print("in",self.time,"seconds") # time limit is effectively printed
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%") # success rate within the time limit is printed as a percentage of boxes collected 
		if self.anim == True:
			self.ani()
			
		
	def ani(self):
		fig = plt.figure()
		ax = plt.axes(xlim=(0, width), ylim=(0, height))
		dot, = ax.plot([self.robots.rob_c[i,0] for i in range(self.num_agents)],[self.robots.rob_c[i,1] for i in range(self.num_agents)], 'ko', markersize = marker_size, fillstyle = 'none') # dots represent robots which appear as black circles
		box, = ax.plot([self.items.box_c[i,0] for i in range(self.num_boxes)],[self.items.box_c[i,1] for i in range(self.num_boxes)], 'rs', markersize = marker_size-5) # these are the boxes which appear as red squares 
		seq, = ax.plot([self.items.box_c[0,0]],[self.items.box_c[0,1]],'ks',markersize = marker_size-5) # The box that needs to be delivered next in the sequence is indicated by a black square
		
		plt.axis('square') # the warehouse is square 
		plt.axis([0,width,0,height])
		
		def animate(i):
			self.robots.iterate(self.items)
			dot.set_data([self.robots.rob_c[n,0] for n in range(self.num_agents)],[self.robots.rob_c[n,1] for n in range(self.num_agents)])
			box.set_data([self.items.box_c[n,0] for n in range(self.num_boxes)],[self.items.box_c[n,1] for n in range(self.num_boxes)])
			seq.set_data([self.items.box_c[self.items.seq,0],[self.items.box_c[self.items.seq,1]]])
			plt.title("Time is "+str(round(self.robots.counter))+"s") # print the current time to the visualisation 
			if self.items.delivered == self.num_boxes or self.robots.counter > self.time: # the animation will print the success rate as a percentage of boxes delivered 
				sr = self.items.delivered
				if sr > 0:
					sr = float(sr/self.items.num_boxes)
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%")
				print("in",self.robots.counter,"seconds")
				exit()
		anim = animation.FuncAnimation(fig, animate, frames=50, interval=10,blit=False)
		plt.xlabel("Warehouse width (cm)")
		plt.ylabel("Warehouse height (cm)")
		ex = [width-exit_width, width-exit_width]
		ey = [0, height]
		plt.plot(ex,ey,':')
		plt.show()
'''
Swarm Warehouse with Boxes Code:
Displays a bird's eye view of a warehouse with robots moving around, avoiding the walls and each other. Boxes are picked up and moved to exit zone by robots. Only one particular box can be delivered at a time. This box is chosen at random and no other boxes can be delivered until this one has. 

This includes the Biased Heading Behaviour which only applies to the box that is next in the sequence to be delivered

Code authored by Emma Milner 

The actual specification for the Toshiba robots is as follows: 
agent speed = 100 cm/s
agent acceleration 2 m/s/s (not used in this code)
diameter of agent is 25 cm
width of testbed is 500 cm
height (depth) of warehouse is 500 cm 
'''

import numpy as np 
import math 
import random 
from matplotlib import pyplot as plt
from matplotlib import animation  
import scipy
from scipy.spatial.distance import cdist, pdist, euclidean
import pickle
import sys
import os

### INPUTS ###
radius = 12.5 # Radius of single agent (half of 25)
width = 500 # Width of warehouse
height = 500 # Height (depth) of warehouse 
speed = 100 # Agent speed
repulsion_distance =radius*2 + 10  # Distance at which repulsion is first felt 
box_radius = radius # radius of the box is the same as the robots
box_range = 2*box_radius # range at which a box can be picked up 
exit_width = int(0.2*width) # if it is too small then it will avoid the wall and be less likely to reach the exit zone 
time_step = 0.02 # 0.1 of 1 second
drop_prob = 3 # likelihood out of 100, for a robot to drop its box every time step
	
sf = 500/width # scale factor for the animation 
marker_size = sf*sf*width*0.5/20 #diameter of a robot for the animation 
	
class boxes():
	def __init__(self,number_of_boxes,robots):
		self.num_boxes = number_of_boxes 
		self.radius = box_radius # physical radius of the box (approximated to a circle even though square in animation)
		self.free_b = np.ones(self.num_boxes) # Box states set to 1 = Free (not on a robot), if = 0 = Not free (on a robot)
		self.delivered = 0 # Number of boxes that have been delivered 
		self.box_c = [] # centre coorindates of boxes starts as an empty list 
		self.box_d = np.zeros((self.num_boxes,2)) # deviation of the box centre coordinate (how far the box moves in one time step)
		self.robot_carrier = np.full((self.num_boxes),-1) # Value at index = box number is the robot number that is currently moving that box
		self.beyond_b = np.zeros(self.num_boxes) # Boolean list of boxes that are over the delivery boundary (1 is in the delivery area)
		self.seq = 0 # sequence number: the box id to be delivered, starts at 0 
		
	# Initialising positions of boxes AND robots
		## No boxes or robots can start in the exit area
		possible_x = int((width-exit_width)/(radius*2)) # Number of positions possible on the x axis
		possible_y = int((height)/(radius*2)) # Number of positions possible on the y axis
		list_n = [] # empty list of possible positions 
		for x in range(possible_x):
			for y in range(possible_y):
				list_n.append([x,y]) # list of possible positions in the warehouse
		N = self.num_boxes+robots.num_agents # total number of units to assign positions to
		XY = random.sample(list_n,N) # select N unique coordinates at random from the list of possible positions
		c_select = [] # central coordinates (empty list) 
		for j in range(N): #for the total number of units (boxes + robots)
			c_select.append([radius + ((radius*2))*XY[j][0],radius + ((radius*2))*XY[j][1]]) # assign a central coordinate to unit j (can be a box or an agent) based on the unique randomly selected list, XY
		for b in range(self.num_boxes):
			self.box_c.append(c_select[b]) # assign initial box positions
		for r in range(robots.num_agents):
			robots.rob_c.append(c_select[r+self.num_boxes]) # assign initial robot positions
		self.box_c = np.array(self.box_c) # convert list to array
		robots.rob_c = np.array(robots.rob_c) # convert list to array 		
		###
		
class swarm():
	def __init__(self,num_agents):
		self.speed = speed # Agent speed 
		self.counter = 0.0 # time starts at 0s
		self.num_agents = num_agents # Swarm size
		self.free_r = np.ones(self.num_agents) # Robot states set to 1 = Free (no box), if = 0 = Not free (has a box)
		self.heading = 0.0314*np.random.randint(-100,100,self.num_agents) 
		# initial heading for all robots is randomly chosen
		self.rob_c = [] # centre coordinates of the robots starts as an empty list and is assigned in the boxes class
		self.rob_d = np.zeros((self.num_agents,2)) # robot centre cooridinate deviation (how much the robot moves in one time step)
		self.last_box = np.full((self.num_agents,2),-1) # collection of the last two boxes that the robot has carried. Starts as a list of '-1' because no robots have carried any boxes yet.
		
	def iterate(self,boxes): # moves the robot and box positions forward in one time step
		dist = cdist(boxes.box_c, self.rob_c) # calculates the euclidean distance from every robot to every box (centres)
		qu_close_box = np.min(dist,1) < box_range # if the minimum distance box-robot is less than the pick up sensory range, then qu_close_box = 1 which means a box is nearby a robot
		mins = np.argmin(dist,1) # robot IDS with the minimum robot-box distances 
		checkb = boxes.free_b*qu_close_box  #only include boxes that are free to be picked up (i.e. not already on a robot)
		box_n = np.argwhere(checkb==1) #the box numbers that are nearby and available
	# needs to be a loop (rather than vectorised) in case two robots are close to the same box
		for b in box_n:		# loop through the free boxes
			if self.free_r[mins[b]] == 1 and self.last_box[mins[b],0] != b and self.last_box[mins[b],1] != b: # if the box is close to a robot and free AND the robot that is closest to box b is also free AND that box has is not one of the last two boxes that that robot picked up:
				self.free_r[mins[b]] = 0 # change robot state to 0 (not free, has a box)
				boxes.free_b[b] = 0 # change box state to 0 (not free, on a robot)
				boxes.box_c[b] = self.rob_c[mins[b]] # change the box centre so it is aligned with its robot carrier's centre
				boxes.robot_carrier[b] = mins[b] # set the robot_carrier for box b to that robot ID

		random_walk(self,boxes) # the robots move using the random walk function which generates a new deviation (rob_d) based on collision avoidance and the robot controller algorithm 
		
		self.rob_c = self.rob_c + self.rob_d # robots centres change as they move
		anti_free_b = boxes.free_b == 0 # Need to know which boxes should be moved by the robot that is carrying them. Here, if box is being carried then anti_free_b = 1 and therefore box_d(below) is not 0 
		boxes.box_d = np.array((anti_free_b,anti_free_b)).T*self.rob_d[boxes.robot_carrier] # move the boxes by the amount equal to the robot carrying them 
		boxes.box_c = boxes.box_c + boxes.box_d
		boxes.beyond_b = boxes.box_c.T[0,boxes.seq] > width - exit_width - radius # beyond_b = 1 if box centre is in the delivery area (including those already delivered)
		if boxes.beyond_b == 1: # if any boxes have been delivered
			boxes.delivered = boxes.seq+1 # add to the number of deliveries made
			box_n = boxes.seq # box IDs that have been delivered
			rob_n = boxes.robot_carrier[box_n] # robot IDs that have delivered a box just now
			boxes.free_b[box_n] = 1 # mark boxes as free again
			self.free_r[rob_n] = 1 # mark robots as free again
			boxes.seq+=1 # move the sequence on by one so now there is a new box id to be delivered 
			boxes.box_c.T[0,box_n] = boxes.box_c.T[0,box_n] - 600 # remove boxes from the warehouse
			
		## Probabilistic reshuffling behaviour ##
		if boxes.seq < boxes.num_boxes: # if there are still boxes to be delivered
			box_drop = np.random.randint(0,100,boxes.num_boxes) # For each box, generate a random number between 0 and 100
			prob = box_drop < drop_prob # Select the boxes that are below the designated threshold 
			prob[boxes.seq] = 0 # Never drop the box that is the one that should be delivered next in the sequence
			prob_free_b = boxes.free_b == 0 # Only do this for boxes which are being carried
			P = prob_free_b*prob == 1 # Which boxes are being carried that should be dropped?
			box_n = np.argwhere(P==1) # Which box IDs should be dropped?
			rob_n = boxes.robot_carrier[box_n] # Robot IDs that are carrying those boxes 
			self.last_box[rob_n,1] = self.last_box[rob_n,0] # assign the last box to the second last box ID that that robot has carried
			self.last_box[rob_n,0] = box_n # assign a new last box Id that that robot has carried
			self.free_r[rob_n] = 1 # Free up the robot (drop the box)
			boxes.robot_carrier[box_n] = -1 # Assign no robot to the dropped box
			boxes.free_b = boxes.free_b + (prob*prob_free_b) # Drop the box
		##

## Movement function with agent-agent avoidance behaviours ## 
def random_walk(swarm,boxes):
	swarm.counter += time_step # time step forward by one time step  

	## Random Walk ##
	# Add noise to the heading 
	noise = 0.01*np.random.randint(-50,50,(swarm.num_agents))
	swarm.heading += noise
	# New chosen heading 
	heading_x = np.cos(swarm.heading) # new heading with noise in x 
	heading_y = np.sin(swarm.heading) # in y
	## Biased Heading Behaviour ##
	anti_free_r = swarm.free_r == 0 # This equals 1 if the robot has a box
	heading_x = heading_x + anti_free_r # Add 1 to the x component of the robot's heading if it has a box
	F_heading = -np.array([[heading_x[n], heading_y[n]] for n in range(0, swarm.num_agents)]) # influence on the robot's movement based on the noise added to the heading
	
	## Collision avoidance between agents ##
	# Compute euclidean (cdist) distance between agents and other agents
	agent_distance = cdist(swarm.rob_c, swarm.rob_c)	# distance between all the agents to all the agents
	too_close = agent_distance < repulsion_distance # TRUE if agent is too close to another agent such that it can be detected (enable collision avoidance)
	proximity_to_robots = swarm.rob_c[:,:,np.newaxis]-swarm.rob_c.T[np.newaxis,:,:] 	# Compute vectors between agents
	F_agent = too_close[:,np.newaxis,:]*proximity_to_robots # only include repulsion from agents that are too close (detectable)
	F_agent = np.sum(F_agent, axis =0).T # sum the repulsion vectors
	
	## Collision avoidance between agents and boxes ##
	# Compute euclidean (cdist) distance between boxes and agents
	box_dist = cdist(boxes.box_c,swarm.rob_c) # distance between all the boxes and all the agents
	too_close_boxes = box_dist < repulsion_distance # TRUE if agent is too close to a box (enable collision avoidance). Does not avoid box if agent does not have a box but this is considered later in the code (not_free*F_box)
	proximity_to_boxes = boxes.box_c[:,:,np.newaxis] - swarm.rob_c.T[np.newaxis,:,:]	# Calc repulsion vector on agents due to proximity to boxes
	F_box = too_close_boxes[:,np.newaxis,:]*proximity_to_boxes # only include repulsion from agents that are too close (detectable)
	F_box = np.sum(F_box,axis=0) # sum the repulsion vectors due to boxes on the agents
	not_free = swarm.free_r == 0 # list of robots that do not have a box. These robots do not want to avoid boxes at all so remove box repulsion from their calcs with the following lines 
	F_box[0] = not_free*F_box[0].T 
	F_box[1] = not_free*F_box[1].T
	
	## Collision avoidance between agents and walls ##
	# Force on agent due to proximity to walls
	F_wall = np.zeros((swarm.num_agents,2)) # assign a list of zeros for the collision avoidance of walls
	too_close_to_south_wall = swarm.rob_c[:,1] < radius # if too close to the South (y = 0) wall then TRUE
	too_close_to_east_wall = swarm.rob_c[:,0] > (width-radius) # East is x = width
	too_close_to_north_wall = swarm.rob_c[:,1] > (height-radius) # North is y = height
	too_close_to_west_wall = swarm.rob_c[:,0] < radius # West is x = 0 
	F_wall[too_close_to_south_wall,1] = -100 # Any agent too close to the South wall gets a repulsion force equal to 100 in the entirely y direction to move it away from the wall.
	F_wall[too_close_to_east_wall,0] = 100
	F_wall[too_close_to_north_wall,1] = 100 
	F_wall[too_close_to_west_wall,0] = -100 

	# Movement vectors added together
	F_agent += F_wall + F_heading + F_box.T
	F_x = F_agent.T[0] # vector in x
	F_y = F_agent.T[1] # in y 

	# Movement due to vectors
	new_heading = np.arctan2(F_y, F_x) # new heading due to repulsions
	move_x = swarm.speed*np.cos(new_heading)*time_step # Movement in x 
	move_y = swarm.speed*np.sin(new_heading)*time_step # Movement in y 
	# Total change in movement of agent (robot deviation)
	swarm.rob_d = -np.array([[move_x[n], move_y[n]] for n in range(0, swarm.num_agents)])
	return swarm.rob_d

####################################################################################
'This is the end of the algorithm. The following are used to run the simulation and record data.'
####################################################################################
class data:
	def __init__(self,num_agents,num_boxes,anim,limit):
		self.num_agents = num_agents
		self.num_boxes = num_boxes
		self.robots = swarm(self.num_agents)
		self.items = boxes(self.num_boxes,self.robots)
		self.time = limit
		self.anim = anim
		self.data_counter = 0.0
		self.data_collect()


	def data_collect(self):
		self.robots.iterate(self.items)
		done = False
		if self.anim == False:
			while self.robots.counter <= self.time:
				self.robots.iterate(self.items)
				if self.items.delivered == self.items.num_boxes: # if it completes the task in the time limit
					print("in",self.robots.counter,"seconds") # print the time taken to complete the task 
					done = True
					self.data_counter = self.robots.counter
					self.robots.counter = self.time+1
			sr = self.items.delivered # success rate is generated if the time limit runs out before the task is complete
			if sr > 0:
				sr = float(sr/self.items.num_boxes)
			if done == False:
				print("in",self.time,"seconds") # time limit is effectively printed
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%") # success rate within the time limit is printed as a percentage of boxes collected 
		if self.anim == True:
			self.ani()
			
		
	def ani(self):
		fig = plt.figure()
		ax = plt.axes(xlim=(0, width), ylim=(0, height))
		dot, = ax.plot([self.robots.rob_c[i,0] for i in range(self.num_agents)],[self.robots.rob_c[i,1] for i in range(self.num_agents)], 'ko', markersize = marker_size, fillstyle = 'none') # dots represent robots which appear as black circles
		box, = ax.plot([self.items.box_c[i,0] for i in range(self.num_boxes)],[self.items.box_c[i,1] for i in range(self.num_boxes)], 'rs', markersize = marker_size-5) # these are the boxes which appear as red squares 
		seq, = ax.plot([self.items.box_c[0,0]],[self.items.box_c[0,1]],'ks',markersize = marker_size-5) # The box that needs to be delivered next in the sequence is indicated by a black square
		
		plt.axis('square') # the warehouse is square 
		plt.axis([0,width,0,height])
		
		def animate(i):
			self.robots.iterate(self.items)
			dot.set_data([self.robots.rob_c[n,0] for n in range(self.num_agents)],[self.robots.rob_c[n,1] for n in range(self.num_agents)])
			box.set_data([self.items.box_c[n,0] for n in range(self.num_boxes)],[self.items.box_c[n,1] for n in range(self.num_boxes)])
			seq.set_data([self.items.box_c[self.items.seq,0],[self.items.box_c[self.items.seq,1]]])
			plt.title("Time is "+str(round(self.robots.counter))+"s") # print the current time to the visualisation 
			if self.items.delivered == self.num_boxes or self.robots.counter > self.time: # the animation will print the success rate as a percentage of boxes delivered 
				sr = self.items.delivered
				if sr > 0:
					sr = float(sr/self.items.num_boxes)
				print(self.items.delivered,"of",self.items.num_boxes,"collected =",sr*100,"%")
				print("in",self.robots.counter,"seconds")
				exit()
		anim = animation.FuncAnimation(fig, animate, frames=50, interval=10,blit=False)
		plt.xlabel("Warehouse width (cm)")
		plt.ylabel("Warehouse height (cm)")
		ex = [width-exit_width, width-exit_width]
		ey = [0, height]
		plt.plot(ex,ey,':')
		plt.show()

Comments (0)

HTTPS SSH

You can clone a snippet to your computer for local editing. Learn more.