# Source code for neorl.hybrid.ackde

#    This file is part of NEORL.

#    Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
#    NEORL is free software: you can redistribute it and/or modify
#    it under the terms of the MIT LICENSE

#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#    SOFTWARE.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#"""
#Created on Tue Feb 25 14:42:24 2020
#
#@author: Majdi Radaideh
#"""

#Implementation of RL-informed DE (ACKTR-DE)
#Based on papers:

#Radaideh, M. I., & Shirvan, K. (2021). Rule-based reinforcement learning 
#methodology to inform evolutionary algorithms for constrained optimization 
#of engineering applications. Knowledge-Based Systems, 217, 106836.

import warnings
warnings.filterwarnings("ignore")
import random
import pandas as pd
import numpy as np
from neorl.evolu.discrete import mutate_discrete, encode_grid_to_discrete, decode_discrete_to_grid
from neorl import DE
from neorl import ACKTR, MlpPolicy, RLLogger
from neorl.utils.seeding import set_neorl_seed

def encode_grid_individual_to_discrete(individual, bounds):
    """Convert the grid-type entries of an individual to their integer index.

    For every variable whose bounds spec is ``['grid', categories]``, the raw
    category value in ``individual`` is replaced by its position inside the
    category list; all other variable types pass through unchanged.

    :param individual: (list) candidate solution, one entry per variable in ``bounds``
    :param bounds: (dict) variable specs, e.g. ``{'x1': ['grid', ['a','b']], ...}``
    :return: (list) individual with grid entries encoded as integer indices
    """
    return [spec[1].index(value) if spec[0] == 'grid' else value
            for value, spec in zip(individual, bounds.values())]

