06: Practical Exercises & Implementations¶

“The only way to learn a new programming language is by writing programs in it.” - Dennis Ritchie

Welcome to the hands-on reinforcement learning workshop! This notebook contains practical exercises and implementations to solidify your understanding of RL concepts.

🎯 Learning Objectives¶

By completing this notebook, you'll:

  • Implement core RL algorithms from scratch

  • Solve classic RL problems

  • Debug and tune RL training

  • Apply RL to real-world scenarios

  • Build confidence in RL implementation

๐Ÿ† Exercise 1: Tabular Q-Learning on Frozen Lakeยถ

Goal: Implement Q-learning to solve the Frozen Lake environment

Environment: 4x4 grid world where:

  • S: Start position

  • F: Frozen surface (safe)

  • H: Hole (game over)

  • G: Goal (win)

Actions: 0=Left, 1=Down, 2=Right, 3=Up

Rewards:

  • Goal: +1

  • Hole: 0

  • Frozen: 0
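Before the full implementation, here is the Q-learning update rule applied to a single transition, with made-up numbers you can verify by hand (alpha, gamma, and the Q-values below are illustrative assumptions, not taken from the exercise):

```python
# One Q-learning update on a single (s, a, r, s') transition.
# All numbers here are illustrative, not from the Frozen Lake run.
alpha, gamma = 0.1, 0.99
q_sa, reward, max_next_q = 0.5, 0.0, 0.8

# Q(s,a) <- Q(s,a) + alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]
q_sa = q_sa + alpha * (reward + gamma * max_next_q - q_sa)
print(q_sa)  # ~0.5292: Q(s,a) moves 10% of the way toward the bootstrapped target
```

The same arithmetic happens inside `update_q` below, once per step of every episode.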

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import gymnasium as gym
from collections import defaultdict
import random
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')

# Set up plotting
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = [12, 8]

# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)
class QLearningAgent:
    """Tabular Q-Learning Agent for Frozen Lake"""
    
    def __init__(self, env, alpha=0.1, gamma=0.99, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.env = env
        self.alpha = alpha  # Learning rate
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        
        # Initialize Q-table
        self.q_table = defaultdict(lambda: np.zeros(env.action_space.n))
        
        # Training statistics
        self.episode_rewards = []
        self.episode_lengths = []
        
    def get_action(self, state, explore=True):
        """Choose action using epsilon-greedy policy"""
        if explore and np.random.rand() < self.epsilon:
            return self.env.action_space.sample()  # Random action
        else:
            return np.argmax(self.q_table[state])  # Best action
    
    def update_q(self, state, action, reward, next_state, done):
        """Update Q-value using Q-learning update rule"""
        current_q = self.q_table[state][action]
        
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.q_table[next_state])
        
        # Q-learning update
        self.q_table[state][action] = current_q + self.alpha * (target - current_q)
    
    def train_episode(self):
        """Run one training episode"""
        state, _ = self.env.reset()
        total_reward = 0
        steps = 0
        done = False
        
        while not done and steps < 100:
            # Choose action
            action = self.get_action(state, explore=True)
            
            # Take action
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            
            # Update Q-table
            self.update_q(state, action, reward, next_state, done)
            
            # Update state and counters
            state = next_state
            total_reward += reward
            steps += 1
            
        # Decay epsilon once per episode (per-step decay collapses exploration too quickly)
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        
        self.episode_rewards.append(total_reward)
        self.episode_lengths.append(steps)
        return total_reward, steps
    
    def train(self, num_episodes=1000):
        """Train the agent"""
        print("Training Q-Learning agent on Frozen Lake...")
        
        for episode in range(num_episodes):
            reward, length = self.train_episode()
            
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {avg_reward:.3f}, Epsilon: {self.epsilon:.3f}")
    
    def evaluate(self, num_episodes=100):
        """Evaluate trained agent (no exploration)"""
        print("\nEvaluating trained agent...")
        
        eval_rewards = []
        eval_lengths = []
        successes = 0
        
        for episode in range(num_episodes):
            state, _ = self.env.reset()
            total_reward = 0
            steps = 0
            done = False
            
            while not done and steps < 100:
                action = self.get_action(state, explore=False)  # No exploration
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                
                state = next_state
                total_reward += reward
                steps += 1
            
            eval_rewards.append(total_reward)
            eval_lengths.append(steps)
            if total_reward == 1.0:  # Reached goal
                successes += 1
        
        avg_reward = np.mean(eval_rewards)
        avg_length = np.mean(eval_lengths)
        success_rate = successes / num_episodes
        
        print(f"Evaluation Results:")
        print(f"  Average Reward: {avg_reward:.3f}")
        print(f"  Average Episode Length: {avg_length:.1f}")
        print(f"  Success Rate: {success_rate:.1%}")
        
        return avg_reward, avg_length, success_rate
    
    def visualize_q_table(self):
        """Visualize the learned Q-table"""
        
        # Frozen Lake is 4x4 = 16 states
        q_values = np.zeros((4, 4, 4))  # 4x4 grid, 4 actions
        
        for state in range(16):
            row, col = state // 4, state % 4
            q_values[row, col] = self.q_table[state]
        
        # Plot Q-values for each action
        action_names = ['← Left', '↓ Down', '→ Right', '↑ Up']
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        for i, (ax, action_name) in enumerate(zip(axes.flat, action_names)):
            im = ax.imshow(q_values[:, :, i], cmap='viridis', origin='upper')
            ax.set_title(f'Q-Values: {action_name}')
            ax.set_xticks(range(4))
            ax.set_yticks(range(4))
            
            # Add Q-values as text
            for r in range(4):
                for c in range(4):
                    ax.text(c, r, f'{q_values[r, c, i]:.2f}', 
                           ha='center', va='center', fontsize=8)
            
            plt.colorbar(im, ax=ax, shrink=0.8)
        
        plt.tight_layout()
        plt.show()

# Create environment
env = gym.make('FrozenLake-v1', is_slippery=False)  # Deterministic version first

# Train agent
agent = QLearningAgent(env, alpha=0.1, gamma=0.99, epsilon=1.0)
agent.train(num_episodes=2000)

# Evaluate
agent.evaluate(num_episodes=100)

# Visualize Q-table
agent.visualize_q_table()
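Beyond the Q-value heatmaps, it can help to read off the greedy policy directly. The helper below is a small sketch (not part of the agent class above) that maps each state's argmax action to an arrow on the 4x4 grid:

```python
import numpy as np

def extract_policy(q_table, n_states=16, n_actions=4):
    """Greedy action per state, plus an arrow view of the 4x4 grid."""
    arrows = ['<', 'v', '>', '^']  # 0=Left, 1=Down, 2=Right, 3=Up
    policy = np.array([int(np.argmax(q_table[s])) for s in range(n_states)])
    grid = np.array([arrows[a] for a in policy]).reshape(4, 4)
    return policy, grid
```

Usage: `policy, grid = extract_policy(agent.q_table); print(grid)` shows at a glance whether the learned path avoids the holes.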

๐Ÿ† Exercise 2: Deep Q-Network on CartPoleยถ

Goal: Implement DQN to solve the CartPole balancing task

Environment: Balance a pole on a cart

  • State: [cart_position, cart_velocity, pole_angle, pole_velocity]

  • Actions: 0=Push left, 1=Push right

  • Reward: +1 for each timestep pole stays upright

  • Goal: Keep pole balanced for 500 timesteps

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

class DQN(nn.Module):
    """Deep Q-Network for CartPole"""
    
    def __init__(self, input_size, hidden_size, output_size):
        super(DQN, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)
        )
    
    def forward(self, x):
        return self.network(x)

class DQNCartPoleAgent:
    """DQN Agent for CartPole"""
    
    def __init__(self, env, hidden_size=128, learning_rate=1e-3, gamma=0.99,
                 epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995,
                 buffer_size=10000, batch_size=64, target_update_freq=10):
        
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq
        
        # Networks
        input_size = env.observation_space.shape[0]
        output_size = env.action_space.n
        self.policy_net = DQN(input_size, hidden_size, output_size).to(device)
        self.target_net = DQN(input_size, hidden_size, output_size).to(device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        
        # Optimizer
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        
        # Replay buffer
        self.replay_buffer = deque(maxlen=buffer_size)
        
        # Training stats
        self.episode_rewards = []
        self.episode_lengths = []
        self.losses = []
        self.training_step = 0
    
    def get_action(self, state, explore=True):
        """Select action using epsilon-greedy policy"""
        if explore and random.random() < self.epsilon:
            return self.env.action_space.sample()
        else:
            with torch.no_grad():
                state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                q_values = self.policy_net(state_tensor)
                return torch.argmax(q_values).item()
    
    def optimize_model(self):
        """Perform one optimization step"""
        if len(self.replay_buffer) < self.batch_size:
            return
        
        # Sample batch
        batch = random.sample(self.replay_buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        
        # Convert to tensors (stack into arrays first; torch.tensor on a
        # tuple of ndarrays is slow and raises a warning)
        states = torch.tensor(np.array(states), dtype=torch.float32, device=device)
        actions = torch.tensor(actions, dtype=torch.long, device=device)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32, device=device)
        dones = torch.tensor(dones, dtype=torch.float32, device=device)
        
        # Compute Q(s,a)
        q_values = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # Compute target Q-values
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0]
            target_q_values = rewards + self.gamma * next_q_values * (1 - dones)
        
        # Compute loss
        loss = F.mse_loss(q_values, target_q_values)
        
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        self.losses.append(loss.item())
        return loss.item()
    
    def train_episode(self):
        """Run one training episode"""
        state, _ = self.env.reset()
        total_reward = 0
        steps = 0
        done = False
        
        while not done and steps < 500:
            # Select action
            action = self.get_action(state, explore=True)
            
            # Take action
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            
            # Store experience
            self.replay_buffer.append((state, action, reward, next_state, done))
            
            # Optimize model
            loss = self.optimize_model()
            
            # Update target network
            self.training_step += 1
            if self.training_step % self.target_update_freq == 0:
                self.target_net.load_state_dict(self.policy_net.state_dict())
            
            # Update state and counters
            state = next_state
            total_reward += reward
            steps += 1
            
        # Decay epsilon once per episode (per-step decay collapses exploration too quickly)
        self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
        
        self.episode_rewards.append(total_reward)
        self.episode_lengths.append(steps)
        return total_reward, steps
    
    def train(self, num_episodes=500):
        """Train the agent"""
        print("Training DQN agent on CartPole...")
        
        for episode in range(num_episodes):
            reward, length = self.train_episode()
            
            if (episode + 1) % 50 == 0:
                avg_reward = np.mean(self.episode_rewards[-50:])
                print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {avg_reward:.1f}, Epsilon: {self.epsilon:.3f}")
                
                # Check if solved (CartPole-v1's reward threshold is 475)
                if avg_reward >= 475:
                    print(f"🎉 CartPole solved in {episode+1} episodes!")
                    break
    
    def evaluate(self, num_episodes=10):
        """Evaluate trained agent"""
        print("\nEvaluating trained agent...")
        
        eval_rewards = []
        
        for episode in range(num_episodes):
            state, _ = self.env.reset()
            total_reward = 0
            done = False
            steps = 0
            
            while not done and steps < 500:
                action = self.get_action(state, explore=False)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                
                state = next_state
                total_reward += reward
                steps += 1
            
            eval_rewards.append(total_reward)
            print(f"Episode {episode+1}: {total_reward} steps")
        
        avg_reward = np.mean(eval_rewards)
        print(f"\nAverage evaluation reward: {avg_reward:.1f}")
        
        return avg_reward

# Create environment
env = gym.make('CartPole-v1')

# Train DQN agent
dqn_agent = DQNCartPoleAgent(env)
dqn_agent.train(num_episodes=500)

# Evaluate
dqn_agent.evaluate(num_episodes=5)

# Plot training progress
plt.figure(figsize=(12, 6))
plt.plot(dqn_agent.episode_rewards, alpha=0.7)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN Training on CartPole')
plt.grid(True, alpha=0.3)
plt.show()
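Raw per-episode rewards are noisy, so the training curve above can be hard to read. A trailing moving average (a small helper, not part of the agent) smooths it out:

```python
import numpy as np

def moving_average(x, window=50):
    """Trailing moving average; returns len(x) - window + 1 points."""
    x = np.asarray(x, dtype=float)
    if len(x) < window:
        return x.copy()
    c = np.cumsum(np.insert(x, 0, 0.0))
    return (c[window:] - c[:-window]) / window
```

Usage: `plt.plot(moving_average(dqn_agent.episode_rewards), linewidth=2)` overlays the smoothed trend on the raw curve.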

๐Ÿ† Exercise 3: Policy Gradient on Lunar Landerยถ

Goal: Implement REINFORCE to solve the Lunar Lander environment

Environment: Land a spacecraft safely on the moon

  • State: 8-dimensional continuous state

  • Actions: 4 discrete actions (do nothing, fire left, fire main, fire right)

  • Reward: Based on landing quality, fuel efficiency, etc.

  • Goal: Achieve 200+ average reward
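The heart of REINFORCE is the discounted return G_t = r_t + gamma * G_{t+1}, computed backwards through the episode. A pure-Python sketch (independent of the agent class in this exercise) that you can check by hand:

```python
def discounted_returns(rewards, gamma=0.99):
    """G_t = r_t + gamma * G_{t+1}, accumulated backwards through the episode."""
    returns, G = [], 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    return returns[::-1]  # restore time order
```

For rewards [1, 1, 1] and gamma = 0.5 this gives [1.75, 1.5, 1.0]: earlier steps accumulate more discounted future reward.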

class PolicyNetwork(nn.Module):
    """Policy network for Lunar Lander"""
    
    def __init__(self, input_size, hidden_size, output_size):
        super(PolicyNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
            nn.Softmax(dim=-1)
        )
    
    def forward(self, x):
        return self.network(x)
    
    def get_action(self, state):
        """Sample action from policy"""
        probs = self.forward(state)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)

class REINFORCEAgent:
    """REINFORCE Agent for Lunar Lander"""
    
    def __init__(self, env, hidden_size=128, learning_rate=1e-3, gamma=0.99):
        self.env = env
        self.gamma = gamma
        
        # Policy network
        input_size = env.observation_space.shape[0]
        output_size = env.action_space.n
        self.policy_net = PolicyNetwork(input_size, hidden_size, output_size).to(device)
        
        # Optimizer
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        
        # Training stats
        self.episode_rewards = []
        self.episode_lengths = []
        self.losses = []
    
    def compute_returns(self, rewards):
        """Compute discounted returns"""
        returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + self.gamma * G
            returns.insert(0, G)
        return torch.tensor(returns, dtype=torch.float32, device=device)
    
    def train_episode(self):
        """Run one training episode"""
        states = []
        actions = []
        rewards = []
        log_probs = []
        
        state, _ = self.env.reset()
        done = False
        steps = 0
        
        while not done and steps < 1000:
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            
            # Get action
            action, log_prob = self.policy_net.get_action(state_tensor)
            
            # Take action
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            
            # Store experience
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            log_probs.append(log_prob)
            
            state = next_state
            steps += 1
        
        # Compute returns
        returns = self.compute_returns(rewards)
        
        # Normalize returns
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        
        # Compute policy loss
        policy_loss = []
        for log_prob, G in zip(log_probs, returns):
            policy_loss.append(-log_prob * G)
        
        policy_loss = torch.stack(policy_loss).sum()
        
        # Update policy
        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()
        
        total_reward = sum(rewards)
        self.episode_rewards.append(total_reward)
        self.episode_lengths.append(steps)
        self.losses.append(policy_loss.item())
        
        return total_reward, steps
    
    def train(self, num_episodes=1000):
        """Train the agent"""
        print("Training REINFORCE agent on Lunar Lander...")
        
        for episode in range(num_episodes):
            reward, length = self.train_episode()
            
            if (episode + 1) % 50 == 0:
                avg_reward = np.mean(self.episode_rewards[-50:])
                print(f"Episode {episode+1}/{num_episodes}, Avg Reward: {avg_reward:.1f}")
                
                # Check if solved
                if avg_reward >= 200:
                    print(f"๐ŸŽ‰ Lunar Lander solved in {episode+1} episodes!")
                    break
    
    def evaluate(self, num_episodes=10):
        """Evaluate trained agent"""
        print("\nEvaluating trained agent...")
        
        eval_rewards = []
        
        for episode in range(num_episodes):
            state, _ = self.env.reset()
            total_reward = 0
            done = False
            steps = 0
            
            while not done and steps < 1000:
                state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                with torch.no_grad():
                    probs = self.policy_net(state_tensor)
                    action = torch.argmax(probs).item()
                
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                
                state = next_state
                total_reward += reward
                steps += 1
            
            eval_rewards.append(total_reward)
            print(f"Episode {episode+1}: {total_reward:.1f} reward")
        
        avg_reward = np.mean(eval_rewards)
        print(f"\nAverage evaluation reward: {avg_reward:.1f}")
        
        return avg_reward

# Create environment
env = gym.make('LunarLander-v2')

# Train REINFORCE agent
reinforce_agent = REINFORCEAgent(env)
reinforce_agent.train(num_episodes=1000)

# Evaluate
reinforce_agent.evaluate(num_episodes=5)

# Plot training progress
plt.figure(figsize=(12, 6))
plt.plot(reinforce_agent.episode_rewards, alpha=0.7)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('REINFORCE Training on Lunar Lander')
plt.grid(True, alpha=0.3)
plt.show()

๐Ÿ† Exercise 4: Custom Environment - Mountain Carยถ

Goal: Create and solve a custom RL environment

Task: Modify the Mountain Car environment to make it more challenging

  • Add random wind forces

  • Change reward structure

  • Implement your own RL solution

import gymnasium as gym
from gymnasium import spaces
import numpy as np

class WindyMountainCar(gym.Env):
    """Modified Mountain Car with wind forces"""
    
    def __init__(self, wind_strength=0.1):
        super(WindyMountainCar, self).__init__()
        
        self.wind_strength = wind_strength
        
        # Action space: push left, no push, push right
        self.action_space = spaces.Discrete(3)
        
        # Observation space: position, velocity
        self.observation_space = spaces.Box(
            low=np.array([-1.2, -0.07]),
            high=np.array([0.6, 0.07]),
            dtype=np.float32
        )
        
        self.reset()
    
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.position = -0.5
        self.velocity = 0.0
        self.steps = 0
        return np.array([self.position, self.velocity], dtype=np.float32), {}
    
    def step(self, action):
        # Mountain Car physics, with a random wind force added to the velocity
        # (the original version applied the position update twice per step)
        force = action - 1  # -1, 0, 1
        wind_force = np.random.normal(0, self.wind_strength)
        self.velocity += force * 0.001 + np.cos(3 * self.position) * (-0.0025) + wind_force
        self.velocity = np.clip(self.velocity, -0.07, 0.07)
        self.position += self.velocity
        self.position = np.clip(self.position, -1.2, 0.6)
        
        # Reward: distance to goal + small penalty for wind
        reward = (self.position + 1.2) / 1.8  # 0 to 1 based on progress
        reward -= abs(wind_force) * 0.1  # Penalty for wind
        
        # Check termination (count this step before checking the limit)
        self.steps += 1
        terminated = self.position >= 0.5
        truncated = self.steps >= 200
        
        if terminated:
            reward += 10  # Bonus for reaching goal
        
        return np.array([self.position, self.velocity], dtype=np.float32), reward, terminated, truncated, {}

# Test the custom environment
env = WindyMountainCar(wind_strength=0.05)
print("Testing Windy Mountain Car environment...")

# Run a few random episodes
for episode in range(3):
    state, _ = env.reset()
    total_reward = 0
    done = False
    steps = 0
    
    while not done and steps < 50:
        action = env.action_space.sample()
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward
        state = next_state
        steps += 1
    
    print(f"Episode {episode+1}: {steps} steps, reward: {total_reward:.2f}")

print("\nNow implement your RL solution for this environment!")
# TODO: Implement DQN or Policy Gradient agent for WindyMountainCar
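One simple route is tabular Q-learning over a discretized state space. A possible binning helper, sketched here under the assumption of 20 bins per dimension (the counts are arbitrary) and reusing the env's [-1.2, 0.6] x [-0.07, 0.07] bounds:

```python
import numpy as np

def discretize(obs, bins_pos=20, bins_vel=20):
    """Map a continuous (position, velocity) pair to one discrete state index."""
    pos, vel = obs
    p = int(np.clip((pos + 1.2) / 1.8 * bins_pos, 0, bins_pos - 1))
    v = int(np.clip((vel + 0.07) / 0.14 * bins_vel, 0, bins_vel - 1))
    return p * bins_vel + v
```

Feeding `discretize(state)` into a Q-table like the one from Exercise 1 turns the continuous problem back into a tabular one; the wind noise then behaves like extra stochasticity in the transitions.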
## ๐Ÿ† Exercise 5: Hyperparameter Tuning

**Goal**: Systematically tune hyperparameters for RL algorithms

**Task**: Compare different hyperparameter configurations on a fixed environment
def hyperparameter_study():
    """Compare different hyperparameter configurations"""
    
    env = gym.make('CartPole-v1')
    
    # Different hyperparameter configurations
    configs = [
        {'name': 'Conservative', 'lr': 1e-4, 'gamma': 0.95, 'epsilon_decay': 0.99},
        {'name': 'Aggressive', 'lr': 1e-2, 'gamma': 0.99, 'epsilon_decay': 0.9},
        {'name': 'Balanced', 'lr': 1e-3, 'gamma': 0.99, 'epsilon_decay': 0.95},
        {'name': 'Slow Decay', 'lr': 1e-3, 'gamma': 0.99, 'epsilon_decay': 0.999},
    ]
    
    results = {}
    
    for config in configs:
        print(f"\nTesting configuration: {config['name']}")
        
        # Create agent with this configuration
        agent = DQNCartPoleAgent(
            env,
            learning_rate=config['lr'],
            gamma=config['gamma'],
            epsilon_decay=config['epsilon_decay']
        )
        
        # Train for fewer episodes for quick comparison
        agent.train(num_episodes=200)
        
        # Evaluate
        eval_reward = agent.evaluate(num_episodes=5)
        
        results[config['name']] = {
            'final_avg_reward': np.mean(agent.episode_rewards[-50:]),
            'eval_reward': eval_reward,
            'rewards': agent.episode_rewards
        }
    
    # Plot comparison
    plt.figure(figsize=(12, 8))
    
    for name, data in results.items():
        plt.plot(data['rewards'], label=f"{name} (final: {data['final_avg_reward']:.1f}, eval: {data['eval_reward']:.1f})", alpha=0.7)
    
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Hyperparameter Comparison on CartPole')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    
    # Print summary
    print("\nHyperparameter Study Results:")
    print("=" * 50)
    for name, data in results.items():
        print(f"{name}:")
        print(f"  Final Training Avg: {data['final_avg_reward']:.1f}")
        print(f"  Evaluation Avg: {data['eval_reward']:.1f}")
        print()

# Run hyperparameter study
hyperparameter_study()
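The four configs above are hand-picked. For a fuller sweep you can enumerate a full grid of values; a small stdlib-only helper (hypothetical, not part of the study above) for generating the config dicts:

```python
import itertools

def expand_grid(**param_lists):
    """All combinations of the given hyperparameter lists, as config dicts."""
    keys = list(param_lists)
    return [dict(zip(keys, combo))
            for combo in itertools.product(*(param_lists[k] for k in keys))]
```

For example, `expand_grid(lr=[1e-4, 1e-3], gamma=[0.95, 0.99])` yields four configs; remember that grid size grows multiplicatively, so keep per-config training short.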

๐Ÿ† Exercise 6: Real-World Application - Stock Tradingยถ

Goal: Apply RL to a simplified stock trading environment

Environment:

  • State: Current stock price, portfolio value, trend indicators

  • Actions: Buy, Sell, Hold

  • Reward: Portfolio returns minus transaction costs

  • Goal: Maximize long-term returns

class StockTradingEnv(gym.Env):
    """Simplified stock trading environment"""
    
    def __init__(self, initial_balance=10000, transaction_cost=0.001):
        super(StockTradingEnv, self).__init__()
        
        self.initial_balance = initial_balance
        self.transaction_cost = transaction_cost
        
        # Generate synthetic stock prices (random walk with trend)
        self.generate_stock_data()
        
        # Action space: 0=Hold, 1=Buy, 2=Sell
        self.action_space = spaces.Discrete(3)
        
        # Observation space: [price, balance, shares, trend]
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, -1]),
            high=np.array([200, 100000, 1000, 1]),
            dtype=np.float32
        )
    
    def generate_stock_data(self):
        """Generate synthetic stock price data"""
        np.random.seed(42)
        n_steps = 1000
        
        # Random walk with drift
        prices = [100.0]
        for i in range(n_steps - 1):
            change = np.random.normal(0.001, 0.02)  # Small drift, volatility
            new_price = prices[-1] * (1 + change)
            prices.append(max(new_price, 1.0))  # Floor at $1
        
        self.prices = np.array(prices)
        self.max_steps = len(prices)
    
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        self.balance = self.initial_balance
        self.shares = 0
        self.done = False
        
        return self._get_observation(), {}
    
    def _get_observation(self):
        """Get current observation"""
        price = self.prices[self.current_step]
        
        # Simple trend indicator (price change over last 5 steps)
        if self.current_step >= 5:
            trend = (price - self.prices[self.current_step - 5]) / self.prices[self.current_step - 5]
        else:
            trend = 0.0
        
        return np.array([price, self.balance, self.shares, trend], dtype=np.float32)
    
    def step(self, action):
        """Execute trading action"""
        price = self.prices[self.current_step]
        portfolio_value = self.balance + self.shares * price
        
        # Execute action
        if action == 1:  # Buy
            # Buy as many shares as possible with available balance
            max_shares = int(self.balance / (price * (1 + self.transaction_cost)))
            if max_shares > 0:
                cost = max_shares * price * (1 + self.transaction_cost)
                self.balance -= cost
                self.shares += max_shares
        
        elif action == 2:  # Sell
            if self.shares > 0:
                proceeds = self.shares * price * (1 - self.transaction_cost)
                self.balance += proceeds
                self.shares = 0
        
        # Move to next time step
        self.current_step += 1
        
        # Check if episode is done
        done = self.current_step >= self.max_steps - 1
        
        # Calculate reward (change in portfolio value)
        new_portfolio_value = self.balance + self.shares * self.prices[self.current_step]
        reward = new_portfolio_value - portfolio_value
        
        # Additional reward for holding during uptrends
        if action == 0 and self._get_observation()[3] > 0.01:  # Positive trend
            reward += 1.0
        
        return self._get_observation(), reward, done, False, {}
    
    def render(self):
        """Render current state"""
        price = self.prices[self.current_step]
        portfolio_value = self.balance + self.shares * price
        print(f"Step: {self.current_step}, Price: ${price:.2f}, "
              f"Balance: ${self.balance:.2f}, Shares: {self.shares}, "
              f"Portfolio: ${portfolio_value:.2f}")

# Test the trading environment
trading_env = StockTradingEnv()
print("Testing Stock Trading Environment...")

# Run a random trading episode
state, _ = trading_env.reset()
total_reward = 0
done = False

while not done:
    action = trading_env.action_space.sample()
    next_state, reward, done, _, _ = trading_env.step(action)
    total_reward += reward
    state = next_state

final_value = trading_env.balance + trading_env.shares * trading_env.prices[-1]
print(f"Random trading result: ${final_value:.2f} (initial: ${trading_env.initial_balance:.2f})")
print(f"Total reward: {total_reward:.2f}")

print("\nNow implement an RL agent to learn trading strategies!")
# TODO: Implement DQN or Policy Gradient agent for stock trading
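Before training an agent, it is worth computing a buy-and-hold baseline to beat. This sketch works on any price array (e.g. `trading_env.prices`) and mirrors the env's transaction-cost accounting:

```python
def buy_and_hold_value(prices, initial_balance=10000, transaction_cost=0.001):
    """Final portfolio value if we buy once at t=0 and hold to the end."""
    shares = int(initial_balance / (prices[0] * (1 + transaction_cost)))
    balance = initial_balance - shares * prices[0] * (1 + transaction_cost)
    return balance + shares * prices[-1]
```

If your trained agent cannot beat `buy_and_hold_value(trading_env.prices)`, the reward shaping or state representation probably needs work before the algorithm does.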

๐Ÿ† Bonus Exercise: Multi-Agent RLยถ

Goal: Implement a simple multi-agent environment and train cooperative agents

Environment: Two agents must cooperate to achieve a goal

  • State: Positions of both agents and goal

  • Actions: Move in 4 directions

  • Reward: Shared reward for reaching goal

  • Challenge: Agents must coordinate their movements

# TODO: Implement multi-agent environment and training
print("Multi-Agent RL Exercise")
print("- Implement cooperative multi-agent environment")
print("- Train agents using independent Q-learning")
print("- Experiment with communication protocols")
print("- Compare individual vs team rewards")

# This is a challenging advanced exercise - implement it as a personal project!
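As a starting point, independent Q-learning gives each agent its own Q-table and applies the standard update to the shared team reward. A minimal sketch (state and action encodings are left to you):

```python
import numpy as np

def independent_q_update(q_tables, states, actions, reward, next_states,
                         alpha=0.1, gamma=0.95):
    """One Q-learning update per agent, all driven by the shared team reward."""
    for q, s, a, s2 in zip(q_tables, states, actions, next_states):
        target = reward + gamma * np.max(q[s2])
        q[s][a] += alpha * (target - q[s][a])
```

With `q_tables` as a list of `defaultdict(lambda: np.zeros(4))`, this slots into a training loop like Exercise 1's; the catch is that each agent sees the other as part of a non-stationary environment, which is exactly what makes coordination hard.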

🧠 Key Takeaways¶

  1. Implementation is key: Understanding algorithms theoretically is only half the battle

  2. Debugging RL is hard: Training can be unstable, hyperparameters matter greatly

  3. Start simple: Begin with tabular methods, then move to function approximation

  4. Experiment systematically: Change one thing at a time when tuning

  5. Real applications are messy: Reward design, state representation, and safety matter

🚀 Next Steps¶

Now that you've implemented core RL algorithms, you're ready to:

  • Explore advanced algorithms: PPO, SAC, Rainbow DQN

  • Tackle harder environments: Atari games, continuous control

  • Apply to real problems: Robotics, game AI, recommendation systems

  • Research current developments: Follow arXiv papers and conferences

  • Contribute to open source: Improve existing RL libraries

๐Ÿ‹๏ธ Final Challengesยถ

  1. Solve Atari Breakout using DQN

  2. Train a robot arm to reach targets (simulation)

  3. Implement AlphaZero for a simple game

  4. Build an RL trading system with real market data

  5. Create a multi-agent system for cooperative tasks

💡 Pro Tips¶

  • Use existing libraries: Stable Baselines3, Ray RLlib for production

  • Monitor training: Use TensorBoard or Weights & Biases

  • Start with Gymnasium: Standard environments for testing

  • Read papers: Original papers often have implementation details

  • Join communities: Reddit r/reinforcementlearning, Discord servers

🎉 Congratulations!¶

You've completed the practical reinforcement learning exercises! You now have hands-on experience with:

  • Tabular Q-Learning

  • Deep Q-Networks

  • Policy Gradients

  • Custom environments

  • Hyperparameter tuning

  • Real-world applications

Keep experimenting, keep learning, and most importantly - have fun! 🎮🤖