EventDrivenMD / edmd.py

import numpy as np
import math
import itertools


class Particle(object):
    def __init__(self, x, v):
        self.x = np.array(x)
        self.v = np.array(v)
        self.event_times = []

    def __repr__(self):
        return "Particle(%s, %s)" % (self.x, self.v)

    def move(self, delta_t):
        self.x += delta_t * self.v
        
    def overlaps_with(self, other_particle, radius):
        squared_distance = np.sum((other_particle.x - self.x)**2)
        return squared_distance < 4.0 * radius**2

class Box(object):
    def __init__(self, size=10):
        self.size = size

    def contains(self, position, radius):
        return position[0] <= self.size - radius and \
            position[0] >= -self.size + radius and \
            position[1] <= self.size - radius and  \
            position[1] >= -self.size + radius


class CollisionRules(object):
    def __init__(self, box=None, eps=1.0, radius=1.0):
        self.tolerance = 0.0e-10
        self.eps = eps
        self.radius = radius
        self.box = box if box else Box()

    def collide(self, particle_i, particle_j):
        x_ij = particle_i.x - particle_j.x
        v_ij = particle_i.v - particle_j.v
        unit_vector = x_ij / np.linalg.norm(x_ij)
        h = (1 + self.eps) * np.dot(v_ij, unit_vector) / 2.0
        particle_i.v -= h * unit_vector
        particle_j.v += h * unit_vector

    def collide_with_wall(self, particle):
        if particle.x[1] < 0:
            if particle.x[1] > particle.x[0] or particle.x[1] > -particle.x[0]:
                particle.v[0] = -particle.v[0]
            else:
                particle.v[1] = -particle.v[1]
        else:
            if particle.x[1] < -particle.x[0] or particle.x[1] < particle.x[0]:
                particle.v[0] = -particle.v[0]
            else:
                particle.v[1] = -particle.v[1]

    def time_to_next_collision(self, particle_i, particle_j):
        x_ij = particle_i.x - particle_j.x
        v_ij = particle_i.v - particle_j.v
        x_ij_dot_v_ij = np.dot(x_ij, v_ij)

        if x_ij_dot_v_ij > 0:
            return float('inf')

        squared_distance = np.sum(x_ij ** 2.0)

        # all particles with same radius
        if squared_distance >= 4.0 * (self.radius ** 2):
            radius = self.radius
        else:
            radius = self.radius - self.tolerance

        q = squared_distance - 4.0 * radius ** 2
        squared_velociy_difference = np.sum(v_ij ** 2)
        w = x_ij_dot_v_ij ** 2 - q * squared_velociy_difference

        if w < 0:
            return float('inf')

        delta_t = q / (-x_ij_dot_v_ij + math.sqrt(w))
        x_i = particle_i.x + delta_t * particle_i.v

        if not self.box.contains(x_i, self.radius):
            return float('inf')

        x_j = particle_j.x + delta_t * particle_j.v

        if not self.box.contains(x_j, self.radius):
            return float('inf')
        return delta_t

    def time_to_next_collision_with_wall(self, part):
        radius = self.radius
        if any(np.abs(part.x - self.box.size + self.radius) > 0):
            radius -= self.tolerance

        result = min(self._compute_dt(part, 0), self._compute_dt(part, 1))
        return result

    def _compute_dt(self, particle, index):
        dtime = float('inf')

        if particle.v[index] > 0:
            dtime = (self.box.size - (particle.x[index] + self.radius)) /\
                particle.v[index]
        elif particle.v[index] < 0:
            dtime = (self.box.size + (particle.x[index] - self.radius)) /\
                -particle.v[index]
        return dtime


