Source code for neorl.evolu.hho

#    This file is part of NEORL.

#    Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
#    NEORL is free software: you can redistribute it and/or modify
#    it under the terms of the MIT LICENSE

#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#    SOFTWARE.

# -*- coding: utf-8 -*-
#"""
#Created on Thu Dec  3 14:42:29 2020
#
#@author: Katelin Du
#"""


# Harris hawks optimization: Algorithm and applications
# Ali Asghar Heidari, Seyedali Mirjalili, Hossam Faris, 
# Ibrahim Aljarah, Majdi Mafarja, Huiling Chen
# Future Generation Computer Systems,
# DOI: https://doi.org/10.1016/j.future.2019.02.028

import random
import numpy as np
import math
import time
import joblib
import matplotlib.pyplot as plt
from neorl.evolu.discrete import mutate_discrete, encode_grid_to_discrete 
from neorl.evolu.discrete import decode_discrete_to_grid, encode_grid_indv_to_discrete
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population, check_mixed_individual

[docs]class HHO(object): """ Harris Hawks Optimizer :param mode: (str) problem type, either "min" for minimization problem or "max" for maximization :param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}`` :param fit: (function) the fitness function :param nhawks: (int): number of the hawks in the group :param int_transform: (str): method of handling int/discrete variables, choose from: ``nearest_int``, ``sigmoid``, ``minmax``. :param ncores: (int) number of parallel processors (must be ``<= nhawks``) :param seed: (int) random seed for sampling """ def __init__(self, mode, bounds, fit, nhawks, int_transform='nearest_int', ncores=1, seed=None): self.seed = seed set_neorl_seed(self.seed) #--mir assert mode == 'min' or mode == 'max', "Mode must be 'max' or 'min'." self.mode=mode if mode == 'min': self.fit=fit elif mode == 'max': def fitness_wrapper(*args, **kwargs): return -fit(*args, **kwargs) self.fit=fitness_wrapper else: raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`') self.int_transform=int_transform self.ncores = ncores self.nhawks = nhawks self.dim = len(bounds) self.bounds = bounds #infer variable types self.var_type = np.array([bounds[item][0] for item in bounds]) #mir-grid if "grid" in self.var_type: self.grid_flag=True self.orig_bounds=bounds #keep original bounds for decoding #print('--debug: grid parameter type is found in the space') self.bounds, self.bounds_map=encode_grid_to_discrete(self.bounds) #encoding grid to int #define var_types again by converting grid to int self.var_type = np.array([self.bounds[item][0] for item in self.bounds]) else: self.grid_flag=False self.bounds = bounds self.orig_bounds=bounds self.lb = np.array([self.bounds[item][1] for item in self.bounds]) self.ub = np.array([self.bounds[item][2] for item in self.bounds])
[docs] def evolute(self, ngen, x0=None, verbose=False, **kwargs): """ This function evolutes the HHO algorithm for number of generations. :param ngen: (int) number of generations to evolute :param x0: (list of lists) initial position of the hawks (must be of same size as ``nhawks``) :param verbose: (bool) print statistics to screen :return: (tuple) (best individual, best fitness, and dictionary containing major search results) """ set_neorl_seed(self.seed) self.history = {'local_fitness':[], 'global_fitness':[]} self.rabbit_energy = float("inf") self.rabbit_location = np.zeros(self.dim) self.verbose = verbose ################################## # Set initial locations of hawks # ################################## self.hawk_positions = np.zeros((self.nhawks, self.dim)) if x0: assert len(x0) == self.nhawks, '--error: the length of x0 ({}) MUST equal the number of hawks in the group ({})'.format(len(x0), self.nhawks) for i in range(self.nhawks): check_mixed_individual(x=x0[i], bounds=self.orig_bounds) #assert the type provided is consistent if self.grid_flag: self.hawk_positions[i,:] = encode_grid_indv_to_discrete(x0[i], bounds=self.orig_bounds, bounds_map=self.bounds_map) else: self.hawk_positions[i,:] = x0[i] else: # self.hawk_positions = np.asarray([x * (self.ub - self.lb) + self.lb for x in np.random.uniform(0, 1, (self.nhawks, self.dim))]) for hawk_i in range(self.nhawks): self.hawk_positions[hawk_i, :] = self.init_sample() self.best_scores=[] for t in range(ngen): self.a= 1 - t * ((1) / ngen) #mir: a decreases linearly between 1 to 0, for discrete mutation if 'E1' in kwargs: #take it from external vector assert len(kwargs["E1"]) == ngen, '--error: the length of `E1` in kwargs must equal to ngen' self.E1=kwargs["E1"][t] else: #E1 is annealed from 2 to 0 self.E1 = 2 * (1 - (t / ngen)) # factor to show the decreasing energy of rabbit ########################### # Evaluate hawk fitnesses # ########################### fitness_lst = self.eval_hawks() #for logging self.prev_pop=self.hawk_positions.copy() self.prev_fits=np.array(fitness_lst) self.best_scores.append(np.min(fitness_lst)) ####################################################################### # Update rabbit energy and rabbit location based on best hawk fitness # ####################################################################### for i, fitness in enumerate(fitness_lst): if fitness < self.rabbit_energy: self.rabbit_energy = fitness self.rabbit_location = self.hawk_positions[i, :].copy() ##################################################### # Update best global and local fitnesses in history # ##################################################### if self.mode=='max': self.best_global_fitness = -self.rabbit_energy else: self.best_global_fitness = self.rabbit_energy if self.verbose and t % self.nhawks: # change depending on how often message should be displayed print(f'HHO step {t*self.nhawks}/{ngen*self.nhawks}, nhawks={self.nhawks}, ncores={self.ncores}') print('Best global fitness:', np.round(self.best_global_fitness, 6)) #mir-grid if self.grid_flag: self.rabbit_decoded=decode_discrete_to_grid(self.rabbit_location,self.orig_bounds,self.bounds_map) print('Best rabbit position:', self.rabbit_decoded) else: print('Best rabbit position:', np.round(self.rabbit_location, 6)) print('E1:', self.E1) print() ################################ # Update the location of hawks # ################################ self.update_hawks(fitness_lst) # now self.hawk_positions is updated for hawk_i in range(self.nhawks): #mir: this bound check line is needed to ensure that choices.remove option to work self.hawk_positions[hawk_i, :] = self.ensure_bounds(self.hawk_positions[hawk_i, :], self.bounds) for dim in range(self.dim): if self.var_type[dim] == 'int': self.hawk_positions[hawk_i, dim] = mutate_discrete(x_ij=self.hawk_positions[hawk_i, dim], x_min=self.hawk_positions[hawk_i, :].min(), x_max=self.hawk_positions[hawk_i, :].max(), lb=self.lb[dim], ub=self.ub[dim], alpha=self.a, method=self.int_transform) #mir-grid if self.grid_flag: self.rabbit_correct=decode_discrete_to_grid(self.rabbit_location,self.orig_bounds,self.bounds_map) else: self.rabbit_correct=self.rabbit_location if self.verbose: print('------------------------ HHO Summary --------------------------') print('Function:', self.fit.__name__) print('Best fitness (y) found:', self.best_global_fitness) print('Best individual (x) found:', self.rabbit_correct) print('-------------------------------------------------------------- \n \n') if self.mode=='max': self.prev_fits=-self.prev_fits self.best_scores=[-item for item in self.best_scores] self.history['global_fitness'] = np.maximum.accumulate(self.best_scores) else: self.history['global_fitness'] = np.minimum.accumulate(self.best_scores) self.history['local_fitness'] = self.best_scores #--mir return the last population for restart calculations if self.grid_flag: self.history['last_pop'] = get_population(self.prev_pop, fits=self.prev_fits, grid_flag=True, bounds=self.orig_bounds, bounds_map=self.bounds_map) else: self.history['last_pop'] = get_population(self.prev_pop, fits=self.prev_fits, grid_flag=False) return self.rabbit_correct, self.best_global_fitness, self.history
def ensure_bounds(self, vec, bounds): vec_new = [] # cycle through each variable in vector for i, (key, val) in enumerate(bounds.items()): # variable exceedes the minimum boundary if vec[i] < bounds[key][1]: vec_new.append(bounds[key][1]) # variable exceedes the maximum boundary if vec[i] > bounds[key][2]: vec_new.append(bounds[key][2]) # the variable is fine if bounds[key][1] <= vec[i] <= bounds[key][2]: vec_new.append(vec[i]) return vec_new def init_sample(self): #""" #Initialize a hawk location. #Return: #array - hawk position #""" hawk_pos = [] for key in self.bounds: if self.bounds[key][0] == 'int': hawk_pos.append(random.randint(self.bounds[key][1], self.bounds[key][2])) elif self.bounds[key][0] == 'float': hawk_pos.append(random.uniform(self.bounds[key][1], self.bounds[key][2])) # elif self.bounds[key][0] == 'grid': # hawk_pos.append(random.sample(self.bounds[key][1],1)[0]) else: raise Exception ('unknown data type is given; either int, float, or grid are allowed for parameter bounds') return np.array(hawk_pos) def eval_hawks(self): #""" #Evaluate fitness of hawks with parallel processing. #Return: #list - hawk fitnesses #""" #print(self.hawk_positions) if self.ncores > 1: with joblib.Parallel(n_jobs=self.ncores) as parallel: fitness_lst = parallel(joblib.delayed(self.fit_worker)(self.hawk_positions[i, :]) for i in range(self.nhawks)) else: fitness_lst = [] for i in range(self.nhawks): fitness_lst.append(self.fit_worker(self.hawk_positions[i, :])) return fitness_lst def update_hawks(self, fitness_lst): #""" #Update hawk positions according to HHO rules. #Params: #cur_gen - current generation of HHO #ngen - total number of generations in HHO #fitness_lst - list of hawk fitnesses in current generation #Return: #None #""" for i in range(self.nhawks): E0 = 2 * random.random() - 1 rabbit_escape_energy = self.E1 * E0 # escaping energy of rabbit: eq. (3) ############################## # Exploration phase: eq. (1) # ############################## if abs(rabbit_escape_energy) >= 1: self.exploration(i) ############################################################### # Exploitation phase: attacking the rabbit using 4 strategies # ############################################################### elif abs(rabbit_escape_energy) < 1: r = random.random() # probablity of each event ########################################## # Phase 1: surprise pounce (seven kills) # ########################################## if r >= 0.5: self.surprise_pounce(i, rabbit_escape_energy) ############################################################# # Phase 2: performing team rapid dives (leapfrog movements) # ############################################################# elif r < 0.5: self.team_rapid_dives(i, fitness_lst[i], rabbit_escape_energy) def exploration(self, hawk_index): #""" #Exploration phase based on two perching strategies: # 1. new hawk location based on other hawks' locations # 2. new hawk location generated randomly #Params: #hawk_index - index of hawk being updated #Return: #None #""" q = random.random() rand_hawk = math.floor(self.nhawks * random.random()) rand_hawk_pos = self.hawk_positions[rand_hawk, :] # location of random hawk if q < 0.5: # perch based on other family members self.hawk_positions[hawk_index, :] = rand_hawk_pos \ - random.random() * abs(rand_hawk_pos - 2 * random.random() * self.hawk_positions[hawk_index, :]) else: # perch on a random tall tree (random site inside group's home range) self.hawk_positions[hawk_index, :] = (self.rabbit_location - self.hawk_positions.mean(0)) \ - random.random() * ((self.ub - self.lb) * random.random() + self.lb) def surprise_pounce(self, hawk_index, escape_energy): #""" #Exploitation phase 1 based on two strategies: # 1. Hard besiege without team rapid dives # 2. Soft besiege without team rapid dives #Params: #hawk_index - index of hawk being updated #escape_energy - escape energy of the rabbit #Return: #None #""" ######################## # Hard besiege eq. (6) # ######################## if abs(escape_energy) < 0.5: self.hawk_positions[hawk_index, :] = (self.rabbit_location) \ - escape_energy * abs(self.rabbit_location - self.hawk_positions[hawk_index, :]) ######################## # Soft besiege eq. (4) # ######################## elif abs(escape_energy) >= 0.5: J = 2 * (1 - random.random()) # random jump strength of the rabbit; 1 < J < 2 #mir: the index was "i" here, which should be hawk_index self.hawk_positions[hawk_index, :] = (self.rabbit_location - self.hawk_positions[hawk_index, :]) \ - escape_energy * abs(J * self.rabbit_location - self.hawk_positions[hawk_index, :]) def team_rapid_dives(self, hawk_index, hawk_fitness, escape_energy): #""" #Exploitation phase 2 based on two strategies: # 1. Hard besiege with team rapid dives # 2. Soft besiege with team rapid dives #Params: #hawk_index - index of hawk being updated #hawk_fitness - fitness of current hawk #escape_energy - escape energy of the rabbit #Return: #None #""" ######################### # Soft besiege eq. (10) # ######################### if abs(escape_energy) >= 0.5: J = 2 * (1 - random.random()) Y = self.rabbit_location \ - escape_energy * abs(J * self.rabbit_location - self.hawk_positions[hawk_index, :]) #mir: uses a built-in function #(order is important for choices.remove(), first check bounds, then check discrete) Y=self.ensure_bounds(Y, self.bounds) Y=self.ensure_discrete(Y) if self.fit_worker(Y) < hawk_fitness: # improved move? self.hawk_positions[hawk_index, :] = Y.copy() else: # hawks perform self.levy-based short rapid dives around the rabbit Z = self.rabbit_location \ - escape_energy * abs(J * self.rabbit_location - self.hawk_positions[hawk_index, :]) \ + np.multiply(np.random.randn(self.dim), self.levy()) #mir--- #(order is important for choices.remove(), first check bounds, then check discrete) Z=self.ensure_bounds(Z, self.bounds) Z=self.ensure_discrete(Z) if self.fit_worker(Z) < hawk_fitness: self.hawk_positions[hawk_index, :] = Z.copy() ######################### # Hard besiege eq. (11) # ######################### elif abs(escape_energy) < 0.5: J = 2 * (1 - random.random()) Y = self.rabbit_location \ - escape_energy * abs(J * self.rabbit_location - self.hawk_positions.mean(0)) #---mir #(order is important for choices.remove(), first check bounds, then check discrete) Y=self.ensure_bounds(Y, self.bounds) Y=self.ensure_discrete(Y) if self.fit_worker(Y) < hawk_fitness: # improved move? self.hawk_positions[hawk_index, :] = Y.copy() else: # Perform levy-based short rapid dives around the rabbit Z = self.rabbit_location \ - escape_energy * abs(J * self.rabbit_location - self.hawk_positions.mean(0)) \ + np.multiply(np.random.randn(self.dim), self.levy()) #--mir #(order is important for choices.remove(), first check bounds, then check discrete) Z=self.ensure_bounds(Z, self.bounds) Z=self.ensure_discrete(Z) if self.fit_worker(Z) < hawk_fitness: self.hawk_positions[hawk_index, :] = Z.copy() def fit_worker(self, hawk_pos): #""" #Evaluates fitness of a hawk. #Params: #hawk_pos - hawk position vector #Return: #float - hawk fitness #""" #mir--- hawk_pos=self.ensure_bounds(hawk_pos, self.bounds) #mir-grid if self.grid_flag: #decode the individual back to the int/float/grid mixed space hawk_pos=decode_discrete_to_grid(hawk_pos,self.orig_bounds,self.bounds_map) fitness = self.fit(hawk_pos) return fitness def ensure_discrete(self, vec): #""" #to mutate a vector if discrete variables exist #handy function to be used four times within HHO phases #Params: #vec - hawk position in vector/list form #Return: #vec - updated hawk position vector with discrete values #""" for dim in range(self.dim): if self.var_type[dim] == 'int': vec[dim] = mutate_discrete(x_ij=vec[dim], x_min=min(vec), x_max=max(vec), lb=self.lb[dim], ub=self.ub[dim], alpha=self.a, method=self.int_transform, ) return vec def levy(self): #""" #Evaluates the levy flight (LF) function eq. (9). #Return: #float - result of levy flight function #""" beta = 1.5 sigma = (math.gamma(1 + beta) * math.sin(math.pi * beta / 2) / (math.gamma((1 + beta) / 2) * beta * 2 ** ((beta - 1) / 2))) ** (1 / beta) u = 0.01 * np.random.randn(self.dim) * sigma v = np.random.randn(self.dim) zz = np.power(np.absolute(v), (1 / beta)) step = np.divide(u, zz) return step