# Commits

committed ab0ef6c

Add an MDP solver module with tests.

• Participants
• Parent commits aac06e0
• Branches scent

# File mdp.py

`+# -*- coding: utf-8 -*-`
`+#`
`+#  mdp.py`
`+#  aichallenge-py`
`+#`
`+#  Created by Lars Yencken on 2011-11-13.`
`+#  Copyright 2011 Lars Yencken. All rights reserved.`
`+#`
`+`
`+"""`
`+Solve Markov Decision Processes using value iteration, where the agent is on`
`+a map and can only move in four directions.`
`+"""`
`+`
`+import numpy as np`
`+`
`+import settings`
`+`
`+def value_iteration(reward, torus=False, gamma=settings.MDP_GAMMA,`
`+        eps=settings.MDP_EPS):`
`+    """`
`+    Solve the MDP with the value iteration method.`
`+`
`+    reward -- 2d numpy array of per-cell rewards for the map the agent`
`+        moves on (four moves: n/s/e/w)`
`+    torus -- when True the map wraps around at its edges`
`+    gamma -- discount factor applied to the propagated neighbour values`
`+    eps -- convergence threshold on the mean absolute change between`
`+        successive sweeps`
`+`
`+    Returns the converged value array (same shape as reward).`
`+    """`
`+    value = reward`
`+`
`+    # pick the propagation rule once, outside the iteration loop`
`+    if torus:`
`+        propagate = _propagate_torus`
`+    else:`
`+        propagate = _propagate_plane`
`+`
`+    # do one sweep up front so the loop condition always compares two`
`+    # successive estimates`
`+    last_value = value`
`+    value = propagate(reward, reward, gamma)`
`+    # iterate until the estimate stops changing: mean |delta| <= eps`
`+    while abs(last_value - value).mean() > eps:`
`+        last_value = value`
`+        value = propagate(value, reward, gamma)`
`+`
`+    return value`
`+`
`+def _propagate_plane(value, reward, gamma):`
`+    """`
`+    One value-iteration sweep on a bounded (non-wrapping) map: each cell`
`+    takes reward + gamma * (best reachable neighbour value).`
`+    """`
`+    rows, cols = value.shape`
`+    # actions[k] holds, for every cell, the value of the neighbour reached`
`+    # by move k (s, n, e, w); zero-filled so off-map moves stay at 0`
`+    shape = (4,) + value.shape`
`+    actions = np.zeros(shape, dtype=np.float32)`
`+`
`+    # s: last row has no southern neighbour`
`+    actions[0, 0:rows-1, :] = value[1:rows]`
`+    # n: first row has no northern neighbour`
`+    actions[1, 1:rows, :] = value[0:rows-1]`
`+    # e: last column has no eastern neighbour`
`+    actions[2, :, 0:cols-1] = value[:, 1:cols]`
`+    # w: first column has no western neighbour`
`+    actions[3, :, 1:cols] = value[:, 0:cols-1]`
`+`
`+    # elementwise best move; NOTE(review): edge cells compare against the`
`+    # zero fill, so moving off the map is valued 0 -- this presumably`
`+    # assumes values are non-negative, worth confirming`
`+    action = np.maximum(actions[0], actions[1])`
`+    action = np.maximum(action, actions[2])`
`+    action = np.maximum(action, actions[3])`
`+`
`+    return reward + gamma * action`
`+`
`+# NOTE(review): the gamma default here is unused (value_iteration always`
`+# passes gamma) and its sibling _propagate_plane has no default -- consider`
`+# dropping it for consistency`
`+def _propagate_torus(value, reward, gamma=settings.MDP_GAMMA):`
`+    """`
`+    One value-iteration sweep on a toroidal map: like _propagate_plane,`
`+    but moves off one edge re-enter from the opposite edge.`
`+    """`
`+    rows, cols = value.shape`
`+`
`+    # each action is an offset on our torus; actions[k] holds the value of`
`+    # the neighbour reached by move k (s, n, e, w)`
`+    shape = (4,) + value.shape`
`+    actions = np.zeros(shape, dtype=np.float32)`
`+`
`+    # s: first row wraps around`
`+    actions[0, 0:rows-1] = value[1:rows]`
`+    actions[0, rows-1] = value[0]`
`+    # n: last row wraps around`
`+    actions[1, 1:rows] = value[0:rows-1]`
`+    actions[1, 0] = value[rows-1]`
`+    # e: first col wraps around`
`+    actions[2, :, 0:cols-1] = value[:, 1:cols]`
`+    actions[2, :, cols-1] = value[:, 0]`
`+    # w: last col wraps around`
`+    actions[3, :, 1:cols] = value[:, 0:cols-1]`
`+    actions[3, :, 0] = value[:, cols-1]`
`+`
`+    # elementwise best of the four moves (every move is valid on a torus,`
`+    # so unlike the plane case no zero fill survives into the max)`
`+    action = np.maximum(actions[0], actions[1])`
`+    action = np.maximum(action, actions[2])`
`+    action = np.maximum(action, actions[3])`
`+`
`+    return reward + gamma * action`
`+`

