# This file is part of NEORL.
# Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
# NEORL is free software: you can redistribute it and/or modify
# it under the terms of the MIT LICENSE
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -*- coding: utf-8 -*-
#"""
#Created on Mon Jun 15 19:37:04 2020
#
#@author: Majdi
#"""
import random
import numpy as np
from collections import defaultdict
import copy
import joblib
from neorl.evolu.crossover import cxES2point, cxESBlend
from neorl.evolu.discrete import mutate_discrete, encode_grid_to_discrete
from neorl.evolu.discrete import decode_discrete_to_grid, encode_grid_indv_to_discrete
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population, check_mixed_individual
class ES:
    r"""
    Parallel Evolution Strategies

    :param mode: (str) problem type, either ``min`` for minimization problem or ``max`` for maximization
    :param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}``
    :param fit: (function) the fitness function to evaluate an individual
    :param lambda\_: (int) total number of individuals in the population
    :param mu: (int): number of individuals to survive to the next generation, mu < lambda\_
    :param cxmode: (str): the crossover mode, either 'cx2point' or 'blend'
    :param alpha: (float) Extent of the blending between [0,1], the blend crossover randomly selects a child in the range [x1-alpha(x2-x1), x2+alpha(x2-x1)] (Only used for cxmode='blend')
    :param cxpb: (float) population crossover probability between [0,1]
    :param mutpb: (float) population mutation probability between [0,1]
    :param smin: (float): minimum bound for the strategy vector
    :param smax: (float): maximum bound for the strategy vector
    :param clip: (bool) whether to clip the offspring strategy vectors back into [smin, smax]
    :param ncores: (int) number of parallel processors
    :param seed: (int) random seed for sampling
    """
    def __init__ (self, mode, bounds, fit, lambda_=60, mu=30, cxmode='cx2point',
                  alpha=0.5, cxpb=0.6, mutpb=0.3, smin=0.01, smax=0.5,
                  clip=True, ncores=1, seed=None, **kwargs):
        set_neorl_seed(seed)

        #-----------------------------------------------------
        #a special block for RL-informed GA/ES
        #-----------------------------------------------------
        if 'npop_rl' in kwargs or 'init_pop_rl' in kwargs or 'RLdata' in kwargs:
            print('--warning: npop_rl and init_pop_rl are passed to ES, so RL-informed ES mode is activated')
            # NOTE(review): all three keys are expected together; passing only a
            # subset raises KeyError on the missing one.
            self.npop_rl=kwargs['npop_rl']
            self.init_pop_rl=kwargs['init_pop_rl']
            self.RLdata=kwargs['RLdata']
            self.RLmode=True
        else:
            self.RLmode=False
        #-----------------------------------------------------

        self.bounds=bounds
        self.nx=len(bounds.keys())   #number of decision variables
        #--mir
        self.mode=mode
        # the optimizer always maximizes internally; `min` problems are handled
        # by negating the fitness here and flipping the sign back in evolute()
        if mode == 'max':
            self.fit=fit
        elif mode == 'min':
            def fitness_wrapper(*args, **kwargs):
                return -fit(*args, **kwargs)
            self.fit=fitness_wrapper
        else:
            raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`')

        self.ncores=ncores
        self.smin=smin
        self.smax=smax
        self.cxpb=cxpb
        self.mutpb=mutpb
        self.alpha=alpha
        self.mu=mu
        self.lambda_=lambda_
        self.cxmode=cxmode
        self.clip=clip

        if self.cxmode not in ['cx2point', 'blend']:
            raise ValueError('--error: the cxmode selected (`{}`) is not available in ES, either choose `cx2point` or `blend`'.format(self.cxmode))
        assert self.mu <= self.lambda_, "mu (selected population) must be less than lambda (full population)"
        assert (self.cxpb + self.mutpb) <= 1.0, "The sum of the cxpb and mutpb must be smaller or equal to 1.0"
        assert self.ncores >=1, "Number of cores must be more than or equal 1"

        #infer variable types
        self.datatype = np.array([bounds[item][0] for item in bounds])

        #mir-grid: if any categorical `grid` variable exists, encode the whole
        #space into a discrete (int) space and keep the originals for decoding
        if "grid" in self.datatype:
            self.grid_flag=True
            self.orig_bounds=bounds  #keep original bounds for decoding
            self.bounds, self.bounds_map=encode_grid_to_discrete(self.bounds) #encoding grid to int
            #define var_types again by converting grid to int
            self.datatype = np.array([self.bounds[item][0] for item in self.bounds])
        else:
            self.grid_flag=False
            self.bounds = bounds
            self.orig_bounds=bounds

        self.lb = np.array([self.bounds[item][1] for item in self.bounds])
        self.ub = np.array([self.bounds[item][2] for item in self.bounds])
def GenES(self, bounds):
#"""
#Individual generator
#Inputs:
# -bounds (dict): input paramter lower/upper bounds in dictionary form
#Returns
# -ind (list): an individual vector with values sampled from bounds
# -strategy (list): the strategy vector with values between smin and smax
#"""
content=[]
for key in bounds:
if bounds[key][0] == 'int':
content.append(random.randint(bounds[key][1], bounds[key][2]))
elif bounds[key][0] == 'float':
content.append(random.uniform(bounds[key][1], bounds[key][2]))
elif bounds[key][0] == 'grid':
content.append(random.sample(bounds[key][1],1)[0])
else:
raise Exception ('unknown data type is given, either int, float, or grid are allowed for parameter bounds')
ind=list(content)
strategy = [random.uniform(self.smin,self.smax) for _ in range(self.nx)]
return ind, strategy
def init_pop(self, x0=None, verbose=False):
#"""
#Population intializer
#Inputs:
# -warmup (int): number of individuals to create and evaluate initially
#Returns
# -pop (dict): initial population in a dictionary form, looks like this
#"""
#initialize the population and strategy and run them in parallel (these samples will be used to initialize the memory)
pop=defaultdict(list)
# dict key runs from 0 to warmup-1
# index 0: individual, index 1: strategy, index 2: fitness
# Example:
#"""
#pop={key: [ind, strategy, fitness]}
#pop={0: [[1,2,3,4,5], [0.1,0.2,0.3,0.4,0.5], 1.2],
# ...
# 99: [[1.1,2.1,3.1,4.1,5.1], [0.1,0.2,0.3,0.4,0.5], 5.2]}
#"""
if x0:
if verbose:
print('The first particle provided by the user:', x0[0])
print('The last particle provided by the user:', x0[-1])
for i in range(len(x0)):
check_mixed_individual(x=x0[i], bounds=self.orig_bounds) #assert the type provided is consistent
if self.grid_flag:
x_encoded=encode_grid_indv_to_discrete(x0[i], bounds=self.orig_bounds, bounds_map=self.bounds_map)
pop[i].append(x_encoded)
else:
pop[i].append(x0[i])
strategy = [random.uniform(self.smin,self.smax) for _ in range(self.nx)]
pop[i].append(strategy)
else:
for i in range (self.lambda_):
ind, strategy=self.GenES(self.bounds)
pop[i].append(ind)
pop[i].append(strategy)
if self.ncores > 1: #evaluate warmup in parallel
core_list=[]
for key in pop:
core_list.append(pop[key][0])
with joblib.Parallel(n_jobs=self.ncores) as parallel:
fitness=parallel(joblib.delayed(self.fit_worker)(item) for item in core_list)
[pop[ind].append(fitness[ind]) for ind in range(len(pop))]
else: #evaluate warmup in series
for key in pop:
fitness=self.fit_worker(pop[key][0])
pop[key].append(fitness)
return pop #return final pop dictionary with ind, strategy, and fitness
def ensure_bounds(self, vec): # bounds check
vec_new = []
for i, (key, val) in enumerate(self.bounds.items()):
# less than minimum
if vec[i] < self.bounds[key][1]:
vec_new.append(self.bounds[key][1])
# more than maximum
if vec[i] > self.bounds[key][2]:
vec_new.append(self.bounds[key][2])
# fine
if self.bounds[key][1] <= vec[i] <= self.bounds[key][2]:
vec_new.append(vec[i])
return vec_new
def ensure_discrete(self, ind):
ind=self.ensure_bounds(ind)
for i in range(len(ind)):
if self.datatype[i] == 'int':
ind[i] = int(ind[i])
return ind
def fit_worker(self, x):
#"""
#Evaluates fitness of an individual.
#"""
#mir-grid
if self.grid_flag:
#decode the individual back to the int/float/grid mixed space
x=decode_discrete_to_grid(x,self.orig_bounds,self.bounds_map)
fitness = self.fit(x)
return fitness
def select(self, pop, k=1):
#"""
#Select function sorts the population from max to min based on fitness and select k best
#Inputs:
# pop (dict): population in dictionary structure
# k (int): top k individuals are selected
#Returns:
# best_dict (dict): the new ordered dictionary with top k selected
#"""
pop=list(pop.items())
pop.sort(key=lambda e: e[1][2], reverse=True)
sorted_dict=dict(pop[:k])
#This block creates a new dict where keys are reset to 0 ... k in order to avoid unordered keys after sort
best_dict=defaultdict(list)
index=0
for key in sorted_dict:
best_dict[index].append(sorted_dict[key][0])
best_dict[index].append(sorted_dict[key][1])
best_dict[index].append(sorted_dict[key][2])
index+=1
sorted_dict.clear()
return best_dict
    def mutES(self, ind, strat):
        """Mutate an individual and its strategy vector using mixed
        discrete/continuous ES mutation rules as described in [Li2013].

        Int variables use a self-adaptive mutation probability (their strategy
        entry) and redraw from the remaining integer choices; float variables
        use log-normal strategy adaptation followed by a Gaussian perturbation.

        .. [Li2013] Li, Rui, et al. "Mixed integer evolution strategies for
           parameter optimization." Evolutionary Computation 21.1 (2013): 29-64.

        Inputs:
            -ind (list): individual to be mutated.
            -strat (list): individual strategy to be mutated.
        Returns:
            -ind (list): new individual after mutation
            -strat (list): individual strategy after mutation
        """
        # global (tau) and local (tau_prime) learning rates for the
        # log-normal strategy update, derived from the problem size
        size = len(ind)
        tau=1/np.sqrt(2*size)
        tau_prime=1/np.sqrt(2*np.sqrt(size))
        ind=self.ensure_bounds(ind) #keep it for choice.remove(x) to work
        for i in range(size):
            #--------------------------
            # Discrete ES Mutation
            #--------------------------
            if self.datatype[i] == 'int':
                norm=random.gauss(0,1)
                # self-adaptive update of the mutation probability (logistic form)
                strat[i] = 1/(1+(1-strat[i])/strat[i]*np.exp(-tau*norm-tau_prime*random.gauss(0,1)))
                # triangular "folding" transformation keeps the strategy inside [smin, smax]
                y=(strat[i]-self.smin)/(self.smax-self.smin)
                if np.floor(y) % 2 == 0:
                    y_prime=np.abs(y-np.floor(y))
                else:
                    y_prime=1-np.abs(y-np.floor(y))
                strat[i] = self.smin + (self.smax-self.smin) * y_prime
                # mutate this attribute with probability strat[i]
                if random.random() < strat[i]:
                    if int(self.lb[i]) == int(self.ub[i]):
                        # degenerate range: only one feasible integer value
                        ind[i] = int(self.lb[i])
                    else:
                        # make a list of possiblities after excluding the current value to enforce mutation
                        choices=list(range(int(self.lb[i]),int(self.ub[i])+1))
                        choices.remove(int(ind[i]))
                        # randint is NOT used here since it could re-draw the same integer value, choice is used instead
                        ind[i] = random.choice(choices)
                else:
                    ind[i]=int(ind[i])
            #--------------------------
            # Continuous ES Mutation
            #--------------------------
            elif self.datatype[i] == 'float':
                norm=random.gauss(0,1)
                strat[i] *= np.exp(tau*norm + tau_prime * random.gauss(0, 1)) #normal mutation of strategy
                ind[i] += strat[i] * random.gauss(0, 1) # update the individual position
            else:
                raise Exception ('ES mutation strategy works with either int/float datatypes, the type provided cannot be interpreted')
        # final clean-up: re-apply bounds and clamp the strategy vector
        ind=self.ensure_bounds(ind)
        strat=list(np.clip(strat, self.smin, self.smax))
        return ind, strat
    def GenOffspring(self, pop):
        """Generate a new offspring population by applying crossover,
        mutation, **or** reproduction to members of the current population.

        Per offspring, one of the three operators is chosen with probabilities
        self.cxpb (crossover), self.mutpb (mutation), and 1 - cxpb - mutpb
        (reproduction); their sum must lie in [0,1].  Offspring are returned
        without fitness values (index 2 is appended later after evaluation).

        Inputs:
            pop (dict): population as {key: [individual, strategy, fitness]}
        Returns:
            offspring (dict): new population as {key: [individual, strategy]}
        """
        pop_indices=list(range(0,len(pop)))
        offspring = defaultdict(list)
        for i in range(self.lambda_):
            rn = random.random()
            #------------------------------
            # Crossover
            #------------------------------
            if rn < self.cxpb:
                # two distinct parents are drawn for crossover
                index1, index2 = random.sample(pop_indices,2)
                if self.cxmode.strip() =='cx2point':
                    ind1, ind2, strat1, strat2 = cxES2point(ind1=list(pop[index1][0]),ind2=list(pop[index2][0]),
                                                           strat1=list(pop[index1][1]),strat2=list(pop[index2][1]))
                elif self.cxmode.strip() == 'blend':
                    ind1, ind2, strat1, strat2=cxESBlend(ind1=list(pop[index1][0]), ind2=list(pop[index2][0]),
                                                        strat1=list(pop[index1][1]),strat2=list(pop[index2][1]),
                                                        alpha=self.alpha)
                else:
                    raise ValueError('--error: the cxmode selected (`{}`) is not available in ES, either choose `cx2point` or `blend`'.format(self.cxmode))
                ind1=self.ensure_bounds(ind1)
                ind2=self.ensure_bounds(ind2)
                ind1=self.ensure_discrete(ind1) #check discrete variables after crossover
                ind2=self.ensure_discrete(ind2) #check discrete variables after crossover
                # NOTE(review): only the first child (ind1/strat1) is kept; the
                # second child is computed and then discarded
                offspring[i].append(ind1)
                offspring[i].append(strat1)
            #------------------------------
            # Mutation
            #------------------------------
            elif rn < self.cxpb + self.mutpb: # Apply mutation
                index = random.choice(pop_indices)
                ind, strat=self.mutES(ind=list(pop[index][0]), strat=list(pop[index][1]))
                offspring[i].append(ind)
                offspring[i].append(strat)
            #------------------------------
            # Reproduction from population
            #------------------------------
            else:
                index=random.choice(pop_indices)
                pop[index][0]=self.ensure_discrete(pop[index][0])
                # NOTE(review): the parent's individual/strategy lists are shared
                # by reference here (no copy), so the offspring aliases the parent
                offspring[i].append(pop[index][0])
                offspring[i].append(pop[index][1])
        # optionally clamp every offspring strategy vector back into [smin, smax]
        if self.clip:
            for item in offspring:
                offspring[item][1]=list(np.clip(offspring[item][1], self.smin, self.smax))
        return offspring
def mix_population(self, pop):
#function used to mix RL with ES population when RLmode is True.
kbs_append=False
indices=random.sample(range(self.RLdata.shape[0]),self.npop_rl)
last_index=list(pop.keys())[-1]
for i in range (len(indices)):
ind_vec=list(self.RLdata[indices[i],:])
if kbs_append: #append RL individuals to the population
strategy = [random.uniform(self.smin,self.smax) for _ in range(len(self.lb))]
kbs_ind=[ind_vec,strategy,0]
pop[last_index+(i+1)]=kbs_ind
else: #replace the worst individuals in pop with RL individuals
pop[last_index-i][0]=ind_vec
return pop
    def evolute(self, ngen, x0=None, verbose=False):
        """
        This function evolutes the ES algorithm for number of generations.

        :param ngen: (int) number of generations to evolute
        :param x0: (list of lists) the initial position of the swarm particles
        :param verbose: (bool) print statistics to screen

        :return: (tuple) (best individual, best fitness, and a list of fitness history)
        """
        self.es_hist={}
        self.es_hist['mean_strategy']=[]
        # the search always maximizes internally (min problems were negated in
        # __init__), so the incumbent starts at -inf
        self.y_opt=-np.inf
        self.best_scores=[]
        self.best_indvs=[]
        if x0:
            assert len(x0) == self.lambda_, '--error: the length of x0 ({}) (initial population) must equal to the size of lambda ({})'.format(len(x0), self.lambda_)
            self.population=self.init_pop(x0=x0, verbose=verbose)
        else:
            self.population=self.init_pop(verbose=verbose)

        # Begin the evolution process
        for gen in range(1, ngen + 1):
            # Vary the population and generate new offspring
            offspring = self.GenOffspring(pop=self.population)
            # Evaluate offspring fitness, in parallel when ncores > 1
            if self.ncores > 1:
                core_list=[]
                for key in offspring:
                    core_list.append(offspring[key][0])
                with joblib.Parallel(n_jobs=self.ncores) as parallel:
                    fitness=parallel(joblib.delayed(self.fit_worker)(item) for item in core_list)
                [offspring[ind].append(fitness[ind]) for ind in range(len(offspring))]
            else: #serial calcs
                for ind in range(len(offspring)):
                    fitness=self.fit_worker(offspring[ind][0])
                    offspring[ind].append(fitness)

            # Select the best mu individuals for the next generation
            # (deepcopy so later in-place edits cannot alias offspring entries)
            self.population = copy.deepcopy(self.select(pop=offspring, k=self.mu))
            if self.RLmode: #perform RL informed ES
                self.population=self.mix_population(self.population)

            # track per-generation statistics and the global incumbent
            inds, rwd=[self.population[i][0] for i in self.population], [self.population[i][2] for i in self.population]
            self.best_scores.append(np.max(rwd))
            arg_max=np.argmax(rwd)
            self.best_indvs.append(inds[arg_max])
            if rwd[arg_max] > self.y_opt:
                self.y_opt=rwd[arg_max]
                self.x_opt=copy.deepcopy(inds[arg_max])

            #--mir: report the best in the user's original sense (sign flip for min)
            if self.mode=='min':
                self.y_opt_correct=-self.y_opt
            else:
                self.y_opt_correct=self.y_opt

            #mir-grid: decode the incumbent back to the mixed space if encoded
            if self.grid_flag:
                self.x_opt_correct=decode_discrete_to_grid(self.x_opt,self.orig_bounds,self.bounds_map)
            else:
                self.x_opt_correct=self.x_opt

            mean_strategy=[np.mean(self.population[i][1]) for i in self.population]
            self.es_hist['mean_strategy'].append(np.mean(mean_strategy))
            if verbose:
                print('##############################################################################')
                print('ES step {}/{}, CX={}, MUT={}, MU={}, LAMBDA={}, Ncores={}'.format(gen*self.lambda_,ngen*self.lambda_, np.round(self.cxpb,2), np.round(self.mutpb,2), self.mu, self.lambda_, self.ncores))
                print('##############################################################################')
                print('Statistics for generation {}'.format(gen))
                print('Best Fitness:', np.round(np.max(rwd),6) if self.mode == 'max' else -np.round(np.max(rwd),6))
                # NOTE(review): inds[0] assumes select() ordering (best first);
                # after RL mixing the best may sit at arg_max instead — confirm
                print('Best Individual:', inds[0] if not self.grid_flag else decode_discrete_to_grid(inds[0],self.orig_bounds,self.bounds_map))
                print('Max Strategy:', np.round(np.max(mean_strategy),3))
                print('Min Strategy:', np.round(np.min(mean_strategy),3))
                print('Average Strategy:', np.round(np.mean(mean_strategy),3))
                print('##############################################################################')

        if verbose:
            print('------------------------ ES Summary --------------------------')
            print('Best fitness (y) found:', self.y_opt_correct)
            print('Best individual (x) found:', self.x_opt_correct)
            print('--------------------------------------------------------------')

        #--mir return the last population for restart calculations
        if self.grid_flag:
            self.es_hist['last_pop'] = get_population(offspring, grid_flag=True,
                                                      bounds=self.orig_bounds, bounds_map=self.bounds_map)
        else:
            self.es_hist['last_pop'] = get_population(offspring, grid_flag=False)

        # convert the internally-maximized history back to the user's sense
        if self.mode == 'min':
            self.best_scores=[-item for item in self.best_scores]
            self.es_hist['global_fitness'] = np.minimum.accumulate(self.best_scores)
            self.es_hist['last_pop']['fitness'] *= -1
        else:
            self.es_hist['global_fitness'] = np.maximum.accumulate(self.best_scores)

        self.es_hist['local_fitness'] = self.best_scores
        return self.x_opt_correct, self.y_opt_correct, self.es_hist