Source code for neorl.rl.baselines.acktr.acktr

import time
import warnings

import tensorflow as tf
from gym.spaces import Box, Discrete

from neorl.rl.baselines.shared import logger
from neorl.rl.baselines.a2c.a2c import A2CRunner
from neorl.rl.baselines.ppo2.ppo2 import Runner as PPO2Runner
from neorl.rl.baselines.shared.tf_util import mse, total_episode_reward_logger
from neorl.rl.baselines.acktr import kfac
from neorl.rl.baselines.shared.schedules import Scheduler
from neorl.rl.baselines.shared import explained_variance, ActorCriticRLModel, tf_util, SetVerbosity, TensorboardWriter
from neorl.rl.baselines.shared.policies import ActorCriticPolicy, RecurrentActorCriticPolicy
from neorl.rl.baselines.shared.math_util import safe_mean

# Filter tensorflow version warnings
import os
# https://stackoverflow.com/questions/40426502/is-there-a-way-to-suppress-the-messages-tensorflow-prints/40426709
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # or any {'0', '1', '2'}
# https://stackoverflow.com/questions/15777951/how-to-suppress-pandas-future-warning
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)
import logging
tf.get_logger().setLevel(logging.ERROR)
tf.autograph.set_verbosity(0)


class ACKTR(ActorCriticRLModel):
    """
    The ACKTR (Actor Critic using Kronecker-Factored Trust Region) model class

    :param policy: (ActorCriticPolicy or str) The policy model to use (e.g. MlpPolicy)
    :param env: (NEORL environment or Gym environment) The environment to learn with ACKTR,
        either use NEORL method ``CreateEnvironment`` (see **below**) or construct your custom Gym environment
    :param gamma: (float) Discount factor
    :param n_steps: (int) The number of steps to run for each environment
    :param ent_coef: (float) The weight for the entropy loss
    :param vf_coef: (float) The weight for the loss on the value function
    :param vf_fisher_coef: (float) The weight for the fisher loss on the value function
    :param learning_rate: (float) The initial learning rate for the RMS prop optimizer
    :param max_grad_norm: (float) The clipping value for the maximum gradient
    :param kfac_clip: (float) gradient clipping for Kullback-Leibler
    :param lr_schedule: (str) The type of scheduler for the learning rate update
        ('linear', 'constant', 'double_linear_con', 'middle_drop' or 'double_middle_drop')
    :param verbose: (int) the verbosity level: 0 none, 1 training information, 2 tensorflow debug
    :param seed: (int) Seed for the pseudo-random generators (python, numpy, tensorflow).
        If None (default), use random seed.
    """
    #:param async_eigen_decomp: (bool) Use async eigen decomposition
    #:param kfac_update: (int) update kfac after kfac_update steps
    #:param gae_lambda: (float) Factor for trade-off of bias vs variance for Generalized Advantage Estimator
    #    If None (default), then the classic advantage will be used instead of GAE

    def __init__(self, policy, env, gamma=0.99, n_steps=20, ent_coef=0.01, vf_coef=0.25, vf_fisher_coef=1.0,
                 learning_rate=0.25, max_grad_norm=0.5, kfac_clip=0.001, lr_schedule='linear', verbose=0,
                 seed=None, _init_setup_model=True):

        self.n_steps = n_steps
        self.gamma = gamma
        self.ent_coef = ent_coef
        self.vf_coef = vf_coef
        self.vf_fisher_coef = vf_fisher_coef
        self.kfac_clip = kfac_clip
        self.max_grad_norm = max_grad_norm
        self.learning_rate = learning_rate
        self.lr_schedule = lr_schedule

        self.tensorboard_log = None
        self.full_tensorboard_log = False
        policy_kwargs = None
        n_cpu_tf_sess = 1
        self.async_eigen_decomp = False
        self.kfac_update = 1
        self.gae_lambda = None

        self.actions_ph = None
        self.advs_ph = None
        self.rewards_ph = None
        self.learning_rate_ph = None
        self.step_model = None
        self.train_model = None
        self.entropy = None
        self.pg_loss = None
        self.vf_loss = None
        self.pg_fisher = None
        self.vf_fisher = None
        self.joint_fisher = None
        self.grads_check = None
        self.optim = None
        self.train_op = None
        self.q_runner = None
        self.learning_rate_schedule = None
        self.proba_step = None
        self.value = None
        self.initial_state = None
        self.n_batch = None
        self.summary = None
        self.trained = False
        self.continuous_actions = False

        super(ACKTR, self).__init__(policy=policy, env=env, verbose=verbose, requires_vec_env=True,
                                    _init_setup_model=_init_setup_model, policy_kwargs=policy_kwargs,
                                    seed=seed, n_cpu_tf_sess=n_cpu_tf_sess)

        if _init_setup_model:
            self.setup_model()

    def _make_runner(self):
        if self.gae_lambda is not None:
            return PPO2Runner(
                env=self.env, model=self, n_steps=self.n_steps,
                gamma=self.gamma, lam=self.gae_lambda)
        else:
            return A2CRunner(
                self.env, self, n_steps=self.n_steps, gamma=self.gamma)

    def _get_pretrain_placeholders(self):
        policy = self.train_model
        if isinstance(self.action_space, Discrete):
            return policy.obs_ph, self.actions_ph, policy.policy
        return policy.obs_ph, self.actions_ph, policy.deterministic_action

    def setup_model(self):
        with SetVerbosity(self.verbose):

            assert issubclass(self.policy, ActorCriticPolicy), "Error: the input policy for the ACKTR model must be " \
                                                               "an instance of common.policies.ActorCriticPolicy."

            # Enable continuous actions tricks (normalized advantage)
            self.continuous_actions = isinstance(self.action_space, Box)

            self.graph = tf.Graph()
            with self.graph.as_default():
                self.set_random_seed(self.seed)
                self.sess = tf_util.make_session(num_cpu=self.n_cpu_tf_sess, graph=self.graph)

                n_batch_step = None
                n_batch_train = None
                if issubclass(self.policy, RecurrentActorCriticPolicy):
                    n_batch_step = self.n_envs
                    n_batch_train = self.n_envs * self.n_steps

                step_model = self.policy(self.sess, self.observation_space, self.action_space, self.n_envs, 1,
                                         n_batch_step, reuse=False, **self.policy_kwargs)

                self.params = params = tf_util.get_trainable_vars("model")

                with tf.variable_scope("train_model", reuse=True,
                                       custom_getter=tf_util.outer_scope_getter("train_model")):
                    train_model = self.policy(self.sess, self.observation_space, self.action_space, self.n_envs,
                                              self.n_steps, n_batch_train, reuse=True, **self.policy_kwargs)

                with tf.variable_scope("loss", reuse=False, custom_getter=tf_util.outer_scope_getter("loss")):
                    self.advs_ph = advs_ph = tf.placeholder(tf.float32, [None])
                    self.rewards_ph = rewards_ph = tf.placeholder(tf.float32, [None])
                    self.learning_rate_ph = learning_rate_ph = tf.placeholder(tf.float32, [])
                    self.actions_ph = train_model.pdtype.sample_placeholder([None])

                    neg_log_prob = train_model.proba_distribution.neglogp(self.actions_ph)

                    # training loss
                    pg_loss = tf.reduce_mean(advs_ph * neg_log_prob)
                    self.entropy = entropy = tf.reduce_mean(train_model.proba_distribution.entropy())
                    self.pg_loss = pg_loss = pg_loss - self.ent_coef * entropy
                    self.vf_loss = vf_loss = mse(tf.squeeze(train_model.value_fn), rewards_ph)
                    train_loss = pg_loss + self.vf_coef * vf_loss

                    # Fisher loss construction
                    self.pg_fisher = pg_fisher_loss = -tf.reduce_mean(neg_log_prob)
                    sample_net = train_model.value_fn + tf.random_normal(tf.shape(train_model.value_fn))
                    self.vf_fisher = vf_fisher_loss = - self.vf_fisher_coef * tf.reduce_mean(
                        tf.pow(train_model.value_fn - tf.stop_gradient(sample_net), 2))
                    self.joint_fisher = pg_fisher_loss + vf_fisher_loss

                    tf.summary.scalar('entropy_loss', self.entropy)
                    tf.summary.scalar('policy_gradient_loss', pg_loss)
                    tf.summary.scalar('policy_gradient_fisher_loss', pg_fisher_loss)
                    tf.summary.scalar('value_function_loss', self.vf_loss)
                    tf.summary.scalar('value_function_fisher_loss', vf_fisher_loss)
                    tf.summary.scalar('loss', train_loss)

                    self.grads_check = tf.gradients(train_loss, params)

                with tf.variable_scope("input_info", reuse=False):
                    tf.summary.scalar('discounted_rewards', tf.reduce_mean(self.rewards_ph))
                    tf.summary.scalar('learning_rate', tf.reduce_mean(self.learning_rate_ph))
                    tf.summary.scalar('advantage', tf.reduce_mean(self.advs_ph))

                    if self.full_tensorboard_log:
                        tf.summary.histogram('discounted_rewards', self.rewards_ph)
                        tf.summary.histogram('learning_rate', self.learning_rate_ph)
                        tf.summary.histogram('advantage', self.advs_ph)
                        if tf_util.is_image(self.observation_space):
                            tf.summary.image('observation', train_model.obs_ph)
                        else:
                            tf.summary.histogram('observation', train_model.obs_ph)

                with tf.variable_scope("kfac", reuse=False, custom_getter=tf_util.outer_scope_getter("kfac")):
                    with tf.device('/gpu:0'):
                        self.optim = optim = kfac.KfacOptimizer(learning_rate=learning_rate_ph,
                                                                clip_kl=self.kfac_clip,
                                                                momentum=0.9, kfac_update=self.kfac_update,
                                                                epsilon=0.01, stats_decay=0.99,
                                                                async_eigen_decomp=self.async_eigen_decomp,
                                                                cold_iter=10,
                                                                max_grad_norm=self.max_grad_norm,
                                                                verbose=self.verbose)

                        optim.compute_and_apply_stats(self.joint_fisher, var_list=params)

                self.train_model = train_model
                self.step_model = step_model
                self.step = step_model.step
                self.proba_step = step_model.proba_step
                self.value = step_model.value
                self.initial_state = step_model.initial_state
                tf.global_variables_initializer().run(session=self.sess)

                self.summary = tf.summary.merge_all()

    def _train_step(self, obs, states, rewards, masks, actions, values, update, writer):
        """
        applies a training step to the model

        :param obs: ([float]) The input observations
        :param states: ([float]) The states (used for recurrent policies)
        :param rewards: ([float]) The rewards from the environment
        :param masks: ([bool]) Whether or not the episode is over (used for recurrent policies)
        :param actions: ([float]) The actions taken
        :param values: ([float]) The logits values
        :param update: (int) the current step iteration
        :param writer: (TensorFlow Summary.writer) the writer for tensorboard
        :return: (float, float, float) policy loss, value loss, policy entropy
        """
        advs = rewards - values

        # Normalize advantage (used in the original continuous version)
        if self.continuous_actions:
            advs = (advs - advs.mean()) / (advs.std() + 1e-8)

        current_lr = None
        assert len(obs) > 0, "Error: the observation input array cannot be empty"

        # Note: in the original continuous version,
        # the stepsize was automatically tuned computing the kl div
        # and comparing it to the desired one
        for _ in range(len(obs)):
            current_lr = self.learning_rate_schedule.value()

        td_map = {
            self.train_model.obs_ph: obs,
            self.actions_ph: actions,
            self.advs_ph: advs,
            self.rewards_ph: rewards,
            self.learning_rate_ph: current_lr
        }

        if states is not None:
            td_map[self.train_model.states_ph] = states
            td_map[self.train_model.dones_ph] = masks

        if writer is not None:
            # run loss backprop with summary, but once every 10 runs save the metadata (memory, compute time, ...)
            if self.full_tensorboard_log and (1 + update) % 10 == 0:
                run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
                run_metadata = tf.RunMetadata()
                summary, policy_loss, value_loss, policy_entropy, _ = self.sess.run(
                    [self.summary, self.pg_loss, self.vf_loss, self.entropy, self.train_op],
                    td_map, options=run_options, run_metadata=run_metadata)
                writer.add_run_metadata(run_metadata, 'step%d' % (update * (self.n_batch + 1)))
            else:
                summary, policy_loss, value_loss, policy_entropy, _ = self.sess.run(
                    [self.summary, self.pg_loss, self.vf_loss, self.entropy, self.train_op], td_map)
            writer.add_summary(summary, update * (self.n_batch + 1))
        else:
            policy_loss, value_loss, policy_entropy, _ = self.sess.run(
                [self.pg_loss, self.vf_loss, self.entropy, self.train_op], td_map)

        return policy_loss, value_loss, policy_entropy
    def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="ACKTR",
              reset_num_timesteps=True):

        new_tb_log = self._init_num_timesteps(reset_num_timesteps)
        callback = self._init_callback(callback)

        with SetVerbosity(self.verbose), TensorboardWriter(self.graph, self.tensorboard_log, tb_log_name, new_tb_log) \
                as writer:
            self._setup_learn()
            self.n_batch = self.n_envs * self.n_steps

            self.learning_rate_schedule = Scheduler(initial_value=self.learning_rate, n_values=total_timesteps,
                                                    schedule=self.lr_schedule)

            # FIFO queue of the q_runner thread is closed at the end of the learn function.
            # As a result, it needs to be redefined at every call
            with self.graph.as_default():
                with tf.variable_scope("kfac_apply", reuse=self.trained,
                                       custom_getter=tf_util.outer_scope_getter("kfac_apply")):
                    # Some of the variables are not in a scope when they are created,
                    # so we make a note of any previously uninitialized variables
                    tf_vars = tf.global_variables()
                    is_uninitialized = self.sess.run([tf.is_variable_initialized(var) for var in tf_vars])
                    old_uninitialized_vars = [v for (v, f) in zip(tf_vars, is_uninitialized) if not f]

                    self.train_op, self.q_runner = self.optim.apply_gradients(list(zip(self.grads_check, self.params)))

                    # then we check for new uninitialized variables and initialize them
                    tf_vars = tf.global_variables()
                    is_uninitialized = self.sess.run([tf.is_variable_initialized(var) for var in tf_vars])
                    new_uninitialized_vars = [v for (v, f) in zip(tf_vars, is_uninitialized)
                                              if not f and v not in old_uninitialized_vars]

                    if len(new_uninitialized_vars) != 0:
                        self.sess.run(tf.variables_initializer(new_uninitialized_vars))

                self.trained = True

            t_start = time.time()
            coord = tf.train.Coordinator()
            if self.q_runner is not None:
                enqueue_threads = self.q_runner.create_threads(self.sess, coord=coord, start=True)
            else:
                enqueue_threads = []

            callback.on_training_start(locals(), globals())

            for update in range(1, total_timesteps // self.n_batch + 1):

                callback.on_rollout_start()

                # pytype:disable=bad-unpacking
                # true_reward is the reward without discount
                if isinstance(self.runner, PPO2Runner):
                    # We are using GAE
                    rollout = self.runner.run(callback)
                    obs, returns, masks, actions, values, _, states, ep_infos, true_reward = rollout
                else:
                    rollout = self.runner.run(callback)
                    obs, states, returns, masks, actions, values, ep_infos, true_reward = rollout
                # pytype:enable=bad-unpacking

                callback.update_locals(locals())
                callback.on_rollout_end()

                # Early stopping due to the callback
                if not self.runner.continue_training:
                    break

                self.ep_info_buf.extend(ep_infos)
                policy_loss, value_loss, policy_entropy = self._train_step(obs, states, returns, masks, actions,
                                                                           values,
                                                                           self.num_timesteps // (self.n_batch + 1),
                                                                           writer)
                n_seconds = time.time() - t_start
                fps = int((update * self.n_batch) / n_seconds)

                if writer is not None:
                    total_episode_reward_logger(self.episode_reward,
                                                true_reward.reshape((self.n_envs, self.n_steps)),
                                                masks.reshape((self.n_envs, self.n_steps)),
                                                writer, self.num_timesteps)

                if self.verbose >= 1 and (update % log_interval == 0 or update == 1):
                    explained_var = explained_variance(values, returns)
                    logger.record_tabular("nupdates", update)
                    logger.record_tabular("total_timesteps", self.num_timesteps)
                    logger.record_tabular("fps", fps)
                    logger.record_tabular("policy_entropy", float(policy_entropy))
                    logger.record_tabular("policy_loss", float(policy_loss))
                    logger.record_tabular("value_loss", float(value_loss))
                    logger.record_tabular("explained_variance", float(explained_var))
                    if len(self.ep_info_buf) > 0 and len(self.ep_info_buf[0]) > 0:
                        logger.logkv('ep_reward_mean', safe_mean([ep_info['r'] for ep_info in self.ep_info_buf]))
                        logger.logkv('ep_len_mean', safe_mean([ep_info['l'] for ep_info in self.ep_info_buf]))
                    logger.dump_tabular()

            coord.request_stop()
            coord.join(enqueue_threads)

        callback.on_training_end()
        return self
    def save(self, save_path, cloudpickle=False):
        data = {
            "gamma": self.gamma,
            "gae_lambda": self.gae_lambda,
            "n_steps": self.n_steps,
            "vf_coef": self.vf_coef,
            "ent_coef": self.ent_coef,
            "vf_fisher_coef": self.vf_fisher_coef,
            "max_grad_norm": self.max_grad_norm,
            "learning_rate": self.learning_rate,
            "kfac_clip": self.kfac_clip,
            "lr_schedule": self.lr_schedule,
            "verbose": self.verbose,
            "policy": self.policy,
            "observation_space": self.observation_space,
            "action_space": self.action_space,
            "n_envs": self.n_envs,
            "n_cpu_tf_sess": self.n_cpu_tf_sess,
            "seed": self.seed,
            "kfac_update": self.kfac_update,
            "_vectorize_action": self._vectorize_action,
            "policy_kwargs": self.policy_kwargs
        }

        params_to_save = self.get_parameters()

        self._save_to_file(save_path, data=data, params=params_to_save, cloudpickle=cloudpickle)
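
# Usage sketch (not part of the library source): a minimal, hedged example of
# how this class might be trained, based only on the constructor, learn(), and
# save() signatures defined above. The environment "CartPole-v1", the string
# policy name "MlpPolicy", the timestep budget, and the save path are
# illustrative assumptions rather than values prescribed by NEORL; the class
# docstring also mentions NEORL's ``CreateEnvironment`` helper as an
# alternative way to build the environment. Depending on the base-class
# version, a plain Gym environment may need to be wrapped in a
# vectorized-environment wrapper before being passed in.

if __name__ == "__main__":
    import gym

    # Assumed toy environment; any Gym or NEORL environment should work here.
    env = gym.make("CartPole-v1")

    # The class docstring allows passing the policy as a string (e.g. "MlpPolicy").
    model = ACKTR(policy="MlpPolicy", env=env, n_steps=20,
                  learning_rate=0.25, verbose=1)

    # Illustrative training budget; each update consumes n_envs * n_steps samples.
    model.learn(total_timesteps=10000)

    # Illustrative save path.
    model.save("acktr_cartpole")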