Value iteration

Put simply, value iteration starts from an arbitrary value function; in the code that follows, we initialize it to 0 for every state. This initial guess is almost certainly not optimal, so we repeatedly sweep over all the states and recompute the value of each one. We stop iterating once the value function no longer changes, which tells us we have found the optimal value function. Once we have the optimal value function, we can easily extract the optimal policy from it.
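The update applied to every state in each sweep can be written as a Bellman optimality backup. The notation is an assumption chosen to mirror the code later in this section: P(s' | s, a) is the transition probability, R(s, a, s') is the reward, gamma is the discount factor, and V_k is the value table from the previous sweep.

```latex
V_{k+1}(s) = \max_{a} \sum_{s'} P(s' \mid s, a)\,\bigl[ R(s, a, s') + \gamma\, V_k(s') \bigr]
```

Iteration stops when V_{k+1} and V_k differ by no more than a small threshold.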

Now we will see how to solve the frozen lake problem using value iteration.

First, we import necessary libraries:

import gym
import numpy as np

Then we make our frozen lake environment using OpenAI's Gym:

env = gym.make('FrozenLake-v0')

We will first explore the environment.

The number of states in the environment is 16 as we have a 4*4 grid:

print(env.observation_space.n)

The number of actions in the environment is four, which are up, down, left, and right:

print(env.action_space.n)

Now we define a value_iteration() function which returns the optimal value function (value table). We will first see the function step by step and then look at the whole function.

First, we initialize the value table to 0 for all the states and set the maximum number of iterations:

value_table = np.zeros(env.observation_space.n)
no_of_iterations = 100000

Then, upon starting each iteration, we copy the value_table to updated_value_table:

for i in range(no_of_iterations):
    updated_value_table = np.copy(value_table)
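The copy matters because plain assignment would only create a second name for the same array, so the "old" values would change as soon as the value table is updated. A minimal sketch of the difference, using a made-up four-state table:

```python
import numpy as np

value_table = np.zeros(4)

alias = value_table              # same underlying array, not a snapshot
snapshot = np.copy(value_table)  # independent copy of the current values

value_table[0] = 1.0             # simulate an update during a sweep

print(alias[0])     # the alias sees the update
print(snapshot[0])  # the copy keeps the old value
```

With np.copy, every state in a sweep is updated from the values of the previous sweep rather than from a mix of old and new values.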

Now, for each state, we calculate the Q value of every action and take the highest Q value as the new value of that state.

We will understand the code with the example we solved previously; we computed the Q value for state A and action 1 in our previous example:

Q(A,1) = (0.3 * (0 + 0)) + (0.1 * (-1.0 + 0)) + (0.5 * (1.0 + 0))
Q(A,1) = 0.4

Instead of creating a Q table for each state, we create a list called Q_value. Then, for each action in the state, we create a list called next_states_rewards, which stores one term per possible next state: the transition probability multiplied by the sum of the reward and the discounted value of that next state. We then sum next_states_rewards and append the result to Q_value.

Look at the preceding example, where the state is A and the action is 1. (0.3 * (0 + 0)) is the term for the transition to state A, (0.1 * (-1.0 + 0)) is the term for the transition to state B, and (0.5 * (1.0 + 0)) is the term for the transition to state C. We sum these terms and append the result, 0.4, to Q_value.
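The same arithmetic can be checked in a few lines of Python. This is a sketch that assumes each term is the transition probability times the reward plus the discounted next-state value, with gamma set to 1.0 and all next-state values equal to 0, as in the example; the variable names are illustrative.

```python
# (transition probability, reward, value of next state) for next states A, B, C
transitions = [
    (0.3, 0.0, 0.0),   # A -> A
    (0.1, -1.0, 0.0),  # A -> B
    (0.5, 1.0, 0.0),   # A -> C
]
gamma = 1.0  # assumed discount factor

# One term per next state, then the sum is the Q value of (A, 1)
next_states_rewards = [p * (r + gamma * v) for p, r, v in transitions]
q_a_1 = sum(next_states_rewards)

print(round(q_a_1, 2))
```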

Once we have computed next_states_rewards for every action of a state and appended each sum to Q_value, we pick the maximum Q value and assign it as the value of that state:

for state in range(env.observation_space.n):
    Q_value = []
    for action in range(env.action_space.n):
        next_states_rewards = []
        for next_sr in env.P[state][action]:
            trans_prob, next_state, reward_prob, _ = next_sr
            next_states_rewards.append((trans_prob * (reward_prob + gamma * updated_value_table[next_state])))

        Q_value.append(np.sum(next_states_rewards))

    # Pick up the maximum Q value and update it as value of a state
    value_table[state] = max(Q_value)
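The loop above relies on the layout of env.P: a mapping from state to action to a list of (probability, next state, reward, done) tuples. The sketch below performs one backup for a single state on a small hand-built table in that layout. Note that toy_P and all of its numbers are made up for illustration; they are not the frozen lake dynamics.

```python
import numpy as np

# Hypothetical transition table in the same layout as env.P:
# toy_P[state][action] -> list of (probability, next_state, reward, done)
toy_P = {
    0: {
        0: [(1.0, 0, 0.0, False)],                       # action 0: stay in state 0
        1: [(0.8, 1, 1.0, True), (0.2, 0, 0.0, False)],  # action 1: usually reach state 1
    },
    1: {
        0: [(1.0, 1, 0.0, True)],
        1: [(1.0, 1, 0.0, True)],
    },
}

gamma = 0.9                        # assumed discount factor
old_values = np.array([0.5, 0.0])  # values from the previous sweep

# Q value of each action in state 0, computed from the old values
Q_value = []
for action in toy_P[0]:
    q = sum(p * (r + gamma * old_values[s2]) for p, s2, r, _ in toy_P[0][action])
    Q_value.append(q)

new_value = max(Q_value)  # the backup keeps the best action's Q value
print(Q_value, new_value)
```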

Then we check whether we have reached convergence, that is, whether the difference between the value table and the updated value table is very small. To decide what counts as very small, we define a variable called threshold and compare the summed absolute difference against it; if the difference is no larger than the threshold, we break the loop and return the value table as the optimal value function:

threshold = 1e-20
if (np.sum(np.fabs(updated_value_table - value_table)) <= threshold):
    print ('Value-iteration converged at iteration# %d.' %(i+1))
    break
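A threshold of 1e-20 is far smaller than the spacing between 64-bit floats of order 1, so in practice this check passes only when a sweep changes essentially nothing. As a hedged alternative, the sketch below compares the largest per-state change against a looser tolerance; has_converged and the tolerance of 1e-8 are illustrative choices, not part of the original program.

```python
import numpy as np

def has_converged(old_values, new_values, tol=1e-8):
    """Return True when no state's value changed by more than tol (hypothetical helper)."""
    return float(np.max(np.abs(new_values - old_values))) <= tol

old = np.array([0.50, 0.25, 0.00])

tiny_change = has_converged(old, old + 1e-10)                       # change below tol
large_change = has_converged(old, old + np.array([0.0, 1e-3, 0.0]))  # one state moved by 1e-3

print(tiny_change, large_change)
```

Using the maximum change rather than the sum keeps the meaning of the tolerance independent of the number of states.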

Look at the complete function of value_iteration() for a better understanding:

def value_iteration(env, gamma = 1.0):
    value_table = np.zeros(env.observation_space.n)
    no_of_iterations = 100000
    threshold = 1e-20

    for i in range(no_of_iterations):
        updated_value_table = np.copy(value_table)

        for state in range(env.observation_space.n):
            Q_value = []

            for action in range(env.action_space.n):
                next_states_rewards = []

                for next_sr in env.P[state][action]:
                    trans_prob, next_state, reward_prob, _ = next_sr
                    next_states_rewards.append((trans_prob * (reward_prob + gamma * updated_value_table[next_state])))

                Q_value.append(np.sum(next_states_rewards))
            value_table[state] = max(Q_value)

        # Stop once a full sweep no longer changes the value table
        if (np.sum(np.fabs(updated_value_table - value_table)) <= threshold):
            print ('Value-iteration converged at iteration# %d.' %(i+1))
            break

    return value_table

Thus, we can derive optimal_value_function using the value_iteration:

optimal_value_function = value_iteration(env=env,gamma=1.0)
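To sanity-check the algorithm without Gym, the same loop can be run on a tiny problem whose answer is easy to work out by hand. The sketch below is an assumption-laden illustration: value_iteration_on_table is a hypothetical variant that takes a transition table in the env.P layout directly, chain_P is a made-up three-state chain, and gamma is set to 0.9 rather than 1.0.

```python
import numpy as np

def value_iteration_on_table(P, n_states, gamma=0.9, threshold=1e-10, max_iterations=10000):
    """Value iteration over a transition table laid out like env.P (hypothetical helper)."""
    value_table = np.zeros(n_states)
    for _ in range(max_iterations):
        updated_value_table = np.copy(value_table)
        for state in range(n_states):
            # New value of the state is the best Q value over its actions
            value_table[state] = max(
                sum(p * (r + gamma * updated_value_table[s2]) for p, s2, r, _ in P[state][action])
                for action in P[state]
            )
        if np.sum(np.fabs(updated_value_table - value_table)) <= threshold:
            break
    return value_table

# Made-up chain 0 -> 1 -> 2, where state 2 is terminal.
# Action 0 stays in place, action 1 moves right; entering state 2 pays reward 1.
chain_P = {
    0: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 1, 0.0, False)]},
    1: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 2, 1.0, True)]},
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},
}

values = value_iteration_on_table(chain_P, n_states=3)
print(values)
```

By hand, state 1 is one step from the reward, so its value is 1; state 0 is two steps away, so its value is discounted once to 0.9; the terminal state is worth 0.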

After finding optimal_value_function, how can we extract the optimal policy from it? We calculate the Q values using the optimal value function and, for each state, pick the action with the highest Q value as the optimal policy. We do this via a function called extract_policy(); we will look at it step by step now.

First, we initialize the policy to 0 for all the states:

policy = np.zeros(env.observation_space.n)

Then, for each state, we build a Q_table and for each action in that state we compute the Q value and add it to our Q_table:

for state in range(env.observation_space.n):
    Q_table = np.zeros(env.action_space.n)
    for action in range(env.action_space.n):
        for next_sr in env.P[state][action]:
            trans_prob, next_state, reward_prob, _ = next_sr
            Q_table[action] += (trans_prob * (reward_prob + gamma * value_table[next_state]))

Then we pick up the policy for the state as the action that has the highest Q value:

policy[state] = np.argmax(Q_table)

Look at the complete function:

def extract_policy(value_table, gamma = 1.0):

    policy = np.zeros(env.observation_space.n)
    for state in range(env.observation_space.n):
        Q_table = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            for next_sr in env.P[state][action]:
                trans_prob, next_state, reward_prob, _ = next_sr
                Q_table[action] += (trans_prob * (reward_prob + gamma * value_table[next_state]))
        policy[state] = np.argmax(Q_table)

    return policy
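The greedy extraction step can be tried on a small hand-built table as well. In this sketch, toy_P, the value table, and gamma = 0.9 are all made-up assumptions for illustration; only the loop structure follows extract_policy().

```python
import numpy as np

# Made-up table in the env.P layout: state 0 is a choice point, state 1 is terminal.
toy_P = {
    0: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 1, 1.0, True)]},
    1: {0: [(1.0, 1, 0.0, True)], 1: [(1.0, 1, 0.0, True)]},
}
gamma = 0.9                         # assumed discount factor
value_table = np.array([1.0, 0.0])  # assumed optimal values for this toy problem

policy = np.zeros(len(toy_P), dtype=int)
for state in toy_P:
    Q_table = np.zeros(len(toy_P[state]))
    for action in toy_P[state]:
        for trans_prob, next_state, reward, _ in toy_P[state][action]:
            Q_table[action] += trans_prob * (reward + gamma * value_table[next_state])
    # np.argmax returns the first index when several actions tie
    policy[state] = np.argmax(Q_table)

print(policy)
```

Because np.argmax breaks ties by returning the lowest index, a state in which every action has the same Q value, such as a terminal state, is assigned action 0.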

Thus, we can derive the optimal_policy as follows:

optimal_policy = extract_policy(optimal_value_function, gamma=1.0)

We will get an output as follows, which is the optimal_policy, the actions to be performed in each state:

array([0., 3., 3., 3., 0., 0., 0., 0., 3., 1., 0., 0., 0., 2., 1., 0.])
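The flat array is easier to read when laid out on the 4 x 4 grid. The sketch below assumes Gym's FrozenLake action encoding (0 = left, 1 = down, 2 = right, 3 = up) and row-major state numbering; the arrows mapping is an illustrative helper.

```python
import numpy as np

optimal_policy = np.array([0., 3., 3., 3., 0., 0., 0., 0., 3., 1., 0., 0., 0., 2., 1., 0.])

# Assumed action encoding for Gym's FrozenLake: 0=left, 1=down, 2=right, 3=up
arrows = {0: '<', 1: 'v', 2: '>', 3: '^'}

# Reshape the 16 states into the 4 x 4 grid and map each action to a symbol
grid = [[arrows[int(a)] for a in row] for row in optimal_policy.reshape(4, 4)]
for row in grid:
    print(' '.join(row))
```

Entries for terminal states (holes and the goal) carry no meaning, since no action is taken from them.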

The complete program is given as follows:

import gym
import numpy as np
env = gym.make('FrozenLake-v0')

def value_iteration(env, gamma = 1.0):
    value_table = np.zeros(env.observation_space.n)
    no_of_iterations = 100000
    threshold = 1e-20
    for i in range(no_of_iterations):
        updated_value_table = np.copy(value_table)

        for state in range(env.observation_space.n):
            Q_value = []
            for action in range(env.action_space.n):
                next_states_rewards = []
                for next_sr in env.P[state][action]:
                    trans_prob, next_state, reward_prob, _ = next_sr
                    next_states_rewards.append((trans_prob * (reward_prob + gamma * updated_value_table[next_state])))
                Q_value.append(np.sum(next_states_rewards))

            value_table[state] = max(Q_value)

        if (np.sum(np.fabs(updated_value_table - value_table)) <= threshold):
            print ('Value-iteration converged at iteration# %d.' %(i+1))
            break

    return value_table


def extract_policy(value_table, gamma = 1.0):
    policy = np.zeros(env.observation_space.n)
    for state in range(env.observation_space.n):
        Q_table = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            for next_sr in env.P[state][action]:
                trans_prob, next_state, reward_prob, _ = next_sr
                Q_table[action] += (trans_prob * (reward_prob + gamma * value_table[next_state]))
        policy[state] = np.argmax(Q_table)

    return policy

optimal_value_function = value_iteration(env=env,gamma=1.0)
optimal_policy = extract_policy(optimal_value_function, gamma=1.0)

print(optimal_policy)