# This file is part of NEORL.
# Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
# NEORL is free software: you can redistribute it and/or modify
# it under the terms of the MIT LICENSE
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -*- coding: utf-8 -*-
#Created on Sun Jun 14 13:45:54 2020
#@author: Majdi Radaideh
import random
import numpy as np
import joblib
from neorl.evolu.discrete import mutate_discrete, encode_grid_to_discrete
from neorl.evolu.discrete import decode_discrete_to_grid, encode_grid_indv_to_discrete
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population, check_mixed_individual
[docs]class DE:
"""
Parallel Differential Evolution
:param mode: (str) problem type, either "min" for minimization problem or "max" for maximization
:param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}``
:param fit: (function) the fitness function
:param npop: (int) number of individuals in the population
:param F: (float) differential/mutation weight between [0,2]
:param CR: (float) crossover probability between [0,1]
:param int_transform: (str): method of handling int/discrete variables, choose from: ``nearest_int``, ``sigmoid``, ``minmax``.
:param ncores: (int) number of parallel processors
:param seed: (int) random seed for sampling
"""
def __init__ (self, mode, bounds, fit, npop=50, F=0.5, CR=0.3,
int_transform='nearest_int', ncores=1, seed=None, **kwargs):
self.seed=seed
set_neorl_seed(self.seed)
#-----------------------------------------------------
#a special block for RL-informed DE
if 'npop_rl' in kwargs or 'init_pop_rl' in kwargs or 'RLdata' in kwargs:
print('--warning: npop_rl and init_pop_rl are passed to DE, so RL-informed DE mode is activated')
self.npop_rl=kwargs['npop_rl']
self.init_pop_rl=kwargs['init_pop_rl']
self.RLdata=kwargs['RLdata']
self.RLmode=True
else:
self.RLmode=False
#-----------------------------------------------------
assert npop > 4, '--error: size of npop must be more than 4'
self.npop=npop
self.bounds=bounds
self.ncores=ncores
#--mir
self.mode=mode
if mode == 'max':
self.fit=fit
elif mode == 'min':
def fitness_wrapper(*args, **kwargs):
return -fit(*args, **kwargs)
self.fit=fitness_wrapper
else:
raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`')
self.int_transform=int_transform
if int_transform not in ["nearest_int", "sigmoid", "minmax"]:
raise ValueError('--error: int_transform entered by user is invalid, must be `nearest_int`, `sigmoid`, or `minmax`')
self.F=F
self.CR=CR
#infer variable types
self.var_type = np.array([bounds[item][0] for item in bounds])
#mir-grid
if "grid" in self.var_type:
self.grid_flag=True
self.orig_bounds=bounds #keep original bounds for decoding
#print('--debug: grid parameter type is found in the space')
self.bounds, self.bounds_map=encode_grid_to_discrete(self.bounds) #encoding grid to int
#define var_types again by converting grid to int
self.var_type = np.array([self.bounds[item][0] for item in self.bounds])
else:
self.grid_flag=False
self.bounds = bounds
self.orig_bounds=bounds
self.dim = len(bounds)
self.lb=[self.bounds[item][1] for item in self.bounds]
self.ub=[self.bounds[item][2] for item in self.bounds]
def ensure_bounds(self, vec):
vec_new = []
# cycle through each variable in vector
for i, (key, val) in enumerate(self.bounds.items()):
# variable exceedes the minimum boundary
if vec[i] < self.bounds[key][1]:
vec_new.append(self.bounds[key][1])
# variable exceedes the maximum boundary
if vec[i] > self.bounds[key][2]:
vec_new.append(self.bounds[key][2])
# the variable is fine
if self.bounds[key][1] <= vec[i] <= self.bounds[key][2]:
vec_new.append(vec[i])
return vec_new
def GenIndv(self, bounds):
#"""
#Particle generator
#Input:
# -bounds (dict): input paramter type and lower/upper bounds in dictionary form
#Returns:
# -particle (list): particle position
# -speed (list): particle speed
#"""
indv=[]
for key in bounds:
if bounds[key][0] == 'int':
indv.append(random.randint(bounds[key][1], bounds[key][2]))
elif bounds[key][0] == 'float':
indv.append(random.uniform(bounds[key][1], bounds[key][2]))
#elif bounds[key][0] == 'grid':
# indv.append(random.sample(bounds[key][1],1)[0])
else:
raise Exception ('unknown data type is given, either int, float, or grid are allowed for parameter bounds')
return indv
def InitPopulation(self, x0=None, verbose = False):
pop=[]
#Establish the swarm
if x0:
if verbose:
print('The first individual provided by the user:', x0[0])
print('The last individual provided by the user:', x0[-1])
for i in range(self.npop):
check_mixed_individual(x=x0[i], bounds=self.orig_bounds) #assert the type provided is consistent
if self.grid_flag:
pop.append(encode_grid_indv_to_discrete(x0[i], bounds=self.orig_bounds, bounds_map=self.bounds_map))
else:
pop.append(x0[i])
else:
for i in range (self.npop):
indv=self.GenIndv(self.bounds)
pop.append(indv)
return pop
def fit_worker(self, x):
# Clip the wolf with position outside the lower/upper bounds and return same position
x=self.ensure_bounds(x)
if self.grid_flag:
#decode the individual back to the int/float/grid mixed space
x=decode_discrete_to_grid(x,self.orig_bounds,self.bounds_map)
# Calculate objective function for each search agent
fitness = self.fit(x)
return fitness
def ensure_discrete(self, vec):
#"""
#to mutate a vector if discrete variables exist
#Params:
#vec - position in vector/list form
#Return:
#vec - updated position vector with discrete values
#"""
for dim in range(self.dim):
if self.var_type[dim] == 'int':
vec[dim] = mutate_discrete(x_ij=vec[dim],
x_min=min(vec),
x_max=max(vec),
lb=self.lb[dim],
ub=self.ub[dim],
alpha=self.b,
method=self.int_transform,
)
return vec
def mix_population(self, pop, scores):
fit_lst=np.array(scores)
worst_index=fit_lst.argsort()[:self.npop_rl]
rl_indices=random.sample(range(self.RLdata.shape[0]),self.npop_rl)
for i, idx in enumerate(worst_index):
#print(pop[idx], fit_lst[idx])
pop[idx] = list(self.RLdata[rl_indices[i],:])
return pop
[docs] def evolute(self, ngen, x0=None, verbose=False):
"""
This function evolutes the DE algorithm for number of generations.
:param ngen: (int) number of generations to evolute
:param x0: (list of lists) the initial individuals of the population
:param verbose: (bool) print statistics to screen
:return: (tuple) (best individual, best fitness, and a list of fitness history)
"""
set_neorl_seed(self.seed)
self.de_hist={}
#--- INITIALIZE the population
if x0:
assert len(x0) == self.npop, '--error: the length of x0 ({}) (initial population) must equal to number of individuals npop ({})'.format(len(x0), self.npop)
self.population = self.InitPopulation(x0=x0, verbose=verbose)
else:
self.population = self.InitPopulation(verbose=verbose)
# loop through all generations
self.best_scores=[]
for gen in range(1,ngen+1):
#print(population)
gen_scores = [] # score keeping
x_t_lst=[]
v_trial_lst = []
# cycle through each individual in the population
for j in range(0, self.npop):
self.b= 1 - j * ((1) / ngen) #mir: b decreases linearly between 1 to 0, for discrete mutation
#-----------------------------
#Mutation
#-----------------------------
# select three random vector index positions [0, popsize), not including current vector (j)
candidates = list(range(0, self.npop))
candidates.remove(j)
random_index = random.sample(candidates, 3)
x_1 = self.population[random_index[0]]
x_2 = self.population[random_index[1]]
x_3 = self.population[random_index[2]]
x_t = self.population[j] # target individual
# subtract x3 from x2, and create a new vector (x_diff)
x_diff = [x_2_i - x_3_i for x_2_i, x_3_i in zip(x_2, x_3)]
# multiply x_diff by the mutation factor (F) and add to x_1
v_donor = [x_1_i + self.F * x_diff_i for x_1_i, x_diff_i in zip(x_1, x_diff)]
v_donor = self.ensure_bounds(v_donor) #XXX check this line
#-----------------------------
#Recombination
#-----------------------------
v_trial = []
for k in range(len(x_t)):
crossover = random.random()
if crossover <= self.CR:
v_trial.append(v_donor[k])
else:
v_trial.append(x_t[k])
x_t=self.ensure_discrete(x_t)
v_trial=self.ensure_discrete(v_trial)
x_t_lst.append(x_t)
v_trial_lst.append(v_trial)
#--------------------------------
#paralell evaluation
#--------------------------------
if self.ncores > 1:
with joblib.Parallel(n_jobs=self.ncores) as parallel:
score_trial_lst=parallel(joblib.delayed(self.fit_worker)(item) for item in v_trial_lst)
score_target_lst=parallel(joblib.delayed(self.fit_worker)(item) for item in x_t_lst)
else:
score_trial_lst=[]
score_target_lst=[]
for item in v_trial_lst:
score_trial_lst.append(self.fit_worker(item))
for item in x_t_lst:
score_target_lst.append(self.fit_worker(item))
#-----------------------------
#Selection
#-----------------------------
index=0
for (score_trial, score_target, v_trial) in zip(score_trial_lst, score_target_lst, v_trial_lst):
if score_trial > score_target:
self.population[index] = v_trial
gen_scores.append(score_trial)
else:
gen_scores.append(score_target)
index+=1
#-----------------------------
#Fitness saving
#-----------------------------
gen_avg = sum(gen_scores) / self.npop # current generation avg. fitness
y_best = max(gen_scores) # fitness of best individual
x_best = self.population[gen_scores.index(max(gen_scores))] # solution of best individual
self.best_scores.append(y_best)
if self.RLmode:
self.population=self.mix_population(pop=self.population, scores=gen_scores)
#--mir
if self.mode=='min':
y_best_correct=-y_best
gen_avg=-gen_avg
else:
y_best_correct=y_best
if verbose:
print('************************************************************')
print('DE step {}/{}, F={}, CR={}, Ncores={}'.format(gen*self.npop, ngen*self.npop, self.F, self.CR, self.ncores))
print('************************************************************')
print('Best fitness:', np.round(y_best_correct,6))
if self.grid_flag:
x_decoded = decode_discrete_to_grid(x_best, self.orig_bounds, self.bounds_map)
print('Best individual:', x_decoded)
else:
print('Best individual:', x_best)
print('Average fitness:', np.round(gen_avg,6))
print('************************************************************')
#mir-grid
if self.grid_flag:
x_best_correct = decode_discrete_to_grid(x_best, self.orig_bounds, self.bounds_map)
else:
x_best_correct = x_best
if verbose:
print('------------------------ DE Summary --------------------------')
print('Best fitness (y) found:', y_best_correct)
print('Best individual (x) found:', x_best_correct)
print('--------------------------------------------------------------')
#---update final logger
#--mir return the last population for restart calculations
if self.grid_flag:
self.de_hist['last_pop'] = get_population(self.population, fits=gen_scores, grid_flag=True,
bounds=self.orig_bounds, bounds_map=self.bounds_map)
else:
self.de_hist['last_pop'] = get_population(self.population, fits=gen_scores, grid_flag=False)
if self.mode == 'min':
self.best_scores=[-item for item in self.best_scores]
self.de_hist['global_fitness'] = np.minimum.accumulate(self.best_scores)
self.de_hist['last_pop']['fitness'] *= -1
else:
self.de_hist['global_fitness'] = np.maximum.accumulate(self.best_scores)
self.de_hist['local_fitness'] = self.best_scores
return x_best_correct, y_best_correct, self.de_hist