Source code for neorl.rl.baselines.deepq.dqn

from functools import partial

import tensorflow as tf
import numpy as np
import gym

from neorl.rl.baselines.shared import logger
from neorl.rl.baselines.shared import tf_util, OffPolicyRLModel, SetVerbosity, TensorboardWriter
from neorl.rl.baselines.shared.vec_env import VecEnv
from neorl.rl.baselines.shared.schedules import LinearSchedule
from neorl.rl.baselines.shared.buffers import ReplayBuffer, PrioritizedReplayBuffer
from neorl.rl.baselines.deepq.build_graph import build_train
from neorl.rl.baselines.deepq.policies import DQNPolicy

# Filter tensorflow version warnings
import os
# https://stackoverflow.com/questions/40426502/is-there-a-way-to-suppress-the-messages-tensorflow-prints/40426709
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # or any {'0', '1', '2'}
import warnings
# https://stackoverflow.com/questions/15777951/how-to-suppress-pandas-future-warning
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)
import tensorflow as tf
tf.get_logger().setLevel('INFO')
tf.autograph.set_verbosity(0)
import logging
tf.get_logger().setLevel(logging.ERROR)


[docs]class DQN(OffPolicyRLModel): """ The DQN model class :param policy: (DQNPolicy or str) The policy model to use (MlpPolicy, CnnPolicy, LnMlpPolicy, ...) :param env: (NEORL environment or Gym environment) The environment to learn with PPO, either use NEORL method ``CreateEnvironment`` (see **below**) or construct your custom Gym environment :param gamma: (float) discount factor :param learning_rate: (float) learning rate for adam optimizer :param buffer_size: (int) size of the replay buffer :param exploration_fraction: (float) fraction of entire training period over which the exploration rate is annealed :param eps_final: (float) final value of random action probability (e.g. 0.05) :param eps_init: (float) initial value of random action probability (e.g. 1.0) :param train_freq: (int) update the model every `train_freq` steps. set to None to disable printing :param batch_size: (int) size of a batched sampled from replay buffer for training :param learning_starts: (int) how many steps of the model to collect transitions for before learning starts :param target_network_update_freq: (int) update the target network every `target_network_update_freq` steps. :param prioritized_replay: (bool) if True prioritized experience replay buffer will be used. :param verbose: (int) the verbosity level: 0 none, 1 training information, 2 tensorflow debug :param seed: (int) Seed for the pseudo-random generators (python, numpy, tensorflow). If None (default), use random seed. """ # :param prioritized_replay_alpha: (float)alpha parameter for prioritized replay buffer. # It determines how much prioritization is used, with alpha=0 corresponding to the uniform case. # :param prioritized_replay_beta0: (float) initial value of beta for prioritized replay buffer # :param prioritized_replay_beta_iters: (int) number of iterations over which beta will be annealed from initial # value to 1.0. If set to None equals to max_timesteps. # :param prioritized_replay_eps: (float) epsilon to add to the TD errors when updating priorities. def __init__(self, policy, env, gamma=0.99, learning_rate=5e-4, buffer_size=50000, exploration_fraction=0.1, eps_final=0.02, eps_init=1.0, train_freq=1, batch_size=32, learning_starts=1000, target_network_update_freq=500, prioritized_replay=True, verbose=0, seed=None, _init_setup_model=True): # TODO: replay_buffer refactoring super(DQN, self).__init__(policy=policy, env=env, replay_buffer=None, verbose=verbose, policy_base=DQNPolicy, requires_vec_env=False, policy_kwargs=None, seed=seed, n_cpu_tf_sess=1) self.param_noise = False self.learning_starts = learning_starts self.train_freq = train_freq self.prioritized_replay = prioritized_replay self.prioritized_replay_eps = 1e-6 self.batch_size = batch_size self.target_network_update_freq = target_network_update_freq self.prioritized_replay_alpha = 0.6 self.prioritized_replay_beta0 = 0.4 self.prioritized_replay_beta_iters = None self.exploration_final_eps = eps_final self.exploration_initial_eps = eps_init self.exploration_fraction = exploration_fraction self.buffer_size = buffer_size self.learning_rate = learning_rate self.gamma = gamma self.tensorboard_log = None self.full_tensorboard_log = False self.double_q = True self.graph = None self.sess = None self._train_step = None self.step_model = None self.update_target = None self.act = None self.proba_step = None self.replay_buffer = None self.beta_schedule = None self.exploration = None self.params = None self.summary = None if _init_setup_model: self.setup_model() def _get_pretrain_placeholders(self): policy = self.step_model return policy.obs_ph, tf.placeholder(tf.int32, [None]), policy.q_values def setup_model(self): with SetVerbosity(self.verbose): assert not isinstance(self.action_space, gym.spaces.Box), \ "Error: DQN cannot output a gym.spaces.Box action space." # If the policy is wrap in functool.partial (e.g. to disable dueling) # unwrap it to check the class type if isinstance(self.policy, partial): test_policy = self.policy.func else: test_policy = self.policy assert issubclass(test_policy, DQNPolicy), "Error: the input policy for the DQN model must be " \ "an instance of DQNPolicy." self.graph = tf.Graph() with self.graph.as_default(): self.set_random_seed(self.seed) self.sess = tf_util.make_session(num_cpu=self.n_cpu_tf_sess, graph=self.graph) optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate) self.act, self._train_step, self.update_target, self.step_model = build_train( q_func=partial(self.policy, **self.policy_kwargs), ob_space=self.observation_space, ac_space=self.action_space, optimizer=optimizer, gamma=self.gamma, grad_norm_clipping=10, param_noise=self.param_noise, sess=self.sess, full_tensorboard_log=self.full_tensorboard_log, double_q=self.double_q ) self.proba_step = self.step_model.proba_step self.params = tf_util.get_trainable_vars("deepq") # Initialize the parameters and copy them to the target network. tf_util.initialize(self.sess) self.update_target(sess=self.sess) self.summary = tf.summary.merge_all()
[docs] def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="DQN", reset_num_timesteps=True, replay_wrapper=None): new_tb_log = self._init_num_timesteps(reset_num_timesteps) callback = self._init_callback(callback) with SetVerbosity(self.verbose), TensorboardWriter(self.graph, self.tensorboard_log, tb_log_name, new_tb_log) \ as writer: self._setup_learn() # Create the replay buffer if self.prioritized_replay: self.replay_buffer = PrioritizedReplayBuffer(self.buffer_size, alpha=self.prioritized_replay_alpha) if self.prioritized_replay_beta_iters is None: prioritized_replay_beta_iters = total_timesteps else: prioritized_replay_beta_iters = self.prioritized_replay_beta_iters self.beta_schedule = LinearSchedule(prioritized_replay_beta_iters, initial_p=self.prioritized_replay_beta0, final_p=1.0) else: self.replay_buffer = ReplayBuffer(self.buffer_size) self.beta_schedule = None if replay_wrapper is not None: assert not self.prioritized_replay, "Prioritized replay buffer is not supported by HER" self.replay_buffer = replay_wrapper(self.replay_buffer) # Create the schedule for exploration starting from 1. self.exploration = LinearSchedule(schedule_timesteps=int(self.exploration_fraction * total_timesteps), initial_p=self.exploration_initial_eps, final_p=self.exploration_final_eps) episode_rewards = [0.0] episode_successes = [] callback.on_training_start(locals(), globals()) callback.on_rollout_start() reset = True obs = self.env.reset() # Retrieve unnormalized observation for saving into the buffer if self._vec_normalize_env is not None: obs_ = self._vec_normalize_env.get_original_obs().squeeze() for _ in range(total_timesteps): # Take action and update exploration to the newest value kwargs = {} if not self.param_noise: update_eps = self.exploration.value(self.num_timesteps) update_param_noise_threshold = 0. else: update_eps = 0. # Compute the threshold such that the KL divergence between perturbed and non-perturbed # policy is comparable to eps-greedy exploration with eps = exploration.value(t). # See Appendix C.1 in Parameter Space Noise for Exploration, Plappert et al., 2017 # for detailed explanation. update_param_noise_threshold = \ -np.log(1. - self.exploration.value(self.num_timesteps) + self.exploration.value(self.num_timesteps) / float(self.env.action_space.n)) kwargs['reset'] = reset kwargs['update_param_noise_threshold'] = update_param_noise_threshold kwargs['update_param_noise_scale'] = True with self.sess.as_default(): action = self.act(np.array(obs)[None], update_eps=update_eps, **kwargs)[0] env_action = action reset = False new_obs, rew, done, info = self.env.step(env_action) self.num_timesteps += 1 # Stop training if return value is False callback.update_locals(locals()) if callback.on_step() is False: break # Store only the unnormalized version if self._vec_normalize_env is not None: new_obs_ = self._vec_normalize_env.get_original_obs().squeeze() reward_ = self._vec_normalize_env.get_original_reward().squeeze() else: # Avoid changing the original ones obs_, new_obs_, reward_ = obs, new_obs, rew # Store transition in the replay buffer. self.replay_buffer_add(obs_, action, reward_, new_obs_, done, info) obs = new_obs # Save the unnormalized observation if self._vec_normalize_env is not None: obs_ = new_obs_ if writer is not None: ep_rew = np.array([reward_]).reshape((1, -1)) ep_done = np.array([done]).reshape((1, -1)) tf_util.total_episode_reward_logger(self.episode_reward, ep_rew, ep_done, writer, self.num_timesteps) episode_rewards[-1] += reward_ if done: maybe_is_success = info.get('is_success') if maybe_is_success is not None: episode_successes.append(float(maybe_is_success)) if not isinstance(self.env, VecEnv): obs = self.env.reset() episode_rewards.append(0.0) reset = True # Do not train if the warmup phase is not over # or if there are not enough samples in the replay buffer can_sample = self.replay_buffer.can_sample(self.batch_size) if can_sample and self.num_timesteps > self.learning_starts \ and self.num_timesteps % self.train_freq == 0: callback.on_rollout_end() # Minimize the error in Bellman's equation on a batch sampled from replay buffer. # pytype:disable=bad-unpacking if self.prioritized_replay: assert self.beta_schedule is not None, \ "BUG: should be LinearSchedule when self.prioritized_replay True" experience = self.replay_buffer.sample(self.batch_size, beta=self.beta_schedule.value(self.num_timesteps), env=self._vec_normalize_env) (obses_t, actions, rewards, obses_tp1, dones, weights, batch_idxes) = experience else: obses_t, actions, rewards, obses_tp1, dones = self.replay_buffer.sample(self.batch_size, env=self._vec_normalize_env) weights, batch_idxes = np.ones_like(rewards), None # pytype:enable=bad-unpacking if writer is not None: # run loss backprop with summary, but once every 100 steps save the metadata # (memory, compute time, ...) if (1 + self.num_timesteps) % 100 == 0: run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) run_metadata = tf.RunMetadata() summary, td_errors = self._train_step(obses_t, actions, rewards, obses_tp1, obses_tp1, dones, weights, sess=self.sess, options=run_options, run_metadata=run_metadata) writer.add_run_metadata(run_metadata, 'step%d' % self.num_timesteps) else: summary, td_errors = self._train_step(obses_t, actions, rewards, obses_tp1, obses_tp1, dones, weights, sess=self.sess) writer.add_summary(summary, self.num_timesteps) else: _, td_errors = self._train_step(obses_t, actions, rewards, obses_tp1, obses_tp1, dones, weights, sess=self.sess) if self.prioritized_replay: new_priorities = np.abs(td_errors) + self.prioritized_replay_eps assert isinstance(self.replay_buffer, PrioritizedReplayBuffer) self.replay_buffer.update_priorities(batch_idxes, new_priorities) callback.on_rollout_start() if can_sample and self.num_timesteps > self.learning_starts and \ self.num_timesteps % self.target_network_update_freq == 0: # Update target network periodically. self.update_target(sess=self.sess) if len(episode_rewards[-101:-1]) == 0: mean_100ep_reward = -np.inf else: mean_100ep_reward = round(float(np.mean(episode_rewards[-101:-1])), 1) num_episodes = len(episode_rewards) if self.verbose >= 1 and done and log_interval is not None and len(episode_rewards) % log_interval == 0: logger.record_tabular("steps", self.num_timesteps) logger.record_tabular("episodes", num_episodes) if len(episode_successes) > 0: logger.logkv("success rate", np.mean(episode_successes[-100:])) logger.record_tabular("mean 100 episode reward", mean_100ep_reward) logger.record_tabular("% time spent exploring", int(100 * self.exploration.value(self.num_timesteps))) logger.dump_tabular() callback.on_training_end() return self
[docs] def predict(self, observation, state=None, mask=None, deterministic=True): observation = np.array(observation) vectorized_env = self._is_vectorized_observation(observation, self.observation_space) observation = observation.reshape((-1,) + self.observation_space.shape) with self.sess.as_default(): actions, _, _ = self.step_model.step(observation, deterministic=deterministic) if not vectorized_env: actions = actions[0] return actions, None
def action_probability(self, observation, state=None, mask=None, actions=None, logp=False): observation = np.array(observation) vectorized_env = self._is_vectorized_observation(observation, self.observation_space) observation = observation.reshape((-1,) + self.observation_space.shape) actions_proba = self.proba_step(observation, state, mask) if actions is not None: # comparing the action distribution, to given actions actions = np.array([actions]) assert isinstance(self.action_space, gym.spaces.Discrete) actions = actions.reshape((-1,)) assert observation.shape[0] == actions.shape[0], "Error: batch sizes differ for actions and observations." actions_proba = actions_proba[np.arange(actions.shape[0]), actions] # normalize action proba shape actions_proba = actions_proba.reshape((-1, 1)) if logp: actions_proba = np.log(actions_proba) if not vectorized_env: if state is not None: raise ValueError("Error: The environment must be vectorized when using recurrent policies.") actions_proba = actions_proba[0] return actions_proba def get_parameter_list(self): return self.params
[docs] def save(self, save_path, cloudpickle=False): # params data = { "double_q": self.double_q, "param_noise": self.param_noise, "learning_starts": self.learning_starts, "train_freq": self.train_freq, "prioritized_replay": self.prioritized_replay, "prioritized_replay_eps": self.prioritized_replay_eps, "batch_size": self.batch_size, "target_network_update_freq": self.target_network_update_freq, "prioritized_replay_alpha": self.prioritized_replay_alpha, "prioritized_replay_beta0": self.prioritized_replay_beta0, "prioritized_replay_beta_iters": self.prioritized_replay_beta_iters, "exploration_final_eps": self.exploration_final_eps, "exploration_fraction": self.exploration_fraction, "learning_rate": self.learning_rate, "gamma": self.gamma, "verbose": self.verbose, "observation_space": self.observation_space, "action_space": self.action_space, "policy": self.policy, "n_envs": self.n_envs, "n_cpu_tf_sess": self.n_cpu_tf_sess, "seed": self.seed, "_vectorize_action": self._vectorize_action, "policy_kwargs": self.policy_kwargs } params_to_save = self.get_parameters() self._save_to_file(save_path, data=data, params=params_to_save, cloudpickle=cloudpickle)