# Implementing REINFORCE Policy Gradient for CartPole-v1
############################### Lab 9 ###############################
import gymnasium as gym
import numpy as np
import tensorflow as tf
# Create the environment.
env = gym.make("CartPole-v1")

# Policy network: one hidden ReLU layer followed by a softmax layer that
# outputs one probability per discrete action. The input width is taken from
# the environment's observation space, the output width from its action space.
model = tf.keras.Sequential([
    tf.keras.Input(shape=(env.observation_space.shape[0],)),
    tf.keras.layers.Dense(16, activation="relu"),
    tf.keras.layers.Dense(int(env.action_space.n), activation="softmax"),
])

# Optimizer used to apply the policy-gradient updates.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
# Function to choose an action based on the current policy
def choose_action(state):
    """Sample an action from the policy model's output distribution.

    Args:
        state (np.ndarray): The current observation/state from the environment.

    Returns:
        int: The chosen action index.
    """
    # The model is called on a batch, so add a leading batch dimension of 1.
    batch = np.expand_dims(state, axis=0)
    probs = model(batch).numpy()[0].astype(np.float64)
    # Renormalise in float64 so the probabilities sum to 1 as
    # np.random.choice requires (the softmax output may carry rounding error).
    probs /= probs.sum()
    return int(np.random.choice(len(probs), p=probs))
# Train the agent using the REINFORCE algorithm.
NUM_EPISODES = 200  # total number of training episodes
GAMMA = 0.99  # discount factor applied to future rewards
LOG_EVERY = 20  # print a summary every LOG_EVERY episodes

print("Starting REINFORCE training for CartPole-v1...")
for episode in range(NUM_EPISODES):
    state, _ = env.reset()
    states, actions, rewards = [], [], []

    # Collect one full episode with the current policy.
    done = False
    while not done:
        action = choose_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
        # The episode is over when the environment reports either
        # termination or truncation.
        done = terminated or truncated

    # Discounted returns G_t = r_t + GAMMA * G_{t+1}, accumulated backwards.
    # Appending and reversing once avoids repeated inserts at index 0.
    running_return = 0.0
    returns = []
    for r in reversed(rewards):
        running_return = r + GAMMA * running_return
        returns.append(running_return)
    returns.reverse()
    returns = np.array(returns)
    # Normalize the returns; the epsilon avoids division by zero when all
    # returns are equal. Cast to float32 to match the model's output dtype.
    returns = (returns - np.mean(returns)) / (np.std(returns) + 1e-8)
    returns = returns.astype(np.float32)

    # Update the policy with a single batched forward pass over the episode.
    state_batch = np.asarray(states, dtype=np.float32)
    action_mask = tf.one_hot(actions, int(env.action_space.n))
    with tf.GradientTape() as tape:
        probs = model(state_batch)
        # Probability the policy assigned to the action actually taken.
        chosen_probs = tf.reduce_sum(probs * action_mask, axis=1)
        log_probs = tf.math.log(chosen_probs + 1e-8)
        # REINFORCE loss: negative return-weighted log-likelihood.
        loss = -tf.reduce_sum(log_probs * returns)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Print episode summary
    if (episode + 1) % LOG_EVERY == 0:
        print(f"Episode {episode+1}: Total Reward = {sum(rewards)}")

env.close()
print("\nREINFORCE training finished.")
# English with a size of 3.14 KB