# File settings.py

` DEBUG = True`
` `
` # learning rate`
`-GAMMA = 0.7`
`+MDP_GAMMA = 0.7`
`+`
`+# convergence criterion`
`+MDP_EPS = 1e-3`
` `
` LOG_DIR = 'game_logs'`

# File test_mdp.py

`+# -*- coding: utf-8 -*-`
`+#`
`+#  test_mdp.py`
`+#  aichallenge-py`
`+#`
`+#  Created by Lars Yencken on 2011-11-13.`
`+#  Copyright 2011 Lars Yencken. All rights reserved.`
`+#`
`+`
`+"""`
`+Test cases for solving MDPs.`
`+"""`
`+`
`+import unittest`
`+`
`+import numpy as np`
`+`
`+import mdp`
`+`
`+class MDPPlaneTest(unittest.TestCase):`
`+    """Value iteration on a bounded (non-wrapping) map."""`
`+`
`+    def setUp(self):`
`+        pass`
`+`
`+    # NOTE(review): disabled test below -- ndarray.reshape returns a new`
`+    # array rather than reshaping in place, so value_iteration would still`
`+    # receive a 1-d array (whose shape cannot unpack into rows, cols)`
`+    #def test_single_row(self):`
`+        #reward = np.array([0.0,0.0,0.0,8.0])`
`+        #reward.reshape((1, 4))`
`+        #value = mdp.value_iteration(reward, gamma=0.5, eps=1e-5)`
`+        #expected = np.array([1.0, 2.0, 4.0, 8.0])`
`+        #expected.reshape((1, 4))`
`+        #assert abs(value - expected).mean() < 1e-5`
`+`
`+    def test_square(self):`
`+        # single reward at the centre of a 3x3 map; the expected values`
`+        # decay by gamma=0.5 per step away from it, plus the discounted`
`+        # best-neighbour bonus at the centre itself`
`+        reward = np.array([[0, 0, 0], [0, 1, 0], [0, 0, 0]])`
`+        expected = np.array(`
`+                [[ 0.3333, 0.6666, 0.3333],`
`+                [ 0.6666, 1.3333, 0.6666],`
`+                [ 0.3333, 0.6666, 0.3333]]`
`+            )`
`+        value = mdp.value_iteration(reward, gamma=0.5, eps=1e-3)`
`+        assert abs(value - expected).mean() < 1e-3, str(value)`
`+`
`+class MDPTorusTest(unittest.TestCase):`
`+    """Value iteration on a toroidal (wrapping) map."""`
`+`
`+    def test_square(self):`
`+        # same 3x3 single-reward map as the plane case, but with wrap-around`
`+        # every cell is at most one step from the centre, giving the`
`+        # symmetric expected values below`
`+        reward = np.array([[0, 0, 0], [0, 1, 0], [0, 0, 0]])`
`+        expected = np.array([[1, 2, 1], [2, 4, 2], [1, 2, 1]],`
`+                dtype=np.float32) / 3.0`
`+        value = mdp.value_iteration(reward, torus=True, gamma=0.5, eps=1e-5)`
`+        assert abs(value - expected).mean() < 1e-5, str(value)`