Source code for neorl.rl.make_env

#    This file is part of NEORL.

#    Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
#    NEORL is free software: you can redistribute it and/or modify
#    it under the terms of the MIT LICENSE

#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#    SOFTWARE.

# -*- coding: utf-8 -*-
#"""
#Created on Mon Jul 12 19:12:55 2021
#
#@author: majdi
#"""

import sys, uuid
from neorl.evolu.discrete import encode_grid_to_discrete, decode_discrete_to_grid
from neorl.rl.baselines.shared import set_global_seeds
from neorl.rl.baselines.shared.vec_env import SubprocVecEnv
import numpy as np
import gym
from gym.spaces import Box, MultiDiscrete, Discrete
import random


def globalize(func):
   #"""
   #multiprocessing trick to paralllelize nested functions in python 
   #(un-picklable objects!)
   #"""
   def result(*args, **kwargs):
       return -func(*args, **kwargs)
   result.__name__ = result.__qualname__ = uuid.uuid4().hex
   setattr(sys.modules[result.__module__], result.__name__, result)
   return result

def action_map(norm_action, ub, lb, ub_norm, lb_norm):
    #"""
    #Map a nromalized action `norm_action` from a small range [lb_norm, ub_norm] 
    #to a new range [lb, ub].
    #Ex: Convert norm_action from [-1, 1] to [-100, 100]
    #"""
    d=len(norm_action)
    NormRange = np.array([ub_norm-lb_norm]*d)
    OrigRange = ub - lb
    NormMin=np.array([lb_norm]*d)  
    OrigMin = lb
    new_action = ((norm_action - NormMin) * OrigRange / NormRange) + OrigMin
    
    return new_action
            
def ensure_discrete(action, var_type):
    #"""
    #Check the variables in a vector `vec` and convert the discrete ones to integer
    #based on their type saved in `var_type`
    #"""
    vec=[]
    for dim in range(len(action)):
        if var_type[dim] == 'int':
            vec.append(int(action[dim]))
        else:
            vec.append(action[dim])
    return vec


def convert_multidiscrete_discrete(bounds):
    #"""
    #For DQN/ACER, convert the multidiscrete vector to a single discrete one
    #to be compatible with DQN/ACER.
    #Input: Provide the bounds dict for all variables.
    #"""
    discrete_list=[]
    for item in bounds:
        discrete_list = discrete_list + list(range(bounds[item][1],bounds[item][2]+1))
    space=list(set(discrete_list))
    
    bounds_map={}
    for i in range(len(space)):
        bounds_map[i]=space[i]
        
    return bounds_map

def convert_actions_multidiscrete(bounds):
    #"""
    #For PPO/ACKTR/A2C, convert the action provided by the user to a multidiscrete vector
    #to be compatible with OpenAI gym multidiscrete space.
    #Input: Provide the bounds dict for all variables.
    #Returns: action_bounds (list) for encoding and bounds_map (dict) for decoding
    #"""
    action_bounds=[]
    
    for item in bounds:
        action_bounds.append(len(list(range(bounds[item][1],bounds[item][2]+1))))
    
    bounds_map={}
    for var, act in zip(bounds,action_bounds):
        bounds_map[var]={}
        act_list=list(range(bounds[var][1],bounds[var][2]+1))
        for i in range(act):
            bounds_map[var][i] = act_list[i]
    
    return action_bounds, bounds_map 

def convert_multidiscrete_actions(action, int_bounds_map):
    #"""
    #For PPO/ACKTR/A2C, convert the action in multidiscrete form 
    #to the real action space defined by the user
    #Input: Provide the action in multidiscrete, and the integer bounds map
    #Returns: decoded action (list)
    #"""
    decoded_action=[]
    
    for act, key in zip(action, int_bounds_map):
        decoded_action.append(int_bounds_map[key][act])
        
    return decoded_action 

