import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
# Hyperparameters
BATCH_SIZE = 32
LR = 0.01                   # learning rate
EPSILON = 0.9               # probability of choosing the greedy action
GAMMA = 0.9                 # reward discount factor
TARGET_REPLACE_ITER = 100   # target network update frequency (in learn steps)
MEMORY_CAPACITY = 2000      # replay buffer size

env = gym.make('CartPole-v1')
env = env.unwrapped         # expose x_threshold and theta_threshold_radians
N_ACTIONS = env.action_space.n
N_STATES = env.observation_space.shape[0]
# 0 for a discrete action space; otherwise the shape of a sampled action
ENV_A_SHAPE = 0 if isinstance(env.action_space.sample(), int) else env.action_space.sample().shape
""" Action Space | Num | Action | |-----|------------------------| | 0 | Push cart to the left | | 1 | Push cart to the right | """
""" Observation Space | Num | Observation | Min | Max | |-----|-----------------------|----------------------|--------------------| | 0 | Cart Position | -4.8 | 4.8 | | 1 | Cart Velocity | -Inf | Inf | | 2 | Pole Angle | ~ -0.418 rad (-24°) | ~ 0.418 rad (24°) | | 3 | Pole Angular Velocity | -Inf | Inf | """
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(N_STATES, 50)
        self.fc1.weight.data.normal_(0, 0.1)   # small random weight initialisation
        self.out = nn.Linear(50, N_ACTIONS)
        self.out.weight.data.normal_(0, 0.1)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        actions_value = self.out(x)   # one Q-value per action
        return actions_value
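# Quick shape check for the network (illustrative only, not in the original
# script): a batch of states maps to one Q-value per action.
#
#     net = Net()
#     q = net(torch.zeros(1, N_STATES))   # tensor of shape (1, N_ACTIONS)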
class DQN(object):
    def __init__(self):
        self.eval_net, self.target_net = Net(), Net()

        self.learn_step_counter = 0   # counts learn() calls, for target net syncing
        self.memory_counter = 0       # counts stored transitions
        # each row holds one transition: (s, a, r, s_)
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)
        self.loss_func = nn.MSELoss()
    def choose_action(self, x):
        x = torch.unsqueeze(torch.FloatTensor(x), 0)   # add a batch dimension
        if np.random.uniform() < EPSILON:   # greedy: pick the action with the highest Q-value
            actions_value = self.eval_net.forward(x)
            action = torch.max(actions_value, 1)[1].data.numpy()
            action = action[0] if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE)
        else:                               # explore: pick a random action
            action = np.random.randint(0, N_ACTIONS)
            action = action if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE)
        return action
    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, [a, r], s_))
        # overwrite the oldest memory once the buffer is full
        index = self.memory_counter % MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1
    def learn(self):
        # sync the target network with the evaluation network every TARGET_REPLACE_ITER steps
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1
        # sample a random minibatch of transitions from the replay buffer
        sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :N_STATES])
        b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES + 1].astype(int))
        b_r = torch.FloatTensor(b_memory[:, N_STATES + 1:N_STATES + 2])
        b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:])
        q_eval = self.eval_net(b_s).gather(1, b_a)   # Q(s, a) for the actions actually taken
        q_next = self.target_net(b_s_).detach()      # detach: no gradients through the target net
        q_target = b_r + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1)
        loss = self.loss_func(q_eval, q_target)
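        # The regression target above is the one-step Q-learning (Bellman) target,
        #
        #     y = r + GAMMA * max_a' Q_target(s', a'),
        #
        # evaluated with the frozen target network so the target does not shift
        # with every gradient step. Note that the stored transitions carry no
        # done flag, so terminal states are not masked out of the target; the
        # script accepts this simplification.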
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
dqn = DQN()
print('\nCollecting experience...')
for i_episode in range(400):
    s = env.reset()
    ep_r = 0
    while True:
        env.render()
        a = dqn.choose_action(s)

        # take the action and observe the next state
        s_, r, done, info = env.step(a)
        # reshape the reward: instead of the environment's constant +1 per step,
        # reward the cart for staying near the centre (r1) and the pole for
        # staying near vertical (r2), which speeds up learning
        x, x_dot, theta, theta_dot = s_
        r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        r = r1 + r2
        dqn.store_transition(s, a, r, s_)
        ep_r += r
        # start learning once the replay buffer has been filled
        if dqn.memory_counter > MEMORY_CAPACITY:
            dqn.learn()
            if done:
                print('Ep: ', i_episode, '| Ep_r: ', round(ep_r, 2))
        if done:
            break
        s = s_
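
# A minimal sketch of a greedy evaluation rollout after training (not part of
# the original script): actions are chosen as argmax_a Q(s, a) with no
# exploration, and episodes are scored with the environment's own +1-per-step
# reward rather than the shaped reward used for training.
def evaluate(n_episodes=10, max_steps=500):
    returns = []
    for _ in range(n_episodes):
        s = env.reset()
        total = 0.0
        for _ in range(max_steps):   # cap the rollout: env.unwrapped has no TimeLimit
            with torch.no_grad():
                q = dqn.eval_net(torch.unsqueeze(torch.FloatTensor(s), 0))
            a = int(q.argmax(dim=1).item())
            s, r, done, _ = env.step(a)
            total += r
            if done:
                break
        returns.append(total)
    print('Average return over %d episodes: %.1f' % (n_episodes, float(np.mean(returns))))

evaluate()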