class ACKDE(object):
    """
    A ACKTR-informed DE Neuroevolution module

    :param mode: (str) problem type, either ``min`` for minimization problem or ``max`` for maximization
    :param fit: (function) the fitness function to be used with DE
    :param env: (NEORL environment or Gym environment) The environment to learn with ACKTR, either use NEORL method ``CreateEnvironment`` (see **below**) or construct your custom Gym environment.
    :param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}``
    :param npop: (int): population size of DE
    :param npop_rl: (int): number of RL/ACKTR individuals to use in DE population (``npop_rl < npop``)
    :param init_pop_rl: (bool) flag to initialize DE population with ACKTR individuals
    :param hyperparam: (dict) dictionary of DE hyperparameters (``F``, ``CR``) and ACKTR hyperparameters (``n_steps``, ``gamma``, ``learning_rate``, ``ent_coef``, ``vf_coef``, ``vf_fisher_coef``, ``kfac_clip``, ``max_grad_norm``, ``lr_schedule``)
    :param seed: (int) random seed for sampling
    """
    def __init__(self, mode, fit, env, bounds, npop=60, npop_rl=6,
                 init_pop_rl=True, hyperparam=None, seed=None):
        # fix: avoid a mutable default argument; None behaves exactly like {}
        if hyperparam is None:
            hyperparam = {}
        self.seed = seed
        set_neorl_seed(self.seed)

        assert npop_rl < npop, '--error: the size of RL individuals `npop_rl` MUST be less than `npop`'
        self.mode = mode
        self.bounds = bounds
        self.fit = fit
        self.env = env
        self.npop = npop
        self.npop_rl = npop_rl
        self.init_pop_rl = init_pop_rl

        # infer variable types from the bounds specs ('int', 'float', 'grid', ...)
        self.var_type = np.array([bounds[item][0] for item in bounds])

        # mir-grid: flag whether any variable is categorical ('grid'), which
        # requires encoding/decoding of individuals before/after DE
        if "grid" in self.var_type:
            self.grid_flag = True
        else:
            self.grid_flag = False

        self.dim = len(bounds)
        self.var_names = [item for item in self.bounds]

        self.hyperparam = hyperparam
        # ACKTR hyperparameters (defaults follow stable-baselines ACKTR)
        self.n_steps = hyperparam['n_steps'] if 'n_steps' in hyperparam else 20
        self.gamma = hyperparam['gamma'] if 'gamma' in hyperparam else 0.99
        self.ent_coef = hyperparam['ent_coef'] if 'ent_coef' in hyperparam else 0.01
        self.learning_rate = hyperparam['learning_rate'] if 'learning_rate' in hyperparam else 0.25
        self.vf_coef = hyperparam['vf_coef'] if 'vf_coef' in hyperparam else 0.25
        self.vf_fisher_coef = hyperparam['vf_fisher_coef'] if 'vf_fisher_coef' in hyperparam else 1.0
        self.max_grad_norm = hyperparam['max_grad_norm'] if 'max_grad_norm' in hyperparam else 0.5
        self.kfac_clip = hyperparam['kfac_clip'] if 'kfac_clip' in hyperparam else 0.001
        self.lr_schedule = hyperparam['lr_schedule'] if 'lr_schedule' in hyperparam else 'linear'
        # DE hyperparameters
        self.F = hyperparam['F'] if 'F' in hyperparam else 0.5
        self.CR = hyperparam['CR'] if 'CR' in hyperparam else 0.3

        # will be activated after using `learn` method; guards `evolute`
        self.ACKTR_RUN_FLAG = False

    def learn(self, total_timesteps, rl_filter=100, verbose=False):
        """
        This function starts the learning of ACKTR algorithm for number of timesteps to create individuals for evolutionary search

        :param total_timesteps: (int) number of timesteps to run
        :param rl_filter: (int) number of top individuals to keep from the full RL search
        :param verbose: (bool) print statistics to screen

        :return: (dataframe) dataframe of individuals/fitness sorted from best to worst
        """
        self.ACKTR_RUN_FLAG = True
        print('---------------------------------------------------------------------------------')
        print('------------------------------- ACKTR-DE is Running -------------------------------')
        print('---------------------------------------------------------------------------------')

        # Determine whether the env is vectorized (parallel) or a plain env.
        # A vectorized env exposes `get_attr`; a plain env exposes `.mode` directly.
        try:
            ncores = len(self.env.get_attr('mode'))
            print('Paralell RL is running with {} cores'.format(ncores))
            self.env.mode = self.env.get_attr('mode')[0]
        except Exception:
            try:
                self.env.mode  # raises AttributeError if the env defines no mode
                ncores = 1
                print('Serial RL is running with {} core'.format(ncores))
            except Exception:
                self.env.mode = 'max'  # fall back to a default mode

        print('--warning: Problem mode defined in the RL enviroment is', self.env.mode)
        print('--warning: Problem mode defined in the ACKDE class is', self.mode)
        # the env's optimization direction must agree with this class's
        if self.env.mode == self.mode:
            print('--warning: Both problem modes match')
        else:
            raise ValueError('The two problem modes do not match, alg terminates')

        print('------------------------------- Part I: ACKTR is collecting data -------------------------------')
        cb = RLLogger(check_freq=1)
        # fix: `learning_rate` was stored in __init__ but never passed to ACKTR,
        # so the user-provided value was silently ignored
        acktr = ACKTR(MlpPolicy, env=self.env, n_steps=self.n_steps,
                      gamma=self.gamma, ent_coef=self.ent_coef,
                      learning_rate=self.learning_rate,
                      vf_coef=self.vf_coef, vf_fisher_coef=self.vf_fisher_coef,
                      max_grad_norm=self.max_grad_norm, kfac_clip=self.kfac_clip,
                      lr_schedule=self.lr_schedule, seed=self.seed, verbose=verbose)
        # run ACKTR; the callback logs every visited individual and its reward
        acktr.learn(total_timesteps=total_timesteps, callback=cb)
        rl_data = pd.DataFrame(cb.x_hist, columns=self.var_names)  # get the RL invidiuals
        assert len(cb.x_hist) == len(cb.r_hist), '--error: the length of reward hist ({}) and individual list ({}) must be the same, evolutionary run cannot continue'.format(len(cb.r_hist), len(cb.x_hist))
        rl_data["score"] = cb.r_hist  # append thier fitness/score as new column

        # sort the dataframe to filter the best (ascending for min, descending for max)
        if self.mode == 'min':
            self.sorted_df = rl_data.sort_values(['score'], axis='index', ascending=True)
        else:
            self.sorted_df = rl_data.sort_values(['score'], axis='index', ascending=False)

        # check the shape of RL data before filtering the top `rl_filter`
        if self.sorted_df.shape[0] < rl_filter:
            print('--warning: the number of samples collected by RL ({}) is less than rl_filter ({}), so all samples are passed to DE'.format(self.sorted_df.shape[0], rl_filter))
            self.data = self.sorted_df.values[:, :-1]  # get rid of the score column
        else:
            self.data = self.sorted_df.values[:rl_filter, :-1]  # get rid of the score column

        if verbose:
            print('--Top 10 individuals found by the RL search')
            print(self.sorted_df.head(10))

        # decode the data before using it with DE: grid categories -> integer indices
        if self.grid_flag:
            for i in range(self.data.shape[0]):
                self.data[i, :] = encode_grid_individual_to_discrete(self.data[i, :], bounds=self.bounds)

        return self.sorted_df

    def evolute(self, ngen, ncores=1, verbose=False):
        """
        This function evolutes the DE algorithm for number of generations with guidance from RL individuals.

        :param ngen: (int) number of generations to evolute
        :param ncores: (int) number of parallel processors to use with DE
        :param verbose: (bool) print statistics to screen

        :return: (tuple) (best individual, best fitness, and a list of fitness history)
        """
        print('------------------------------- Part II: DE is running and informed by ACKTR -------------------------------')
        if not self.ACKTR_RUN_FLAG:
            raise Exception('--error: The user is attempting to run DE before ACKTR, please use .learn first to leverage ACKTR, then use .evolute')

        # kwargs that inject the RL-found individuals into the DE run
        rl_kwargs = {'npop_rl': self.npop_rl, 'init_pop_rl': self.init_pop_rl, 'RLdata': self.data}

        # optionally seed the initial DE population by sampling RL individuals
        if self.init_pop_rl:
            x0 = []
            for i in range(self.npop):
                idx = random.randint(0, self.data.shape[0] - 1)
                x0.append(list(self.data[idx, :]))
        else:
            x0 = None

        de = DE(mode=self.mode, bounds=self.bounds, fit=self.fit, npop=self.npop,
                F=self.F, CR=self.CR, ncores=ncores, int_transform='nearest_int',
                seed=self.seed, **rl_kwargs)
        x_best, y_best, de_hist = de.evolute(ngen=ngen, x0=x0, verbose=verbose)

        print('************************* ACKTR-DE Summary *************************')
        # fix: the labels were swapped — x is the individual, y is the fitness
        print('Best fitness (y) found:', y_best)
        print('Best individual (x) found:', x_best)
        print('******************************************************************')

        return x_best, y_best, de_hist