class BaseEnvironment(gym.Env):
    #"""
    #A module to construct a fitness environment for certain algorithms 
    #that follow reinforcement learning approach of optimization
    #
    #:param method: (str) the supported algorithms, choose either: ``dqn``, ``ppo``, ``acktr``, ``acer``, ``a2c``, ``rneat``, ``fneat``
    #:param fit: (function) the fitness function
    #:param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}``
    #:param mode: (str) problem type, either ``min`` for minimization problem or ``max`` for maximization (RL is default to ``max``)
    #:param episode_length: (int): number of individuals to evaluate before resetting the environment to random initial guess. 
    #"""
    def __init__(self, method, fit, bounds, mode='max', episode_length=50):

        if method not in ['ppo', 'a2c', 'acer', 'acktr', 'dqn', 'neat', 'rneat', 'fneat']:
            raise ValueError ('--error: unknown RL method is provided, choose from: ppo, a2c, acer, acktr, dqn, neat or rneat, fneat')
        self.episode_length=episode_length
        self.var_type = np.array([bounds[item][0] for item in bounds])
        self.nx=len(bounds)
        self.method=method
        
        #mir-grid
        if "grid" in self.var_type:
            self.grid_flag=True
            self.orig_bounds=bounds  #keep original bounds for decoding
            print('--debug: grid parameter type is found in the space')
            self.bounds, self.bounds_map=encode_grid_to_discrete(bounds) #encoding grid to int
            #define var_types again by converting grid to int
            self.var_type = np.array([self.bounds[item][0] for item in self.bounds])
        else:
            self.grid_flag=False
            self.bounds = bounds

        self.lb=np.array([self.bounds[item][1] for item in self.bounds])
        self.ub=np.array([self.bounds[item][2] for item in self.bounds])
        
        #for PPO/A2C/ACKTR gaussian policy, keep action range normalized, then map it back
        #Recommended---> -1/1 or -2/2        
        self.lb_norm=-1
        self.ub_norm=1
        if all([item == 'int' for item in self.var_type]):   #discrete optimization
            if method in ['ppo', 'a2c', 'acktr', 'rneat', 'fneat']:
                self.action_bounds, self.int_bounds_map=convert_actions_multidiscrete(self.bounds)
                self.action_space = MultiDiscrete(self.action_bounds)
                self.observation_space = Box(low=self.lb, high=self.ub, dtype=int)
                self.cont_map_flag=False
                self.int_map_flag=True
            elif method in ['acer', 'dqn']:
                self.discrete_map=convert_multidiscrete_discrete(self.bounds)
                self.action_space = Discrete(len(self.discrete_map))
                self.observation_space = Box(low=self.lb, high=self.ub, dtype=int)
                self.cont_map_flag=False
                self.int_map_flag=False
        else:
            if method in ['ppo', 'a2c', 'acktr', 'rneat', 'fneat']:
                self.action_space = Box(low=self.lb_norm, high=self.ub_norm, shape=(self.nx,))
                self.observation_space = Box(low=self.lb, high=self.ub)
                self.cont_map_flag=True
                self.int_map_flag=False
            elif method in ['acer', 'dqn']:
                self.cont_map_flag=False
                self.int_map_flag=False
                raise Exception ('--error: the method {} does not support continuous spaces, please use ppo, a2c, or acktr'.format(method))
                
        #--mir
        self.mode=mode
        if mode == 'max':
            self.fit=fit
        elif mode == 'min':
            self.fit = globalize(lambda x: fit(x))  #use the function globalize to serialize the nested fit
        else:
            raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`')
        
        self.reset()
        self.done=False
        self.counter = 0
        self.index=0

    def seed(self, seed_id):
        np.random.seed(seed_id)
        random.seed(seed_id)
        
    def step(self, action):
        state, action, reward=self.action_mapper(action)
        self.counter += 1
        if self.counter == self.episode_length:
            self.done=True
            self.counter = 0
        
        #print(state, action, reward)
        return state, reward, self.done, {'x':action}
    
    def reset(self):
        self.done=False
        if self.method in ['ppo', 'a2c', 'acktr', 'rneat', 'fneat']:
            init_state=self.action_space.sample()
        elif self.method in ['acer', 'dqn']:
            init_state = self.observation_space.sample()
            self.full_action=init_state.copy()
        else:
            pass
        return init_state

    def render(self, mode='human'):
        pass
    
    def action_mapper(self, action):
        
        if self.method in ['ppo', 'a2c', 'acktr', 'rneat', 'fneat']:
            #--------------------------
            # cont./discrete methods
            #---------------------------
            if self.cont_map_flag:
                action=action_map(norm_action=action, 
                                  lb=self.lb, ub=self.ub, 
                                  lb_norm=self.lb_norm, ub_norm=self.ub_norm)
                
            if self.int_map_flag:  #this flag converts multidiscrete action to the real space
                action=convert_multidiscrete_actions(action=action, int_bounds_map=self.int_bounds_map)
                
            if 'int' in self.var_type:
                action=ensure_discrete(action=action, var_type=self.var_type)
            
            action = self.ensure_bounds(action)   #checking the bounds
            
            if self.grid_flag:
                #decode the individual back to the int/float/grid mixed space
                decoded_action=decode_discrete_to_grid(action,self.orig_bounds,self.bounds_map)
                reward=self.fit(decoded_action)  #calc reward based on decoded action
                state=action.copy()   #save the state as the original undecoded action (for further procecessing)
                action=decoded_action  #now for logging, return the action as the decoded action
            else:
                #calculate reward and use state as action
                reward=self.fit(action)  
                state=action.copy()
                
            
        elif self.method in ['acer', 'dqn']:
            #--------------------------
            # discrete methods
            #---------------------------
            if self.index < self.nx:
                decoded_action=self.discrete_map[action]
                
                if decoded_action >= self.lb[self.index] and decoded_action <= self.ub[self.index]:
                    self.full_action[self.index]=decoded_action
                else:
                    #print(self.index, decoded_action, 'random guess')
                    self.full_action[self.index]=random.randint(self.lb[self.index],self.ub[self.index] )
                self.index += 1
            else:
                self.index = 0   #start a new loop over the individual
                self.full_action=self.observation_space.sample()
    
            if self.grid_flag:
                #decode the individual back to the int/float/grid mixed space
                self.decoded_action=decode_discrete_to_grid(self.full_action,self.orig_bounds,self.bounds_map) #convert integer to categorical
                reward=self.fit(self.decoded_action)  #calc reward based on decoded action
                state=self.full_action.copy()        #save the state as the original undecoded action (for further procecessing)   
                action=self.decoded_action.copy()   #now for logging, return the action as the decoded action
            else:
                action=self.full_action.copy()   #returned the full action for logging
                reward=self.fit(action)          #calc reward based on the action (float + int)
                state=action.copy()              #save the state as the action
            
        return state, action, reward

    def ensure_bounds(self, vec): # bounds check
    
        vec_new = []
    
        for i, (key, val) in enumerate(self.bounds.items()):
            # less than minimum 
            if vec[i] < self.bounds[key][1]:
                vec_new.append(self.bounds[key][1])
            # more than maximum
            if vec[i] > self.bounds[key][2]:
                vec_new.append(self.bounds[key][2])
            # fine
            if self.bounds[key][1] <= vec[i] <= self.bounds[key][2]:
                vec_new.append(vec[i])
        
        return np.array(vec_new)
    
    

[docs]def CreateEnvironment(method, fit, bounds, ncores=1, mode='max', episode_length=50): """ A module to construct a fitness environment for certain algorithms that follow reinforcement learning approach of optimization :param method: (str) the supported algorithms, choose either: ``dqn``, ``ppo``, ``acktr``, ``acer``, ``a2c``. :param fit: (function) the fitness function :param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}`` :param ncores: (int) number of parallel processors :param mode: (str) problem type, either ``min`` for minimization problem or ``max`` for maximization (RL is default to ``max``) :param episode_length: (int): number of individuals to evaluate before resetting the environment to random initial guess. """ def make_env(rank, seed=0): #""" #Utility function for multiprocessed env. # #:param num_env: (int) the number of environment you wish to have in subprocesses #:param seed: (int) the inital seed for RNG #:param rank: (int) index of the subprocess #""" def _init(): env=BaseEnvironment(method=method, fit=fit, bounds=bounds, mode=mode, episode_length=episode_length) env.seed(seed + rank) return env set_global_seeds(seed) return _init if ncores > 1: env = SubprocVecEnv([make_env(i) for i in range(ncores)]) else: env=BaseEnvironment(method=method, fit=fit, bounds=bounds, mode=mode, episode_length=episode_length) return env