# This file is part of NEORL.
# Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
# NEORL is free software: you can redistribute it and/or modify
# it under the terms of the MIT LICENSE
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -*- coding: utf-8 -*-
#"""
#Created on Sun Jun 14 13:45:54 2020
#
#@author: Majdi and Paul
#"""
import random
import math
import numpy as np
import copy
import joblib
from neorl.evolu.discrete import mutate_discrete, encode_grid_to_discrete
from neorl.evolu.discrete import decode_discrete_to_grid, encode_grid_indv_to_discrete
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population, check_mixed_individual
[docs]class SA:
"""
Parallel Simulated Annealing
:param mode: (str) problem type, either ``min`` for minimization problem or ``max`` for maximization
:param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}``
:param fit: (function) the fitness function
:param cooling: (str) cooling schedule, choose ``fast``, ``boltzmann``, ``cauchy``,``equilibrium``. The ``equilibrium`` mode is only valid with ``ncores > 1`` (See **Notes** below)
:param chain_size: (int) number of individuals to evaluate in the chain every generation (e.g. like ``npop`` for other algorithms)
:param Tmax: (int) initial/maximum temperature
:param Tmin: (int) final/minimum temperature
:param chi: (float or list of floats) probability of perturbing every attribute of the input ``x``, ONLY used if ``move_func=None``.
For ``ncores > 1``, if a scalar is provided, constant value is used across all ``ncores``. If a list of size ``ncores``
is provided, each core/chain uses different value of ``chi`` (See **Notes** below)
:param move_func: (function) custom self-defined function that controls how to perturb the input space during annealing (See **Notes** below)
:param reinforce_best: (str) an option to control the starting individual of the chain at every generation. Choose ``None``, ``hard``, ``soft`` (See **Notes** below).
:param lmbda: (float) ONLY used if ``cooling = equilibrium``, control the cooling rate, and the speed at which the algorithm converges.
:param alpha: (float) ONLY used if ``cooling = equilibrium``, control the initial temperature of the cooling schedule.
:param threshold: (float) ONLY used if ``cooling = equilibrium``. The threshold (in %) for the acceptance rate of solution under which the algorithm stops running.
:param ncores: (int) number of parallel processors (``ncores > 1`` is required for ``cooling = equilibrium``)
:param seed: (int) random seed for sampling
"""
def __init__ (self, mode, bounds, fit, cooling='fast', chain_size=10,
Tmax=10000, Tmin=1, chi=0.1, move_func=None, reinforce_best='soft',
lmbda = 1.5, alpha = 1.5, threshold = 10 ,ncores=1, seed=None):
set_neorl_seed(seed)
self.seed=seed
#--mir
self.mode=mode
if mode == 'max':
self.fit=fit
elif mode == 'min':
def fitness_wrapper(*args, **kwargs):
return -fit(*args, **kwargs)
self.fit=fitness_wrapper
else:
raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`')
self.bounds=bounds
self.reinforce_best=reinforce_best
self.ncores=ncores
self.npop=chain_size
self.threshold = threshold
#assert npop % self.ncores == 0, 'The number of population (npop) to run must be divisible by ncores, {} mod {} != 0'.format(npop,self.ncores)
self.Tmax=Tmax
self.Tmin=Tmin
if isinstance(chi, list):
assert len(chi) == ncores, 'The list of chi values ({}) MUST equal ncores ({})'.format(len(chi),self.ncores)
self.chi=chi
elif type(chi) == float or type(chi) == int:
self.chi=[chi]*ncores
else:
raise Exception ('for chi, either list of floats or scalar float are allowed')
assert cooling in ['fast', 'boltzmann', 'cauchy', 'equilibrium'], '--error: invalid cooling is provided'
assert reinforce_best in ['soft', 'hard', None], '--error: invalid `reinforce_best` option is provided'
self.cooling=cooling
self.equilib_deactivate=False
if self.cooling == 'equilibrium' and self.ncores == 1:#Paul. The equilibrium cooling is only available with multiple chains working at the same time.
print("-- warning: equilibrium cooling is implemented ONLY for ncores > 1. The cooling is changed to default cooling --> 'fast'")
self.cooling = 'fast'
self.equilib_deactivate=True
if not self.cooling=='equilibrium':# Paul
self.T=Tmax #initialize T
else:
self.lmbda = lmbda
self.alpha = alpha
self.move_func=move_func
if self.move_func is None:
self.move=self.def_move
else:
self.move=move_func
#infer variable types
self.var_type = np.array([bounds[item][0] for item in bounds])
#mir-grid
if "grid" in self.var_type:
self.grid_flag=True
self.orig_bounds=bounds #keep original bounds for decoding
#print('--debug: grid parameter type is found in the space')
self.bounds, self.bounds_map=encode_grid_to_discrete(self.bounds) #encoding grid to int
#define var_types again by converting grid to int
self.var_type = np.array([self.bounds[item][0] for item in self.bounds])
else:
self.grid_flag=False
self.bounds = bounds
self.orig_bounds=bounds
self.dim = len(bounds)
self.lb=[self.bounds[item][1] for item in self.bounds]
self.ub=[self.bounds[item][2] for item in self.bounds]
def GenInd(self, bounds):
#"""
#Individual generator
#Input:
# -bounds (dict): input paramter type and lower/upper bounds in dictionary form
#Returns:
# -individual (list): individual position
#"""
content=[]
for key in bounds:
if bounds[key][0] == 'int':
content.append(random.randint(bounds[key][1], bounds[key][2]))
elif bounds[key][0] == 'float':
content.append(random.uniform(bounds[key][1], bounds[key][2]))
elif bounds[key][0] == 'grid':
content.append(random.sample(bounds[key][1],1)[0])
else:
raise Exception ('unknown data type is given, either int, float, or grid are allowed for parameter bounds')
ind=list(content)
return ind
def sampler(self,bound):
#"""
#This function takes input as [type, low, high] and returns a sample
#This is to sample the input parameters during optimisation
#"""
if bound[0] == 'int':
sample=random.randint(bound[1], bound[2])
elif bound[0] == 'float':
sample=random.uniform(bound[1], bound[2])
elif bound[0] == 'grid':
sample=random.sample(bound[1],1)[0]
else:
raise Exception ('unknown data type is given, either int, float, or grid are allowed for parameter bounds')
return sample
def ensure_bounds(self, vec): # bounds check
vec_new = []
for i, (key, val) in enumerate(self.bounds.items()):
# less than minimum
if vec[i] < self.bounds[key][1]:
vec_new.append(self.bounds[key][1])
# more than maximum
if vec[i] > self.bounds[key][2]:
vec_new.append(self.bounds[key][2])
# fine
if self.bounds[key][1] <= vec[i] <= self.bounds[key][2]:
vec_new.append(vec[i])
return vec_new
def fit_worker(self, x):
#This worker is for parallel calculations of the GWO
# Clip the wolf with position outside the lower/upper bounds and return same position
x=self.ensure_bounds(x)
if self.grid_flag:
#decode the individual back to the int/float/grid mixed space
x=decode_discrete_to_grid(x,self.orig_bounds,self.bounds_map)
# Calculate objective function for each search agent
fitness = self.fit(x)
return fitness
def def_move(self, x, chi):
#"""
#This function is to perturb x attributes by probability chi
#Inputs:
# x: input vector
# chi: perturbation probablity between 0 and 1
#Returns: perturbed vector x
#"""
i=0
x_new=x.copy()
for item in self.bounds:
if random.random() < chi:
sample = self.sampler(self.bounds[item])
while x_new[i] == sample and (self.bounds[item][1] != self.bounds[item][2]):
sample = self.sampler(self.bounds[item])
x_new[i] = sample
i+=1
return x_new
def temp(self,step):
#"""
# Function to anneal temperature
#"""
if self.cooling=='fast':
Tfac = -np.log(float(self.Tmax) / self.Tmin)
T = self.Tmax * np.exp( Tfac * step / self.steps)
elif self.cooling=='boltzmann':
T = self.Tmax / np.log(step + 1)
elif self.cooling=='cauchy':
T = self.Tmax / (step + 1)
# Paul
elif self.cooling=='equilibrium':
accept_rate = np.mean(self.accepts)
sigma = np.std(self.accepted_energy)
g_rho = 4 * accept_rate * (1 - accept_rate)**2 / ((2 - accept_rate)**2)
T = np.max([1 / (1 / self.T + self.lmbda * (1 / sigma) * (self.T / sigma)**2 * g_rho),self.Tmin])
else:
raise Exception ('--error: unknown cooling mode is entered, fast, boltzmann, cauchy, or equilibrium are ONLY allowed')
return T
def chain_object(self,inp):
#"""
#This function is a multiprocessing object, used to be passed to Pool, that respresents
#an individual SA chain.
#Input:
# inp: a list contains the following inputs in order
# inp[0] --> x0: initial guess to chain
# inp[1] --> E0: initial energy of x0
# inp[2] --> min_step: min step to start this chain
# inp[3] --> max_step: max step to terminate this chain
# inp[4] --> core_seed: seed for this chain
#returns:
# x_best, E_best: best obtained from this chain
# T: last temperature for this chain
# accepts, rejects, improves for this chain
#"""
x_prev=copy.deepcopy(inp[0])
x_best=copy.deepcopy(x_prev)
E_prev=inp[1]
E_best=inp[1]
min_step=inp[2]
max_step=inp[3]
core_seed=inp[4]
if not (self.seed is None):
random.seed(self.seed + core_seed)
np.random.seed(self.seed + core_seed)
rejects=0; accepts=0; improves=0
k=min_step
if self.cooling == 'equilibrium':#Paul.
T = copy.deepcopy(self.T)#Paul
accepted_energy = []#Paul
while k <= max_step:
if self.cooling != 'equilibrium': #Paul. if updated here may not return nan in first few steps
T=self.temp(step=k)
if self.move_func is None:
x=copy.deepcopy(self.move(x=x_prev,chi=self.chi[core_seed-1]))
else:
x=copy.deepcopy(self.move(x=x_prev))
x=self.ensure_bounds(x)
#SA is programmed to maximize reward
E=self.fit_worker(x)
dE = E - E_prev
#-----------------------------------
# Improve/Accept/Reject
#-----------------------------------
if dE > 0: #improvement
improves += 1
accepts += 1
x_prev = copy.deepcopy(x)
E_prev = E
if E > E_best:
x_best = copy.deepcopy(x)
E_best = E
if self.cooling == 'equilibrium':# Paul
accepted_energy.append(E)
elif np.exp(dE/T) >= random.random(): #accept the state
accepts += 1
x_prev = copy.deepcopy(x)
E_prev = E
if self.cooling == 'equilibrium':# Paul
accepted_energy.append(E)
else:
# Reject the new solution (maintain the current state!)
rejects += 1
k+=1
if self.cooling == 'equilibrium':#Paul.
return x_prev, E_prev, T, accepts, rejects, improves, x_best, E_best,accepted_energy
else:
return x_prev, E_prev, T, accepts, rejects, improves, x_best, E_best
def chain(self, x0, E0, step0):
#"""
#This function creates ``ncores`` independent SA chains with same initial guess x0, E0 and
#runs them via multiprocessing Pool.
#Input:
# x0: initial input guess (comes from previous annealing chains or from replay memory)
# E0: energy/fitness value of x0
# step0: is the first time step to use for temperature annealing
# npop: total number of individuals to be evaluated in this annealing stage
#returns:
# x_best, E_best, and T obtained from this annealing stage from all chains
#"""
#Append and prepare the input list to be passed as input to Pool.map
core_list=[]
core_step_min=step0
for j in range(1,self.ncores+1):
core_step_max=step0+j*self.npop-1
core_list.append([x0[j-1], E0[j-1], core_step_min, core_step_max, j])
core_step_min=core_step_max+1
if self.ncores > 1:
with joblib.Parallel(n_jobs=self.ncores) as parallel:
results=parallel(joblib.delayed(self.chain_object)(item) for item in core_list)
else:
results=[]
results.append(list(self.chain_object(core_list[0])))
# Determine the index and the best solution from all chains
#best_index=[y[0] for y in results].index(min([y[0] for y in results]))
self.x_last=[item[0] for item in results]
self.E_last=[item[1] for item in results]
self.T=results[-1][2] # get the temperature of the last chain
self.accepts=[np.round(item[3]/self.npop*100,1) for item in results] #convert to rate
self.rejects=[np.round(item[4]/self.npop*100,1) for item in results] #convert to rate
self.improves=[np.round(item[5]/self.npop*100,1) for item in results] #convert to rate
if self.cooling == 'equilibrium':#Paul
self.accepted_energy = [item[8] for item in results]#Paul
self.accepted_energy = [elem for energy in self.accepted_energy for elem in energy]
if np.std(self.accepted_energy) == 0: # prevent the std to be 0
pass
else:
self.T=self.temp(step=None)
self.x_best, self.E_best=[item[6] for item in results], [item[7] for item in results]
return self.x_last, self.E_last, self.T, self.accepts, self.rejects, self.improves, self.x_best, self.E_best
def InitChains(self, x0=None):
#initialize the chain and run them in parallel (these samples will be used to initialize the annealing process)
#Establish the chain
if x0:
if self.ncores==1:
if not any(isinstance(el, list) for el in x0): #ensure a list of list is submitted for ncores=1
x0=[x0]
assert len(x0) == self.ncores, '--error: the length of x0 ({}) MUST equal the number of ncores/parallel chains ({})'.format(len(x0), self.ncores)
for i in range(len(x0)):
check_mixed_individual(x=x0[i], bounds=self.orig_bounds) #assert the type provided is consistent
if self.grid_flag:
x0[i] = encode_grid_indv_to_discrete(x0[i], bounds=self.orig_bounds, bounds_map=self.bounds_map)
else:
x0=[]
for i in range (self.ncores):
x0.append(self.GenInd(self.bounds))
#Evaluate the swarm
if self.ncores > 1: #evaluate chain in parallel
core_list=[]
for ind in x0:
core_list.append(ind)
with joblib.Parallel(n_jobs=self.ncores) as parallel:
E0=parallel(joblib.delayed(self.fit_worker)(item) for item in core_list)
if self.cooling == 'equilibrium': # Paul
self.T = np.max([self.alpha * np.std(E0),self.Tmin,1e-10])
else: #evaluate swarm in series
E0=[]
for ind in x0:
fitness=self.fit_worker(ind)
E0.append(fitness)
return x0, E0 #return initial guess and initial fitness
[docs] def evolute(self, ngen, x0=None, verbose=False):
"""
This function evolutes the SA algorithm for number of generations.
:param ngen: (int) number of generations to evolute
:param x0: (list of lists) initial samples to start the evolution (``len(x0)`` must be equal to ``ncores``)
:param verbose: (int) print statistics to screen
:return: (tuple) (best individual, best fitness, and dictionary containing major search results)
"""
#chain statistics
#self.accept=0
#self.reject=0
#self.improve=0
stat={'x':[], 'fitness':[], 'T':[], 'accept':[], 'reject':[], 'improve':[]}
E_opt=-np.inf
step0=1
self.steps=ngen * self.npop
if x0:
if isinstance(x0[0], list):
if not all(len(item) == len(x0[0]) for item in x0):
raise Exception ('--error: the variable x0 must be a list of lists, and all internal lists must have same length.')
else:
x0=[x0]
assert len(x0) == self.ncores, '--error: Length of initial guesses x0 ({}) for chains do not equal to ncores or # of chains ({})'.format(len(x0), self.ncores)
assert len(x0[0]) == len(self.bounds), '--error: Length of every list in x0 ({}) do not equal to the size of parameter space in bounds ({})'.format(len(x0[0]), len(self.bounds))
xinit, Einit=self.InitChains(x0=x0)
else:
xinit, Einit=self.InitChains()
x_next=copy.deepcopy(xinit)
E_next=copy.deepcopy(Einit)
ngen=int(ngen/self.ncores)
for i in range (ngen):
#if self.cooling == 'equilibrium':# Paul
# self.accepted_energy = [] # initialize list of accepted energy to empty at step i
x_next,E_next,self.T, acc, rej, imp, x_best, E_best=self.chain(x0=x_next, E0=E_next, step0=step0)
step0=step0+self.npop*self.ncores
arg_max=np.argmax(E_best)
stat['x'].append(x_best[arg_max])
if self.mode=='max':
stat['fitness'].append(max(E_best))
else:
stat['fitness'].append(-max(E_best))
stat['T'].append(self.T)
stat['accept'].append(acc[arg_max])
stat['reject'].append(rej[arg_max])
stat['improve'].append(imp[arg_max])
if max(E_best) > E_opt:
E_opt=max(E_best)
x_opt=copy.deepcopy(x_best[arg_max])
if self.cooling == 'equilibrium': # Paul
if np.mean(self.accepts) <= self.threshold: # help prevent the std to be 0
print("--warning: The SA stopped because the average acceptance rate throughout the chain {} % falls below the threshold {} %".format(np.mean(self.accepts),self.threshold))
break
elif np.mean(self.accepts) == 0:
print("--warning: The SA stopped because the average acceptance rate throughout the chain {} % reaches 0%".format(np.mean(self.accepts),self.threshold))
break
# Paul
if self.reinforce_best == 'hard':
x_next=[x_opt]*self.ncores
E_next=[E_opt]*self.ncores
elif self.reinforce_best == 'soft':
sampling = np.zeros(self.ncores)
normalization = 1 / np.sum(np.exp(- np.abs(E_next) / self.T)) # cte to generate a probability
sampling[0] = np.exp(- abs(E_next[0]) / self.T) # utilize last accepted solution to generate the sampling
if self.ncores != 1:
rho = np.random.uniform(0,1,size = self.ncores) # sample random number between 0 and 1
else:
rho = [1] # probability of choosing itself is 1
for i in range(1,self.ncores):
sampling[i] = sampling[i - 1] + np.exp(- abs(E_next[i]) / self.T)
sampling = sampling * normalization
for count,prob in enumerate(rho):
if prob == 1:
index = self.ncores - 1
elif math.isnan(sampling[count]):# exponantial can lead to numerical erros
index = np.argmax(E_next)
else:
if prob <= sampling[0]:
index = 0
else:
index = np.where(sampling > rho)[0][0]
x_next[count] = x_best[index] # re-initialize with the best ever found by the index'th Markov Chain
E_next[count] = E_best[index]
#mir-grid
if self.grid_flag:
x_opt_correct = decode_discrete_to_grid(x_opt, self.orig_bounds, self.bounds_map)
else:
x_opt_correct = x_opt
if verbose:
print('***********************************************************************')
print('SA step {}/{}, T={}, Ncores={}, Cooling={}, Reinforce={}'.format(step0-1,self.steps,np.round(self.T), self.ncores, self.cooling, self.reinforce_best))
print('***********************************************************************')
print('Statistics for the {} parallel chains'.format(self.ncores))
if self.mode=='max':
print('Best fitness:', np.round(max(E_best),6))
else:
print('Best fitness:', -np.round(max(E_best),6))
print('Best individual:', x_opt_correct)
print('Acceptance Rate (%):', acc)
print('Rejection Rate (%):', rej)
print('Improvment Rate (%):', imp)
print('***********************************************************************')
#--mir
if self.mode=='max':
self.E_opt_correct=E_opt
else:
self.E_opt_correct=-E_opt
if verbose:
print('------------------------ SA Summary --------------------------')
print('Best fitness (y) found:', self.E_opt_correct)
print('Best individual (x) found:', x_opt_correct)
print('--------------------------------------------------------------')
if self.equilib_deactivate: #Paul.
print("-- warning: equilibrium cooling is implemented ONLY for ncores > 1. The cooling is changed to default cooling --> 'fast'")
return x_opt_correct, self.E_opt_correct, stat