class Simulation(object):
    def __init__(self, particles, collision_rules=None, nprint=1):
        self.particles = particles
        if collision_rules is None:
            collision_rules = CollisionRules()
        self.collision_rules = collision_rules
        self.events = dict()
        self.time = 0
        self.update_particles()
        self.nprint = nprint
        self.t_events = []

    def update_particles(self):
        for particle in self.particles:
            self.update(particle)

    def kinenergy(self):
        return sum(np.sum(particle.v ** 2) for particle in self.particles)

    def update(self, particle):
        for time in particle.event_times:
            if time in self.events:
                del self.events[time]
        particle.event_times = []
        for other_particle in self.particles:
            if other_particle != particle:
                dt = self.collision_rules.time_to_next_collision(particle,
                    other_particle)
                if dt < float('inf'):
                    time_coll = self.time + dt
                    self.events[time_coll] = (particle, other_particle)
                    particle.event_times.append(time_coll)
                    other_particle.event_times.append(time_coll)
        dt = self.collision_rules.time_to_next_collision_with_wall(particle)
        # if dt >= 0:
        if dt < float('inf'):
            time_coll = self.time + dt
            self.events[time_coll] = (particle, None)
            particle.event_times.append(time_coll)

    def move_particles(self, delta_t):
        for particle in self.particles:
            particle.move(delta_t)

    def run(self, steps=10):
        for i in xrange(steps):
            t_next_event = min(self.events.keys())
            next_event = self.events[t_next_event]

            if i % self.nprint == 0:
                print i,  self.time
                
            self.t_events.append(self.time)

            if next_event[1] is None:
                delta_t = self.collision_rules.time_to_next_collision_with_wall(next_event[0])
                self.move_particles(delta_t)
                self.time = t_next_event
                self.collision_rules.collide_with_wall(next_event[0])
                self.update(next_event[0])
            else:
                delta_t = self.collision_rules.time_to_next_collision(next_event[0], next_event[1])
                self.move_particles(delta_t)
                self.time = t_next_event
                self.collision_rules.collide(next_event[0], next_event[1])
                self.update(next_event[0])
                self.update(next_event[1])


def load_initial_particles(coordinates_file_path, velocities_file_path):
    coords = np.loadtxt(coordinates_file_path)
    vels = np.loadtxt(velocities_file_path)
    particles = [Particle(x, v) for x, v in itertools.izip(coords, vels)]
    return particles

def create_random_particles(seed, box, num_particles, radius, minimium_velocity, maximum_velocity):
    particles = []
    np.random.seed(seed)
    coords_upper_bound = box.size - radius
    coords_lower_bound = -box.size + radius
    for i in xrange(0, num_particles):
        overlap = True
        while(overlap):   
            x = np.random.uniform(coords_lower_bound, coords_upper_bound, 2)
            v = np.random.uniform(minimium_velocity, maximum_velocity, 2)
            new_particle = Particle(x, v)
            overlap = is_there_overlap(new_particle, particles, radius)    
        particles.append(new_particle)
    return particles

def is_there_overlap(new_particle, particles, radius):
    for particle in particles:
        if new_particle.overlaps_with(particle, radius):
            return True
    return False

def create_simulation_from_files(coords_filename, vels_filename, box_size, eps):
    particles = load_initial_particles(coords_filename, vels_filename)
    rules = CollisionRules(Box(box_size), eps)
    return Simulation(particles, rules)

def create_simulation(num_particles, seed, radius, box_size, eps, minimium_velocity, maximum_velocity):
    box = Box(box_size)
    particles = create_random_particles(seed, box, num_particles, radius, minimium_velocity, maximum_velocity)
    rules = CollisionRules(box, eps, radius)
    return Simulation(particles, rules)


if __name__ == '__main__':
    import sys
    coords_filename = "./tests/data/coords_ini_test_update.txt"
    if len(sys.argv) > 1:
        coords_filename = sys.argv[1]
    vels_filename = "./tests/data/v_ini_test_update.txt"
    if len(sys.argv) > 2:
        vels_filename = sys.argv[2]
    sim = create_simulation_from_files(coords_filename, vels_filename, box_size=10)
    sim.run(100)
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.