06: Practical Exercises & Implementations
"The only way to learn a new programming language is by writing programs in it." - Dennis Ritchie
Welcome to the hands-on reinforcement learning workshop! This notebook contains practical exercises and implementations to solidify your understanding of RL concepts.
🎯 Learning Objectives
By completing this notebook, you'll:
Implement core RL algorithms from scratch
Solve classic RL problems
Debug and tune RL training
Apply RL to real-world scenarios
Build confidence in RL implementation
📝 Exercise 1: Tabular Q-Learning on Frozen Lake
Goal: Implement Q-learning to solve the Frozen Lake environment
Environment: 4x4 grid world where:
S: Start position
F: Frozen surface (safe)
H: Hole (game over)
G: Goal (win)
Actions: 0=Left, 1=Down, 2=Right, 3=Up
Rewards:
Goal: +1
Hole: 0
Frozen: 0
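The agent below implements the standard tabular Q-learning update, with learning rate $\alpha$ and discount factor $\gamma$:

$$Q(s, a) \leftarrow Q(s, a) + \alpha \big[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \big]$$

On terminal transitions the bootstrap term is dropped, so the target is just $r$.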
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import gymnasium as gym
from collections import defaultdict
import random
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')
# Set up plotting
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = [12, 8]
# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)
class QLearningAgent:
"""Tabular Q-Learning Agent for Frozen Lake"""
def __init__(self, env, alpha=0.1, gamma=0.99, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
self.env = env
self.alpha = alpha # Learning rate
self.gamma = gamma # Discount factor
self.epsilon = epsilon # Exploration rate
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min
# Initialize Q-table
self.q_table = defaultdict(lambda: np.zeros(env.action_space.n))
# Training statistics
self.episode_rewards = []
self.episode_lengths = []
def get_action(self, state, explore=True):
"""Choose action using epsilon-greedy policy"""
if explore and np.random.rand() < self.epsilon:
return self.env.action_space.sample() # Random action
else:
return np.argmax(self.q_table[state]) # Best action
def update_q(self, state, action, reward, next_state, done):
"""Update Q-value using Q-learning update rule"""
current_q = self.q_table[state][action]
if done:
target = reward
else:
target = reward + self.gamma * np.max(self.q_table[next_state])
# Q-learning update
self.q_table[state][action] = current_q + self.alpha * (target - current_q)
def train_episode(self):
"""Run one training episode"""
state, _ = self.env.reset()
total_reward = 0
steps = 0
done = False
while not done and steps < 100:
# Choose action
action = self.get_action(state, explore=True)
# Take action
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
# Update Q-table
self.update_q(state, action, reward, next_state, done)
# Update state and counters
state = next_state
total_reward += reward
steps += 1
# Decay epsilon
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
self.episode_rewards.append(total_reward)
self.episode_lengths.append(steps)
return total_reward, steps
def train(self, num_episodes=1000):
"""Train the agent"""
print("Training Q-Learning agent on Frozen Lake...")
for episode in range(num_episodes):
reward, length = self.train_episode()
if (episode + 1) % 100 == 0:
avg_reward = np.mean(self.episode_rewards[-100:])
print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {avg_reward:.3f}, Epsilon: {self.epsilon:.3f}")
def evaluate(self, num_episodes=100):
"""Evaluate trained agent (no exploration)"""
print("\nEvaluating trained agent...")
eval_rewards = []
eval_lengths = []
successes = 0
for episode in range(num_episodes):
state, _ = self.env.reset()
total_reward = 0
steps = 0
done = False
while not done and steps < 100:
action = self.get_action(state, explore=False) # No exploration
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
state = next_state
total_reward += reward
steps += 1
eval_rewards.append(total_reward)
eval_lengths.append(steps)
if total_reward == 1.0: # Reached goal
successes += 1
avg_reward = np.mean(eval_rewards)
avg_length = np.mean(eval_lengths)
success_rate = successes / num_episodes
print(f"Evaluation Results:")
print(f" Average Reward: {avg_reward:.3f}")
print(f" Average Episode Length: {avg_length:.1f}")
print(f" Success Rate: {success_rate:.1%}")
return avg_reward, avg_length, success_rate
def visualize_q_table(self):
"""Visualize the learned Q-table"""
# Frozen Lake is 4x4 = 16 states
q_values = np.zeros((4, 4, 4)) # 4x4 grid, 4 actions
for state in range(16):
row, col = state // 4, state % 4
q_values[row, col] = self.q_table[state]
# Plot Q-values for each action
action_names = ['← Left', '↓ Down', '→ Right', '↑ Up']
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
for i, (ax, action_name) in enumerate(zip(axes.flat, action_names)):
im = ax.imshow(q_values[:, :, i], cmap='viridis', origin='upper')
ax.set_title(f'Q-Values: {action_name}')
ax.set_xticks(range(4))
ax.set_yticks(range(4))
# Add Q-values as text
for r in range(4):
for c in range(4):
ax.text(c, r, f'{q_values[r, c, i]:.2f}',
ha='center', va='center', fontsize=8)
plt.colorbar(im, ax=ax, shrink=0.8)
plt.tight_layout()
plt.show()
# Create environment
env = gym.make('FrozenLake-v1', is_slippery=False) # Deterministic version first
# Train agent
agent = QLearningAgent(env, alpha=0.1, gamma=0.99, epsilon=1.0)
agent.train(num_episodes=2000)
# Evaluate
agent.evaluate(num_episodes=100)
# Visualize Q-table
agent.visualize_q_table()
📝 Exercise 2: Deep Q-Network on CartPole
Goal: Implement DQN to solve the CartPole balancing task
Environment: Balance a pole on a cart
State: [cart_position, cart_velocity, pole_angle, pole_velocity]
Actions: 0=Push left, 1=Push right
Reward: +1 for each timestep pole stays upright
Goal: Keep pole balanced for 500 timesteps
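The agent below minimizes the squared temporal-difference error between the online network $Q_\theta$ and a periodically synchronized target network $Q_{\theta^-}$, over minibatches drawn from a replay buffer $\mathcal{D}$:

$$\mathcal{L}(\theta) = \mathbb{E}_{(s, a, r, s', d) \sim \mathcal{D}} \Big[ \big( r + \gamma (1 - d) \max_{a'} Q_{\theta^-}(s', a') - Q_\theta(s, a) \big)^2 \Big]$$

where $d$ is the done flag that zeroes out the bootstrap term at episode boundaries.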
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
class DQN(nn.Module):
"""Deep Q-Network for CartPole"""
def __init__(self, input_size, hidden_size, output_size):
super(DQN, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, output_size)
)
def forward(self, x):
return self.network(x)
class DQNCartPoleAgent:
"""DQN Agent for CartPole"""
def __init__(self, env, hidden_size=128, learning_rate=1e-3, gamma=0.99,
epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995,
buffer_size=10000, batch_size=64, target_update_freq=10):
self.env = env
self.gamma = gamma
self.epsilon = epsilon_start
self.epsilon_end = epsilon_end
self.epsilon_decay = epsilon_decay
self.batch_size = batch_size
self.target_update_freq = target_update_freq
# Networks
input_size = env.observation_space.shape[0]
output_size = env.action_space.n
self.policy_net = DQN(input_size, hidden_size, output_size).to(device)
self.target_net = DQN(input_size, hidden_size, output_size).to(device)
self.target_net.load_state_dict(self.policy_net.state_dict())
self.target_net.eval()
# Optimizer
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
# Replay buffer
self.replay_buffer = deque(maxlen=buffer_size)
# Training stats
self.episode_rewards = []
self.episode_lengths = []
self.losses = []
self.training_step = 0
def get_action(self, state, explore=True):
"""Select action using epsilon-greedy policy"""
if explore and random.random() < self.epsilon:
return self.env.action_space.sample()
else:
with torch.no_grad():
state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
q_values = self.policy_net(state_tensor)
return torch.argmax(q_values).item()
def optimize_model(self):
"""Perform one optimization step"""
if len(self.replay_buffer) < self.batch_size:
return
# Sample batch
batch = random.sample(self.replay_buffer, self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
# Convert to tensors
        # Stack numpy states first; torch.tensor on a tuple of arrays is very slow
        states = torch.tensor(np.array(states), dtype=torch.float32, device=device)
        actions = torch.tensor(actions, dtype=torch.long, device=device)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32, device=device)
        dones = torch.tensor(dones, dtype=torch.float32, device=device)
# Compute Q(s,a)
q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
# Compute target Q-values
with torch.no_grad():
next_q_values = self.target_net(next_states).max(1)[0]
target_q_values = rewards + self.gamma * next_q_values * (1 - dones)
# Compute loss
loss = F.mse_loss(q_values, target_q_values)
# Optimize
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.losses.append(loss.item())
return loss.item()
def train_episode(self):
"""Run one training episode"""
state, _ = self.env.reset()
total_reward = 0
steps = 0
done = False
while not done and steps < 500:
# Select action
action = self.get_action(state, explore=True)
# Take action
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
# Store experience
self.replay_buffer.append((state, action, reward, next_state, done))
# Optimize model
loss = self.optimize_model()
# Update target network
self.training_step += 1
if self.training_step % self.target_update_freq == 0:
self.target_net.load_state_dict(self.policy_net.state_dict())
# Update state and counters
state = next_state
total_reward += reward
steps += 1
# Decay epsilon
self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
self.episode_rewards.append(total_reward)
self.episode_lengths.append(steps)
return total_reward, steps
def train(self, num_episodes=500):
"""Train the agent"""
print("Training DQN agent on CartPole...")
for episode in range(num_episodes):
reward, length = self.train_episode()
if (episode + 1) % 50 == 0:
avg_reward = np.mean(self.episode_rewards[-50:])
print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {avg_reward:.1f}, Epsilon: {self.epsilon:.3f}")
                # Check if solved (CartPole-v1's reward threshold is 475)
                if avg_reward >= 475:
                    print(f"🎉 CartPole solved in {episode+1} episodes!")
break
def evaluate(self, num_episodes=10):
"""Evaluate trained agent"""
print("\nEvaluating trained agent...")
eval_rewards = []
for episode in range(num_episodes):
state, _ = self.env.reset()
total_reward = 0
done = False
steps = 0
while not done and steps < 500:
action = self.get_action(state, explore=False)
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
state = next_state
total_reward += reward
steps += 1
eval_rewards.append(total_reward)
print(f"Episode {episode+1}: {total_reward} steps")
avg_reward = np.mean(eval_rewards)
print(f"\nAverage evaluation reward: {avg_reward:.1f}")
return avg_reward
# Create environment
env = gym.make('CartPole-v1')
# Train DQN agent
dqn_agent = DQNCartPoleAgent(env)
dqn_agent.train(num_episodes=500)
# Evaluate
dqn_agent.evaluate(num_episodes=5)
# Plot training progress
plt.figure(figsize=(12, 6))
plt.plot(dqn_agent.episode_rewards, alpha=0.7)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN Training on CartPole')
plt.grid(True, alpha=0.3)
plt.show()
📝 Exercise 3: Policy Gradient on Lunar Lander
Goal: Implement REINFORCE to solve the Lunar Lander environment
Environment: Land a spacecraft safely on the moon
State: 8-dimensional continuous state
Actions: 4 discrete actions (do nothing, fire left, fire main, fire right)
Reward: Based on landing quality, fuel efficiency, etc.
Goal: Achieve 200+ average reward
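REINFORCE performs gradient ascent on a Monte Carlo estimate of the policy gradient, weighting each log-probability by the discounted return $G_t$ from that timestep:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta} \Big[ \sum_t G_t \, \nabla_\theta \log \pi_\theta(a_t \mid s_t) \Big], \qquad G_t = \sum_{k=t}^{T} \gamma^{k-t} r_k$$

The implementation below additionally normalizes the returns within each episode, a common variance-reduction trick.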
class PolicyNetwork(nn.Module):
"""Policy network for Lunar Lander"""
def __init__(self, input_size, hidden_size, output_size):
super(PolicyNetwork, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Linear(hidden_size, output_size),
nn.Softmax(dim=-1)
)
def forward(self, x):
return self.network(x)
def get_action(self, state):
"""Sample action from policy"""
probs = self.forward(state)
dist = torch.distributions.Categorical(probs)
action = dist.sample()
return action.item(), dist.log_prob(action)
class REINFORCEAgent:
"""REINFORCE Agent for Lunar Lander"""
def __init__(self, env, hidden_size=128, learning_rate=1e-3, gamma=0.99):
self.env = env
self.gamma = gamma
# Policy network
input_size = env.observation_space.shape[0]
output_size = env.action_space.n
self.policy_net = PolicyNetwork(input_size, hidden_size, output_size).to(device)
# Optimizer
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
# Training stats
self.episode_rewards = []
self.episode_lengths = []
self.losses = []
def compute_returns(self, rewards):
"""Compute discounted returns"""
returns = []
G = 0
for reward in reversed(rewards):
G = reward + self.gamma * G
returns.insert(0, G)
return torch.tensor(returns, dtype=torch.float32, device=device)
def train_episode(self):
"""Run one training episode"""
states = []
actions = []
rewards = []
log_probs = []
state, _ = self.env.reset()
done = False
steps = 0
while not done and steps < 1000:
state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
# Get action
action, log_prob = self.policy_net.get_action(state_tensor)
# Take action
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
# Store experience
states.append(state)
actions.append(action)
rewards.append(reward)
log_probs.append(log_prob)
state = next_state
steps += 1
# Compute returns
returns = self.compute_returns(rewards)
# Normalize returns
if len(returns) > 1:
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
# Compute policy loss
policy_loss = []
for log_prob, G in zip(log_probs, returns):
policy_loss.append(-log_prob * G)
policy_loss = torch.stack(policy_loss).sum()
# Update policy
self.optimizer.zero_grad()
policy_loss.backward()
self.optimizer.step()
total_reward = sum(rewards)
self.episode_rewards.append(total_reward)
self.episode_lengths.append(steps)
self.losses.append(policy_loss.item())
return total_reward, steps
def train(self, num_episodes=1000):
"""Train the agent"""
print("Training REINFORCE agent on Lunar Lander...")
for episode in range(num_episodes):
reward, length = self.train_episode()
if (episode + 1) % 50 == 0:
avg_reward = np.mean(self.episode_rewards[-50:])
print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {avg_reward:.1f}")
# Check if solved
if avg_reward >= 200:
                    print(f"🎉 Lunar Lander solved in {episode+1} episodes!")
break
def evaluate(self, num_episodes=10):
"""Evaluate trained agent"""
print("\nEvaluating trained agent...")
eval_rewards = []
for episode in range(num_episodes):
state, _ = self.env.reset()
total_reward = 0
done = False
steps = 0
while not done and steps < 1000:
state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
with torch.no_grad():
probs = self.policy_net(state_tensor)
action = torch.argmax(probs).item()
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
state = next_state
total_reward += reward
steps += 1
eval_rewards.append(total_reward)
print(f"Episode {episode+1}: {total_reward:.1f} reward")
avg_reward = np.mean(eval_rewards)
print(f"\nAverage evaluation reward: {avg_reward:.1f}")
return avg_reward
# Create environment
env = gym.make('LunarLander-v2')  # requires Box2D: pip install "gymnasium[box2d]" (newer Gymnasium versions use 'LunarLander-v3')
# Train REINFORCE agent
reinforce_agent = REINFORCEAgent(env)
reinforce_agent.train(num_episodes=1000)
# Evaluate
reinforce_agent.evaluate(num_episodes=5)
# Plot training progress
plt.figure(figsize=(12, 6))
plt.plot(reinforce_agent.episode_rewards, alpha=0.7)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('REINFORCE Training on Lunar Lander')
plt.grid(True, alpha=0.3)
plt.show()
📝 Exercise 4: Custom Environment - Mountain Car
Goal: Create and solve a custom RL environment
Task: Modify the Mountain Car environment to make it more challenging
Add random wind forces
Change reward structure
Implement your own RL solution
import gymnasium as gym
from gymnasium import spaces
import numpy as np
class WindyMountainCar(gym.Env):
"""Modified Mountain Car with wind forces"""
def __init__(self, wind_strength=0.1):
super(WindyMountainCar, self).__init__()
self.wind_strength = wind_strength
# Action space: push left, no push, push right
self.action_space = spaces.Discrete(3)
# Observation space: position, velocity
self.observation_space = spaces.Box(
low=np.array([-1.2, -0.07]),
high=np.array([0.6, 0.07]),
dtype=np.float32
)
self.reset()
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.position = -0.5
self.velocity = 0.0
self.steps = 0
return np.array([self.position, self.velocity], dtype=np.float32), {}
    def step(self, action):
        # Standard Mountain Car physics, with a random wind force added to the velocity
        force = action - 1  # -1, 0, 1
        wind_force = np.random.normal(0, self.wind_strength)
        self.velocity += force * 0.001 + np.cos(3 * self.position) * (-0.0025) + wind_force
        self.velocity = np.clip(self.velocity, -0.07, 0.07)
        self.position += self.velocity
        self.position = np.clip(self.position, -1.2, 0.6)
        self.steps += 1
        # Reward: progress toward the goal, minus a small penalty proportional to the wind
        reward = (self.position + 1.2) / 1.8  # 0 to 1 based on progress
        reward -= abs(wind_force) * 0.1
        # Check termination
        terminated = self.position >= 0.5
        truncated = self.steps >= 200
        if terminated:
            reward += 10  # Bonus for reaching the goal
        return np.array([self.position, self.velocity], dtype=np.float32), reward, terminated, truncated, {}
# Test the custom environment
env = WindyMountainCar(wind_strength=0.05)
print("Testing Windy Mountain Car environment...")
# Run a few random episodes
for episode in range(3):
state, _ = env.reset()
total_reward = 0
done = False
steps = 0
while not done and steps < 50:
action = env.action_space.sample()
next_state, reward, terminated, truncated, _ = env.step(action)
done = terminated or truncated
total_reward += reward
state = next_state
steps += 1
print(f"Episode {episode+1}: {steps} steps, reward: {total_reward:.2f}")
print("\nNow implement your RL solution for this environment!")
# TODO: Implement DQN or Policy Gradient agent for WindyMountainCar
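One possible starting point (not the only approach): reuse the tabular Q-learning from Exercise 1 by discretizing the two continuous state variables into bins. The helper names (`make_bins`, `discretize`, `q_update`) and the bin count of 20 are illustrative choices, not part of the exercise:

```python
import numpy as np
from collections import defaultdict

def make_bins(low, high, n_bins):
    """Interior bin edges for each state dimension."""
    return [np.linspace(l, h, n_bins + 1)[1:-1] for l, h in zip(low, high)]

def discretize(obs, bins):
    """Map a continuous observation to a tuple of bin indices (a hashable Q-table key)."""
    return tuple(int(np.digitize(x, b)) for x, b in zip(obs, bins))

def q_update(q_table, s, a, r, s_next, done, alpha=0.1, gamma=0.99):
    """Standard tabular Q-learning update on discretized states."""
    target = r if done else r + gamma * np.max(q_table[s_next])
    q_table[s][a] += alpha * (target - q_table[s][a])

# 20 bins per dimension over WindyMountainCar's observation bounds, 3 actions
bins = make_bins(low=[-1.2, -0.07], high=[0.6, 0.07], n_bins=20)
q_table = defaultdict(lambda: np.zeros(3))

s = discretize(np.array([-0.5, 0.0]), bins)
q_update(q_table, s, a=2, r=0.5, s_next=s, done=False)
print(s, q_table[s])
```

From here the training loop is the same as in Exercise 1, just calling `discretize(obs, bins)` wherever the tabular agent used the raw state.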
📝 Exercise 5: Hyperparameter Tuning
Goal: Systematically tune hyperparameters for RL algorithms
Task: Compare different hyperparameter configurations on a fixed environment
def hyperparameter_study():
"""Compare different hyperparameter configurations"""
env = gym.make('CartPole-v1')
# Different hyperparameter configurations
configs = [
{'name': 'Conservative', 'lr': 1e-4, 'gamma': 0.95, 'epsilon_decay': 0.99},
{'name': 'Aggressive', 'lr': 1e-2, 'gamma': 0.99, 'epsilon_decay': 0.9},
{'name': 'Balanced', 'lr': 1e-3, 'gamma': 0.99, 'epsilon_decay': 0.95},
{'name': 'Slow Decay', 'lr': 1e-3, 'gamma': 0.99, 'epsilon_decay': 0.999},
]
results = {}
for config in configs:
print(f"\nTesting configuration: {config['name']}")
# Create agent with this configuration
agent = DQNCartPoleAgent(
env,
learning_rate=config['lr'],
gamma=config['gamma'],
epsilon_decay=config['epsilon_decay']
)
# Train for fewer episodes for quick comparison
agent.train(num_episodes=200)
# Evaluate
eval_reward = agent.evaluate(num_episodes=5)
results[config['name']] = {
'final_avg_reward': np.mean(agent.episode_rewards[-50:]),
'eval_reward': eval_reward,
'rewards': agent.episode_rewards
}
# Plot comparison
plt.figure(figsize=(12, 8))
for name, data in results.items():
plt.plot(data['rewards'], label=f"{name} (final: {data['final_avg_reward']:.1f}, eval: {data['eval_reward']:.1f})", alpha=0.7)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Hyperparameter Comparison on CartPole')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# Print summary
print("\nHyperparameter Study Results:")
print("=" * 50)
for name, data in results.items():
print(f"{name}:")
print(f" Final Training Avg: {data['final_avg_reward']:.1f}")
print(f" Evaluation Avg: {data['eval_reward']:.1f}")
print()
# Run hyperparameter study
hyperparameter_study()
📝 Exercise 6: Real-World Application - Stock Trading
Goal: Apply RL to a simplified stock trading environment
Environment:
State: Current stock price, portfolio value, trend indicators
Actions: Buy, Sell, Hold
Reward: Portfolio returns minus transaction costs
Goal: Maximize long-term returns
class StockTradingEnv(gym.Env):
"""Simplified stock trading environment"""
def __init__(self, initial_balance=10000, transaction_cost=0.001):
super(StockTradingEnv, self).__init__()
self.initial_balance = initial_balance
self.transaction_cost = transaction_cost
# Generate synthetic stock prices (random walk with trend)
self.generate_stock_data()
# Action space: 0=Hold, 1=Buy, 2=Sell
self.action_space = spaces.Discrete(3)
# Observation space: [price, balance, shares, trend]
self.observation_space = spaces.Box(
low=np.array([0, 0, 0, -1]),
high=np.array([200, 100000, 1000, 1]),
dtype=np.float32
)
def generate_stock_data(self):
"""Generate synthetic stock price data"""
np.random.seed(42)
n_steps = 1000
# Random walk with drift
prices = [100.0]
for i in range(n_steps - 1):
change = np.random.normal(0.001, 0.02) # Small drift, volatility
new_price = prices[-1] * (1 + change)
prices.append(max(new_price, 1.0)) # Floor at $1
self.prices = np.array(prices)
self.max_steps = len(prices)
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self.current_step = 0
self.balance = self.initial_balance
self.shares = 0
self.done = False
return self._get_observation(), {}
def _get_observation(self):
"""Get current observation"""
price = self.prices[self.current_step]
# Simple trend indicator (price change over last 5 steps)
if self.current_step >= 5:
trend = (price - self.prices[self.current_step - 5]) / self.prices[self.current_step - 5]
else:
trend = 0.0
return np.array([price, self.balance, self.shares, trend], dtype=np.float32)
def step(self, action):
"""Execute trading action"""
price = self.prices[self.current_step]
portfolio_value = self.balance + self.shares * price
# Execute action
if action == 1: # Buy
# Buy as many shares as possible with available balance
max_shares = int(self.balance / (price * (1 + self.transaction_cost)))
if max_shares > 0:
cost = max_shares * price * (1 + self.transaction_cost)
self.balance -= cost
self.shares += max_shares
elif action == 2: # Sell
if self.shares > 0:
proceeds = self.shares * price * (1 - self.transaction_cost)
self.balance += proceeds
self.shares = 0
# Move to next time step
self.current_step += 1
# Check if episode is done
done = self.current_step >= self.max_steps - 1
# Calculate reward (change in portfolio value)
new_portfolio_value = self.balance + self.shares * self.prices[self.current_step]
reward = new_portfolio_value - portfolio_value
# Additional reward for holding during uptrends
if action == 0 and self._get_observation()[3] > 0.01: # Positive trend
reward += 1.0
return self._get_observation(), reward, done, False, {}
def render(self):
"""Render current state"""
price = self.prices[self.current_step]
portfolio_value = self.balance + self.shares * price
print(f"Step: {self.current_step}, Price: ${price:.2f}, "
f"Balance: ${self.balance:.2f}, Shares: {self.shares}, "
f"Portfolio: ${portfolio_value:.2f}")
# Test the trading environment
trading_env = StockTradingEnv()
print("Testing Stock Trading Environment...")
# Run a random trading episode
state, _ = trading_env.reset()
total_reward = 0
done = False
while not done:
action = trading_env.action_space.sample()
next_state, reward, done, _, _ = trading_env.step(action)
total_reward += reward
state = next_state
final_value = trading_env.balance + trading_env.shares * trading_env.prices[-1]
print(f"Random trading result: ${final_value:.2f} (initial: ${trading_env.initial_balance:.2f})")
print(f"Total reward: {total_reward:.2f}")
print("\nNow implement an RL agent to learn trading strategies!")
# TODO: Implement DQN or Policy Gradient agent for stock trading
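Before training an agent here, it is worth having non-RL baselines to beat; an agent that cannot outperform buy-and-hold has not learned a useful policy. The sketch below is illustrative (the function names and the 5/20 moving-average windows are arbitrary choices, and it regenerates a random-walk series similar to, but not byte-identical with, `generate_stock_data`):

```python
import numpy as np

def buy_and_hold_value(prices, initial_balance=10000, transaction_cost=0.001):
    """Baseline 1: buy once at the start, hold to the end."""
    shares = int(initial_balance / (prices[0] * (1 + transaction_cost)))
    cash = initial_balance - shares * prices[0] * (1 + transaction_cost)
    return cash + shares * prices[-1]

def ma_crossover_value(prices, fast=5, slow=20, initial_balance=10000, transaction_cost=0.001):
    """Baseline 2: fully invest while the fast moving average sits above the slow one."""
    cash, shares = initial_balance, 0
    for t in range(slow, len(prices)):
        fast_ma = prices[t - fast:t].mean()
        slow_ma = prices[t - slow:t].mean()
        if fast_ma > slow_ma and shares == 0:    # buy signal
            shares = int(cash / (prices[t] * (1 + transaction_cost)))
            cash -= shares * prices[t] * (1 + transaction_cost)
        elif fast_ma < slow_ma and shares > 0:   # sell signal
            cash += shares * prices[t] * (1 - transaction_cost)
            shares = 0
    return cash + shares * prices[-1]

# Random walk with drift, like the environment's synthetic data
rng = np.random.default_rng(42)
prices = [100.0]
for _ in range(999):
    prices.append(max(prices[-1] * (1 + rng.normal(0.001, 0.02)), 1.0))
prices = np.array(prices)

print(f"Buy & hold:   ${buy_and_hold_value(prices):.2f}")
print(f"MA crossover: ${ma_crossover_value(prices):.2f}")
```

An RL agent trained on this environment should be compared against both numbers before claiming it has learned anything.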
📝 Bonus Exercise: Multi-Agent RL
Goal: Implement a simple multi-agent environment and train cooperative agents
Environment: Two agents must cooperate to achieve a goal
State: Positions of both agents and goal
Actions: Move in 4 directions
Reward: Shared reward for reaching goal
Challenge: Agents must coordinate their movements
# TODO: Implement multi-agent environment and training
print("Multi-Agent RL Exercise")
print("- Implement cooperative multi-agent environment")
print("- Train agents using independent Q-learning")
print("- Experiment with communication protocols")
print("- Compare individual vs team rewards")
# This is a challenging advanced exercise - implement it as a personal project!
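If you want a concrete starting point, here is a minimal self-contained sketch of independent Q-learning on a tiny cooperative grid world. Everything in it (the `CooperativeGridWorld` layout, the sparse shared reward, the hyperparameters) is an illustrative choice, not a prescribed solution:

```python
import numpy as np
from collections import defaultdict

class CooperativeGridWorld:
    """Two agents on an N x N grid; shared reward of +1 only when BOTH
    occupy the goal cell at the same time (moves off the grid are clipped)."""
    ACTIONS = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # up, down, left, right

    def __init__(self, size=4):
        self.size = size
        self.goal = (size - 1, size - 1)

    def reset(self):
        self.pos = [(0, 0), (0, self.size - 1)]
        return tuple(self.pos)

    def step(self, actions):
        for i, a in enumerate(actions):
            dr, dc = self.ACTIONS[a]
            r = min(max(self.pos[i][0] + dr, 0), self.size - 1)
            c = min(max(self.pos[i][1] + dc, 0), self.size - 1)
            self.pos[i] = (r, c)
        done = all(p == self.goal for p in self.pos)
        return tuple(self.pos), (1.0 if done else 0.0), done

def greedy(q_row, rng):
    """Argmax with random tie-breaking (matters while Q is still all zeros)."""
    best = np.flatnonzero(q_row == q_row.max())
    return int(rng.choice(best))

def train_independent_q(env, episodes=5000, alpha=0.2, gamma=0.95, epsilon=0.2, seed=0):
    """Independent Q-learning: each agent updates its own table over the joint state."""
    rng = np.random.default_rng(seed)
    q = [defaultdict(lambda: np.zeros(4)) for _ in range(2)]
    for _ in range(episodes):
        state, done, steps = env.reset(), False, 0
        while not done and steps < 30:
            acts = [int(rng.integers(4)) if rng.random() < epsilon
                    else greedy(q[i][state], rng) for i in range(2)]
            next_state, reward, done = env.step(acts)
            for i in range(2):
                target = reward if done else reward + gamma * np.max(q[i][next_state])
                q[i][state][acts[i]] += alpha * (target - q[i][state][acts[i]])
            state, steps = next_state, steps + 1
    return q

env = CooperativeGridWorld(size=4)
q = train_independent_q(env)

# Greedy rollout with the learned tables
rng = np.random.default_rng(1)
state, done, steps = env.reset(), False, 0
while not done and steps < 30:
    state, reward, done = env.step([greedy(q[i][state], rng) for i in range(2)])
    steps += 1
print(f"Reached goal together: {done} after {steps} steps")
```

Note that each agent conditions on the full joint state here; restricting each agent to its own position (partial observability) or adding a communication channel are natural extensions for the exercise.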
🧠 Key Takeaways
Implementation is key: Understanding algorithms theoretically is only half the battle
Debugging RL is hard: Training can be unstable, hyperparameters matter greatly
Start simple: Begin with tabular methods, then move to function approximation
Experiment systematically: Change one thing at a time when tuning
Real applications are messy: Reward design, state representation, and safety matter
📚 Next Steps
Now that you've implemented core RL algorithms, you're ready to:
Explore advanced algorithms: PPO, SAC, Rainbow DQN
Tackle harder environments: Atari games, continuous control
Apply to real problems: Robotics, game AI, recommendation systems
Research current developments: Follow arXiv papers and conferences
Contribute to open source: Improve existing RL libraries
🏋️ Final Challenges
Solve Atari Breakout using DQN
Train a robot arm to reach targets (simulation)
Implement AlphaZero for a simple game
Build an RL trading system with real market data
Create a multi-agent system for cooperative tasks
💡 Pro Tips
Use existing libraries: Stable Baselines3, Ray RLlib for production
Monitor training: Use TensorBoard or Weights & Biases
Start with Gymnasium: Standard environments for testing
Read papers: Original papers often have implementation details
Join communities: Reddit r/reinforcementlearning, Discord servers
🎉 Congratulations!
You've completed the practical reinforcement learning exercises! You now have hands-on experience with:
Tabular Q-Learning
Deep Q-Networks
Policy Gradients
Custom environments
Hyperparameter tuning
Real-world applications
Keep experimenting, keep learning, and most importantly: have fun! 🎮🤖