Source code for neorl.evolu.bat

#    This file is part of NEORL.

#    Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
#    NEORL is free software: you can redistribute it and/or modify
#    it under the terms of the MIT LICENSE

#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#    SOFTWARE.

# -*- coding: utf-8 -*-
#"""
#Created on Fri Jun 18 19:45:06 2021
#
#@author: Devin Seyler and Majdi Radaideh
#"""

import random
import numpy as np
import math
import joblib
from neorl.evolu.discrete import mutate_discrete, encode_grid_to_discrete 
from neorl.evolu.discrete import decode_discrete_to_grid, encode_grid_indv_to_discrete
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population, check_mixed_individual

#Main reference of the BAT algorithm:
#Xie, J., Zhou, Y., & Chen, H. (2013). A novel bat algorithm based on 
#differential operator and Levy flights trajectory. 
#Computational intelligence and neuroscience, 2013.

[docs]class BAT(object): """ BAT Algorithm :param mode: (str) problem type, either ``min`` for minimization problem or ``max`` for maximization :param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['float', 0.1, 0.8], 'x2': ['float', 2.2, 6.2]}`` :param fit: (function) the fitness function :param nbats: (int): number of bats in the population :param fmin: (float): minimum value of the pulse frequency :param fmax: (float): maximum value of the pulse frequency :param A: (float) initial value of the loudness rate :param r0: (float) asymptotic value of the pulse rate :param alpha: (float) decay factor of loudness ``A``, i.e. A approaches 0 by the end of evolution if ``alpha < 1`` :param gamma: (float) exponential factor of the pulse rate ``r``, i.e. ``r`` increases abruptly at the beginning and then converges to ``r0`` by the end of evolution :param levy: (bool): a flag to activate Levy flight steps of the bat to increase bat diversity :param int_transform: (str): method of handling int/discrete variables, choose from: ``nearest_int``, ``sigmoid``, ``minmax``. :param ncores: (int) number of parallel processors (must be ``<= nbats``) :param seed: (int) random seed for sampling """ def __init__(self, mode, bounds, fit, nbats=50, fmin=0, fmax=1, A=0.5, r0=0.5, alpha=1.0, gamma=0.9, levy='False', int_transform='nearest_int', ncores=1, seed=None): set_neorl_seed(seed) assert ncores <= nbats, '--error: ncores ({}) must be less than or equal to nbats ({})'.format(ncores, nbats) assert nbats >= 5, '--error: number of bats must be more than 5 for this algorithm' #--mir self.mode=mode if mode == 'min': self.fit=fit elif mode == 'max': def fitness_wrapper(*args, **kwargs): return -fit(*args, **kwargs) self.fit=fitness_wrapper else: raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`') self.int_transform=int_transform if int_transform not in ["nearest_int", "sigmoid", "minmax"]: raise ValueError('--error: int_transform entered by user is invalid, must be `nearest_int`, `sigmoid`, or `minmax`') self.bounds=bounds self.ncores = ncores self.nbats=nbats self.fmax=fmax self.fmin=fmin self.A=A self.alpha=alpha self.gamma=gamma self.r0=r0 self.levy_flight=levy #infer variable types self.var_type = np.array([bounds[item][0] for item in bounds]) #mir-grid if "grid" in self.var_type: self.grid_flag=True self.orig_bounds=bounds #keep original bounds for decoding #print('--debug: grid parameter type is found in the space') self.bounds, self.bounds_map=encode_grid_to_discrete(self.bounds) #encoding grid to int #define var_types again by converting grid to int self.var_type = np.array([self.bounds[item][0] for item in self.bounds]) else: self.grid_flag=False self.bounds = bounds self.orig_bounds=bounds self.dim = len(bounds) self.lb=np.array([self.bounds[item][1] for item in self.bounds]) self.ub=np.array([self.bounds[item][2] for item in self.bounds]) def init_sample(self, bounds): #sample initializer indv=[] for key in bounds: if bounds[key][0] == 'int': indv.append(random.randint(bounds[key][1], bounds[key][2])) elif bounds[key][0] == 'float': indv.append(random.uniform(bounds[key][1], bounds[key][2])) # elif bounds[key][0] == 'grid': # indv.append(random.sample(bounds[key][1],1)[0]) else: raise Exception ('unknown data type is given, either int, float, or grid are allowed for parameter bounds') return np.array(indv) def eval_bats(self, position_array): #--------------------- # Fitness calcs #--------------------- core_lst=[] for case in range (0, position_array.shape[0]): core_lst.append(position_array[case, :]) if self.ncores > 1: with joblib.Parallel(n_jobs=self.ncores) as parallel: fitness_lst=parallel(joblib.delayed(self.fit_worker)(item) for item in core_lst) else: fitness_lst=[] for item in core_lst: fitness_lst.append(self.fit_worker(item)) return fitness_lst def select(self, pos, fit): #this function selects the best fitness and position in a population best_fit=np.min(fit) min_idx=np.argmin(fit) best_pos=pos[min_idx,:] return best_pos, best_fit def ensure_bounds(self, vec, bounds): vec_new = [] # cycle through each variable in vector for i, (key, val) in enumerate(bounds.items()): # variable exceedes the minimum boundary if vec[i] < bounds[key][1]: vec_new.append(bounds[key][1]) # variable exceedes the maximum boundary if vec[i] > bounds[key][2]: vec_new.append(bounds[key][2]) # the variable is fine if bounds[key][1] <= vec[i] <= bounds[key][2]: vec_new.append(vec[i]) return vec_new def fit_worker(self, x): #This worker is for parallel calculations # Clip the bat with position outside the lower/upper bounds and return same position x=self.ensure_bounds(x,self.bounds) if self.grid_flag: #decode the individual back to the int/float/grid mixed space x=decode_discrete_to_grid(x,self.orig_bounds,self.bounds_map) # Calculate objective function for each search agent fitness = self.fit(x) return fitness def ensure_discrete(self, vec): #""" #to mutate a vector if discrete variables exist #handy function to be used three times within BAT phases #Params: #vec - bat position in vector/list form #Return: #vec - updated bat position vector with discrete values #""" for dim in range(self.dim): if self.var_type[dim] == 'int': vec[dim] = mutate_discrete(x_ij=vec[dim], x_min=min(vec), x_max=max(vec), lb=self.lb[dim], ub=self.ub[dim], alpha=self.a, method=self.int_transform, ) return vec def Levy(self, dim): #function to return levy step beta = 1.5 sigma = ( math.gamma(1 + beta) * math.sin(math.pi * beta / 2) / (math.gamma((1 + beta) / 2) * beta * 2 ** ((beta - 1) / 2)) ) ** (1 / beta) u = 0.01 * np.random.randn(dim) * sigma v = np.random.randn(dim) zz = np.power(np.absolute(v), (1 / beta)) step = np.divide(u, zz) return step
[docs] def evolute(self, ngen, x0=None, verbose=False): """ This function evolutes the BAT algorithm for number of generations. :param ngen: (int) number of generations to evolute :param x0: (list of lists) initial position of the bats (must be of same size as ``nbats``) :param verbose: (bool) print statistics to screen :return: (tuple) (best individual, best fitness, and dictionary containing major search results) """ self.history = {'local_fitness':[], 'global_fitness':[], 'A': [], 'r': []} self.fbest=float("inf") self.verbose=verbose self.Positions = np.zeros((self.nbats, self.dim)) self.r=self.r0 if x0: assert len(x0) == self.nbats, '--error: the length of x0 ({}) MUST equal the number of bats in the group ({})'.format(len(x0), self.nbats) for i in range(self.nbats): check_mixed_individual(x=x0[i], bounds=self.orig_bounds) #assert the type provided is consistent if self.grid_flag: self.Positions[i,:] = encode_grid_indv_to_discrete(x0[i], bounds=self.orig_bounds, bounds_map=self.bounds_map) else: self.Positions[i,:] = x0[i] else: # Initialize the positions of bats for i in range(self.nbats): self.Positions[i,:]=self.init_sample(self.bounds) #Initialize and evaluate the first bat population fitness0=self.eval_bats(position_array=self.Positions) x0, f0=self.select(pos=self.Positions,fit=fitness0) self.xbest=np.copy(x0) # Main BAT loop for l in range(0, ngen): self.a= 1 - l * ((1) / ngen) #mir: a decreases linearly between 1 to 0, for discrete mutation #------------------------------------------------------ # Stage 1A: Loop over all bats to update the positions #------------------------------------------------------ for i in range(0, self.nbats): if self.levy_flight: #Eq.(11) make a levy flight jump to increase population diversity self.Positions[i,:]=self.Positions[i,:]+np.multiply(np.random.randn(self.dim), self.Levy(self.dim)) #Eq.(8)-(10) f1=((self.fmin-self.fmax)*l/ngen+self.fmax)*random.random() f2=((self.fmax-self.fmin)*l/ngen+self.fmin)*random.random() betas=random.sample(list(range(0,self.nbats)),4) self.Positions[i, :]=self.xbest+f1*(self.Positions[betas[0],:]-self.Positions[betas[1],:]) +f2*(self.Positions[betas[2],:]-self.Positions[betas[3],:]) self.Positions[i, :] = self.ensure_bounds(self.Positions[i , :], self.bounds) self.Positions[i, :] = self.ensure_discrete(self.Positions[i , :]) #----------------------- #Stage 1B: evaluation #----------------------- fitness1=self.eval_bats(position_array=self.Positions) x1, f1=self.select(pos=self.Positions,fit=fitness1) if f1 <= self.fbest: self.fbest=f1 self.xbest=x1.copy() #--------------------------------- #Stage 2A: Generate new positions #--------------------------------- for i in range(0, self.nbats): # Pulse rate if random.random() > self.r: self.Positions[i, :] = self.xbest + self.A * np.random.uniform(-1,1,self.dim) self.Positions[i, :] = self.ensure_bounds(self.Positions[i , :], self.bounds) self.Positions[i, :] = self.ensure_discrete(self.Positions[i , :]) #----------------------- #Stage 2B: evaluation #----------------------- fitness2=self.eval_bats(position_array=self.Positions) x2, f2=self.select(pos=self.Positions,fit=fitness2) if f2 <= self.fbest: self.fbest=f2 self.xbest=x2.copy() #--------------------------------- #Stage 3A: Generate new positions #--------------------------------- for i in range(0, self.nbats): # loudness effect if random.random() < self.A: self.Positions[i, :] = self.xbest + self.r * np.random.uniform(-1,1,self.dim) self.Positions[i, :] = self.ensure_bounds(self.Positions[i , :], self.bounds) self.Positions[i, :] = self.ensure_discrete(self.Positions[i , :]) #----------------------- #Stage 3B: evaluation #----------------------- fitness3=self.eval_bats(position_array=self.Positions) x3, f3=self.select(pos=self.Positions,fit=fitness3) if f3 <= self.fbest: self.fbest=f3 self.xbest=x3.copy() #--------------------------------- #Stage 4: Check and update A/r #--------------------------------- if min(f1, f2, f3) <= self.fbest: #update A self.A = self.alpha*self.A #update r self.r = self.r0*(1-math.exp(-self.gamma*l)) #--------------------------------- #Stage 5: post-processing #--------------------------------- #--mir if self.mode=='max': self.fitness_best_correct=-self.fbest self.local_fitness = -min(f1 , f2 , f3) else: self.fitness_best_correct=self.fbest self.local_fitness = min(f1 , f2 , f3) self.last_pop=self.Positions.copy() self.last_fit=np.array(fitness3).copy() self.best_position=self.xbest.copy() self.history['local_fitness'].append(self.local_fitness) self.history['global_fitness'].append(self.fitness_best_correct) self.history['A'].append(self.A) self.history['r'].append(self.r) # Print statistics if self.verbose and i % self.nbats: print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') print('BAT step {}/{}, nbats={}, Ncores={}'.format((l+1)*self.nbats, ngen*self.nbats, self.nbats, self.ncores)) print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') print('Best Bat Fitness:', np.round(self.fitness_best_correct,6)) if self.grid_flag: self.bat_decoded = decode_discrete_to_grid(self.best_position, self.orig_bounds, self.bounds_map) print('Best Bat Position:', self.bat_decoded) else: print('Best Bat Position:', self.best_position) print('Loudness A:', self.A) print('Pulse rate r:', self.r) print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') #mir-grid if self.grid_flag: self.bat_correct = decode_discrete_to_grid(self.best_position, self.orig_bounds, self.bounds_map) else: self.bat_correct = self.best_position if self.verbose: print('------------------------ BAT Summary --------------------------') print('Best fitness (y) found:', self.fitness_best_correct) print('Best individual (x) found:', self.bat_correct) print('--------------------------------------------------------------') if self.mode=='max': self.last_fit=-self.last_fit #--mir return the last population for restart calculations if self.grid_flag: self.history['last_pop'] = get_population(self.last_pop, fits=self.last_fit, grid_flag=True, bounds=self.orig_bounds, bounds_map=self.bounds_map) else: self.history['last_pop'] = get_population(self.last_pop, fits=self.last_fit, grid_flag=False) return self.bat_correct, self.fitness_best_correct, self.history