Implementing REINFORCE Policy Gradient for CartPole-v1


############################### Lab 9 ###############################
import gymnasium as gym
import numpy as np
import tensorflow as tf

# Create the environment
env = gym.make("CartPole-v1")

# Define a simple neural network model for the policy
model = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation='relu', input_shape=(env.observation_space.shape[0],)),
    tf.keras.layers.Dense(env.action_space.n, activation='softmax')
])

# Define the optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

# Function to choose an action based on the current policy
def choose_action(state):
    """
    Chooses an action based on the probabilities output by the policy model.
    Args:
        state (np.ndarray): The current observation/state from the environment.
    Returns:
        int: The chosen action.
    """
    state = np.expand_dims(state, axis=0)  # Add batch dimension
    probs = model(state).numpy()[0]        # Get action probabilities
    return np.random.choice(env.action_space.n, p=probs)  # Sample an action from the policy distribution
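
# Alternative (a sketch, not used below): the sampling step can also stay
# inside TensorFlow with tf.random.categorical, which expects log-probabilities:
#   logits = tf.math.log(model(state) + 1e-8)   # state already has a batch dimension here
#   action = int(tf.random.categorical(logits, num_samples=1)[0, 0])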

# Train the agent using the REINFORCE algorithm
print("Starting REINFORCE training for CartPole-v1...")

for episode in range(200):  # Total episodes
    state, _ = env.reset()  # Reset the environment
    done = False
    states, actions, rewards = [], [], []

    # Collect data for one episode
    while not done:
        action = choose_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Save transition
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state

        done = terminated or truncated  # Episode ends on termination or time-limit truncation

    # Compute discounted returns
    G = 0
    returns = []
    for r in reversed(rewards):
        G = r + 0.99 * G  # Accumulate discounted return with gamma = 0.99
        returns.insert(0, G)

    returns = np.array(returns)
    returns = (returns - np.mean(returns)) / (np.std(returns) + 1e-8)  # Normalize returns to reduce gradient variance

    # Update the policy
    with tf.GradientTape() as tape:
        loss = 0
        for i in range(len(states)):
            state_input = np.expand_dims(states[i], axis=0)
            probs = model(state_input)
            log_prob = tf.math.log(probs[0, actions[i]] + 1e-8)
            loss += -log_prob * returns[i]  # REINFORCE loss: negative log-probability weighted by the return

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Print episode summary
    if (episode + 1) % 20 == 0:
        print(f"Episode {episode+1}: Total Reward = {sum(rewards)}")

env.close()
print("\nREINFORCE training finished.")
