# This file is part of NEORL.
# Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
# NEORL is free software: you can redistribute it and/or modify
# it under the terms of the MIT LICENSE
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -*- encoding: utf-8 -*-
#'''
#@File : aco.py
#@Time : 2021/07/30 16:10:59
#@Author : Xubo GU
#@Email : guxubo@alumni.sjtu.edu.cn
#'''
#inspired from
#https://towardsdatascience.com/introduction-to-global-optimization-algorithms-for-continuous-domain-functions-7ad9d01db055
#https://github.com/aidowu1/Py-Optimization-Algorithms/blob/master/AntColonyOptimization/Constants.py
import random
import numpy as np
import joblib
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population
[docs]class ACO(object):
"""
Ant Colony Optimization
:param mode: (str) problem type, either "min" for minimization problem or "max" for maximization
:param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}``
:param fit: (function) the fitness function
:param nants: (int) number of total ants
:param narchive: (int) size of archive of best ants (recommended ``narchive < nants``)
:param Q: (float) diversification/intensification factor (see **Notes** below)
:param Z: (float) deviation-distance ratio or pheromone evaporation rate, high Z leads to slow convergence (see **Notes** below).
:param ncores: (int) number of parallel processors
:param seed: (int) random seed for sampling
"""
def __init__(self, mode, fit, bounds, nants=40, narchive=10,
Q=0.5, Z=1.0, ncores=1, seed=None):
assert narchive <= nants, '--error: narchive must be less than or equal nants'
self.seed=seed
set_neorl_seed(self.seed)
self.mode=mode
if mode == 'min':
self.fit=fit
elif mode == 'max':
def fitness_wrapper(*args, **kwargs):
return -fit(*args, **kwargs)
self.fit = fitness_wrapper
else:
raise ValueError('--error: The mode entered by user is invalid, \
use either `min` or `max`')
self.bounds = bounds
self.npop = narchive
self.nvars = len(bounds)
self.nants = nants
self.q = Q
self.z = Z
self.ncores = ncores
self.__final_best_solution = None
self.__probs = None
self.__new_pops = None
self.__random = np.random.RandomState(self.seed)
self.pops_sorted = None
def __computePdf(self) -> object:
#"""
#Computes the PDF values
#"""
points = np.array(range(self.npop), dtype=np.float)
# Solution Weights
w = 1/(np.sqrt(2*np.pi)*self.q*float(self.npop))*np.square(np.exp(-0.5*((points-1)/(self.q*float(self.npop)))))
return w
def __rouletteWheelSelection(self):
#"""
#Roulette wheel selection strategy for selecting the optimal Guassain Kernel
#"""
r = self.__random.rand()
c = np.cumsum(np.reshape(self.__probs, (-1)))
j = np.argwhere(r <= c)[0,0]
return j
def ensure_bounds(self, vec): # bounds check
vec=vec.flatten()
vec_new = []
for i, (key, val) in enumerate(self.bounds.items()):
# less than minimum
if vec[i] < self.bounds[key][1]:
vec_new.append(self.bounds[key][1])
# more than maximum
if vec[i] > self.bounds[key][2]:
vec_new.append(self.bounds[key][2])
# fine
if self.bounds[key][1] <= vec[i] <= self.bounds[key][2]:
vec_new.append(vec[i])
return np.array(vec_new)
[docs] def evolute(self, ngen, x0=None, verbose=False):
"""
This function evolutes the ACO algorithm for number of generations
:param ngen: (int) number of generations to evolute
:param x0: (list of lists) the initial individuals of the population
:param verbose: (bool) print statistics to screen
:return: (tuple) (best individual, best fitness, and list of fitness history)
"""
if x0:
assert len(x0) == self.nants, '--error: the length of x0 ({}) (initial population) must equal to number of ants ({})'.format(len(x0), self.nants)
pops = Populations(self.nants, self.nvars, self.fit, self.bounds, ncores=self.ncores, x0=x0)
else:
pops = Populations(self.nants, self.nvars, self.fit, self.bounds, ncores=self.ncores, x0=None)
fit_hist=[]
self.history = {'local_fitness':[], 'global_fitness':[], 'last_pop':[]}
self.ngen = ngen
self.__best_solutions = [None]*self.ngen
self.pops_sorted = pops.ant_populations # length 10
self.__final_best_solution = self.pops_sorted[0]
self.__w = self.__computePdf()
self.__probs = self.__w/np.sum(self.__w)
self.__means = np.zeros((self.npop, self.nvars))
self.__sigmas = np.zeros((self.npop, self.nvars))
for iter in range(self.ngen):
## self.__constructNewPopulationSolution()
# Means
for _ in range(self.npop):
self.__means[_, :] = np.reshape(self.pops_sorted[_].position, (1, -1))
# Standard Deviation
for l_i in range(self.npop):
d = 0.0
for r_i in range(self.npop):
d += np.abs(self.__means[l_i, :] - self.__means[r_i, :])
self.__sigmas[l_i, :] = (self.z * d) / (self.npop - 1)
self.__new_pops = Populations.createEmptyNewPopulations(self.nants, self.nvars)
for i in range(self.nants):
for j in range(self.nvars):
# Select Gaussian Kernel
k = self.__rouletteWheelSelection()
# Generate Gaussian Random Variable
self.__new_pops[i].position[j] = self.__means[k, j] + self.__sigmas[k, j] * self.__random.randn()
# Apply Variable Bounds
self.__new_pops[i].position=self.ensure_bounds(self.__new_pops[i].position)
# Evaluation
if self.ncores > 1:
with joblib.Parallel(n_jobs=self.ncores) as parallel:
temp_fitness=parallel(joblib.delayed(self.fit)(indv.position) for indv in self.__new_pops)
for i in range(self.nants):
self.__new_pops[i].cost_function = temp_fitness[i]
else:
for i in range(self.nants):
self.__new_pops[i].cost_function = self.fit(self.__new_pops[i].position)
# Merge Main Population (Archive) and New Population (Samples)
self.pops_sorted = self.pops_sorted + self.__new_pops
# Sort Population
self.pops_sorted = sorted(self.pops_sorted, key=lambda x: x.cost_function, reverse=False) # small -> large
# Delete Extra Members
self.pops_sorted = self.pops_sorted[:self.npop]
# Update Best Solution Ever Found
self.__final_best_solution = self.pops_sorted[0]
# Store Best Cost
self.__best_solutions[iter] = self.__final_best_solution
self.last_fit=[item.cost_function for item in self.__new_pops] #for logging
#show the value wrt min/max
if self.mode=='max':
y_print = -float(self.__final_best_solution.cost_function)
self.history['local_fitness'].append(-np.min(self.last_fit))
else:
y_print = float(self.__final_best_solution.cost_function)
self.history['local_fitness'].append(np.min(self.last_fit))
fit_hist.append(y_print)
if verbose:
print('************************************************************')
print('ACOR step {}/{}, Ncores={}'.format(iter+1, self.ngen, self.ncores))
print('************************************************************')
print('Best fitness:', np.round(y_print,6))
print('Best individual:', self.__final_best_solution.position.flatten())
print('Archive mean individual:', self.__means.mean(axis=0))
print('************************************************************')
if verbose:
print('------------------------ ACOR Summary --------------------------')
print('Best fitness (y) found:', y_print)
print('Best individual (x) found:', self.__best_solutions[-1].position.flatten())
print('--------------------------------------------------------------')
self.x_best=self.__best_solutions[-1].position.flatten()
self.y_best=y_print
self.history['global_fitness'] = fit_hist
#obtain last population
self.last_pop=[item.position for item in self.__new_pops]
self.history['last_pop'] = get_population(self.last_pop, self.last_fit)
if self.mode=='max':
self.history['last_pop']['fitness'] *= -1
return self.x_best, self.y_best, self.history
@property
def pops(self):
#"""
#Getter property of the 'self.__pops' attribute
#"""
return self.pops_sorted
@property
def new_pops(self):
#"""
#Getter property of the 'self.__new_pops' attribute
#"""
return self.__new_pops
@property
def final_best_solution(self):
#"""
#Getter property of the 'self.__best_solution' attribute
#"""
return self.__final_best_solution
@property
def best_solutions(self):
#"""
#Getter property of the 'self.__best_solutions' attribute
#"""
return self.__best_solutions
@property
def probs(self):
#"""
#Getter property of the 'self.__probs' attribute
#"""
return self.__probs
@property
def means(self):
#"""
#Getter property of the 'self.__meanss' attribute
#"""
return self.__means
@property
def sigmas(self):
#"""
#Getter property of the 'self.__sigmas' attribute
#"""
return self.__sigmas
class Population(object):
#"""
#Specifies an ACO Population
#"""
def __init__(self, position, cost_function) -> None:
self.position = position
self.cost_function = cost_function
class Populations(object):
#"""
#Specifies Populations of the Ant Colony i.e. the Archive Size
#"""
def __init__(self, n_pop, n_vars, fit, bounds, ncores, x0=None):
#"""
#Constructor
#:param: n_pop: population size
#:param: n_vars: number of variables
#:param: cost_func_ cost function
#:param: bounds: continious domain lower/upper bounds
#"""
self.fit=fit
self.bounds = bounds
self.npop = n_pop
self.__pops = [None]*n_pop
self.pops_sorted = None
self.x0=x0
self.ncores=ncores
def __initializePopulation(self):
#"""
#Initializes the ant colony populations
#"""
position=[]
for i in range(self.npop):
if self.x0 is None:
indv=[]
for key in self.bounds:
if self.bounds[key][0] == 'int':
indv.append(random.randint(self.bounds[key][1], self.bounds[key][2]))
elif self.bounds[key][0] == 'float':
indv.append(random.uniform(self.bounds[key][1], self.bounds[key][2]))
elif self.bounds[key][0] == 'grid':
indv.append(random.sample(self.bounds[key][1],1)[0])
position.append(np.array(indv))
else:
position.append(np.array(self.x0[i]))
if self.ncores > 1:
with joblib.Parallel(n_jobs=self.ncores) as parallel:
cost=parallel(joblib.delayed(self.fit)(indv) for indv in position)
else:
cost=[]
for i in range(self.npop):
cost.append(self.fit(position[i]))
for i in range(self.npop):
self.__pops[i] = Population(position=position[i], cost_function=cost[i])
@property
def ant_populations(self):
#"""
#Getter property to retrieve the 'self.__pops' collection
#"""
self.__initializePopulation()
self.pops_sorted = sorted(self.__pops, key=lambda x: x.cost_function, reverse=False)
return self.pops_sorted
@staticmethod
def createEmptyNewPopulations(n_ants: int, n_vars: int) -> object:
#"""
#Creates an empty collection of ant populations
#:params: n_ants: number of ants
#:params: n_vars: number of variables
#"""
position = np.zeros((n_vars, 1))
empty_populations = [Population(position=position, cost_function=None) for _ in range(n_ants)]
return empty_populations