Source code for neorl.evolu.pso

#    This file is part of NEORL.

#    Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
#    NEORL is free software: you can redistribute it and/or modify
#    it under the terms of the MIT LICENSE

#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#    SOFTWARE.

# -*- coding: utf-8 -*-
#Created on Mon Jun 15 19:37:04 2020
#@author: Majdi Radaideh

import random
import numpy as np
from collections import defaultdict
import copy
import time
import joblib
from neorl.evolu.discrete import mutate_discrete, encode_grid_to_discrete 
from neorl.evolu.discrete import decode_discrete_to_grid, encode_grid_indv_to_discrete
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population, check_mixed_individual

[docs]class PSO: """ Parallel Particle Swarm Optimisaion (PSO) module :param mode: (str) problem type, either "min" for minimization problem or "max" for maximization :param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}`` :param fit: (function) the fitness function :param npar: (int) number of particles in the swarm :param c1: (float) cognitive speed constant :param c2: (float) social speed constant :param speed_mech: (str) type of speed mechanism to update particle velocity, choose between ``constric``, ``timew``, ``globw``. :param ncores: (int) number of parallel processors :param seed: (int) random seed for sampling """ def __init__ (self, mode, bounds, fit, npar=50, c1=2.05, c2=2.05, speed_mech='constric', ncores=1, seed=None): set_neorl_seed(seed) self.bounds=bounds self.npar=npar #--mir self.mode=mode if mode == 'max': self.fit=fit elif mode == 'min': def fitness_wrapper(*args, **kwargs): return -fit(*args, **kwargs) self.fit=fitness_wrapper else: raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`') self.ncores=ncores self.speed_mech=speed_mech self.c1=c1 self.c2=c2 self.size=len(bounds) self.v0=0.1 # factor to intialize the speed if self.speed_mech=='constric': phi=self.c1+self.c2 self.w=2/np.abs(2-phi-np.sqrt(phi**2-4*phi)) elif self.speed_mech=='timew': self.wmin=0.4 self.wmax=0.9 self.w=self.wmax elif self.speed_mech=='globw': pass else: raise ('only timew, globw, or constric are allowed for speed_mech, the mechanism used is not defined') assert self.ncores >=1, "Number of cores must be more than or equal 1" #infer variable types self.datatype = np.array([bounds[item][0] for item in bounds]) #mir-grid if "grid" in self.datatype: self.grid_flag=True self.orig_bounds=bounds #keep original bounds for decoding #print('--debug: grid parameter type is found in the space') self.bounds, self.bounds_map=encode_grid_to_discrete(self.bounds) #encoding grid to int #define var_types again by converting grid to int self.datatype = np.array([self.bounds[item][0] for item in self.bounds]) else: self.grid_flag=False self.bounds = bounds self.orig_bounds=bounds self.low = np.array([self.bounds[item][1] for item in self.bounds]) self.up = np.array([self.bounds[item][2] for item in self.bounds]) def GenParticle(self, bounds): #""" #Particle generator #Input: # -bounds (dict): input paramter type and lower/upper bounds in dictionary form #Returns: # -particle (list): particle position # -speed (list): particle speed #""" content=[] for key in bounds: if bounds[key][0] == 'int': content.append(random.randint(bounds[key][1], bounds[key][2])) elif bounds[key][0] == 'float': content.append(random.uniform(bounds[key][1], bounds[key][2])) elif bounds[key][0] == 'grid': content.append(random.sample(bounds[key][1],1)[0]) else: raise Exception ('unknown data type is given, either int, float, or grid are allowed for parameter bounds') particle=list(content) speed = list(self.v0*np.array(content)) return particle, speed def ensure_bounds(self, vec): vec_new = [] # cycle through each variable in vector for i, (key, val) in enumerate(self.bounds.items()): # variable exceedes the minimum boundary if vec[i] < self.bounds[key][1]: vec_new.append(self.bounds[key][1]) # variable exceedes the maximum boundary if vec[i] > self.bounds[key][2]: vec_new.append(self.bounds[key][2]) # the variable is fine if self.bounds[key][1] <= vec[i] <= self.bounds[key][2]: vec_new.append(vec[i]) return vec_new def fit_worker(self, x): #""" #Evaluates fitness of an individual. #""" x=self.ensure_bounds(x) #mir-grid if self.grid_flag: #decode the individual back to the int/float/grid mixed space x=decode_discrete_to_grid(x,self.orig_bounds,self.bounds_map) fitness = self.fit(x) return fitness def InitSwarm(self, x0=None, verbose=False): #""" #Swarm intializer #Inputs: # -warmup (int): number of individuals to create and evaluate initially #Returns # -pop (dict): initial swarm in a dictionary form, looks like this: # # pop={particle key: [particle position, particle velocity, particle fitness]} # pop={0: [[1,2,3,4,5], [0.1,0.2,0.3,0.4,0.5], 1.2], # ... # 99: [[1.1,2.1,3.1,4.1,5.1], [0.1,0.2,0.3,0.4,0.5], 5.2]} # #""" #initialize the swarm and velocity and run them in parallel (these samples will be used to initialize the swarm) pop=defaultdict(list) # dict key runs from 0 to self.npar-1 # index 0: individual, index 1: velocity, index 2: fitness #Establish the swarm if x0: if verbose: print('The first particle provided by the user:', x0[0]) print('The last particle provided by the user:', x0[-1]) for i in range(len(x0)): check_mixed_individual(x=x0[i], bounds=self.orig_bounds) #assert the type provided is consistent if self.grid_flag: x_encoded=encode_grid_indv_to_discrete(x0[i], bounds=self.orig_bounds, bounds_map=self.bounds_map) speed = list(self.v0*np.array(x_encoded)) pop[i].append(x_encoded) else: pop[i].append(x0[i]) speed = list(self.v0*np.array(x0[i])) pop[i].append(speed) else: for i in range (self.npar): particle, speed=self.GenParticle(self.bounds) pop[i].append(particle) pop[i].append(speed) #Evaluate the swarm if self.ncores > 1: #evaluate swarm in parallel core_list=[] for particle in pop: core_list.append(pop[particle][0]) with joblib.Parallel(n_jobs=self.ncores) as parallel: fitness=parallel(joblib.delayed(self.fit_worker)(item) for item in core_list) [pop[particle].append(fitness[particle]) for particle in range(len(pop))] else: #evaluate swarm in series for particle in pop: fitness=self.fit_worker(pop[particle][0]) pop[particle].append(fitness) #Setup the local position and fitness for PSO calculations local_pos=[] local_fit=[] for particle in pop: local_pos.append(pop[particle][0]) local_fit.append(pop[particle][2]) return pop, local_pos, local_fit #return final pop dictionary with particle, velocity, and fitness def UpdateParticle(self, particle, local_pos, local_fit): #""" #Function that updates the particle speed and position based on the #best local positon of the particle and best global position of the swarm #The function works with both int or float variables #Input: # particle (list of lists): # particle[0] (list) = current position # particle[1] (list) = speed # particle[2] (list) = current fit # local_pos (list): best local position observed for this particle # #Return: # new_particle (list of lists): modified particle with same structure as `particle` #""" new_particle=copy.deepcopy(particle) for i in range (self.size): r1=random.random() r2=random.random() #********************** #Update Speed #********************** speed_cognitive=self.c1*r1*(local_pos[i]-particle[0][i]) speed_social=self.c2*r2*(self.swm_pos[i]-particle[0][i]) if self.speed_mech=='constric': new_particle[1][i]=self.w*(particle[1][i]+speed_cognitive+speed_social) #print('constric', self.w) elif self.speed_mech=='timew': new_particle[1][i]=self.w*particle[1][i]+speed_cognitive+speed_social elif self.speed_mech=='globw': try: self.w=1.1-self.swm_fit/local_fit except: self.w=random.uniform(0.1,0.9) #when division by zero occurs for "w" #print('globw', self.w) new_particle[1][i]=self.w*particle[1][i]+speed_cognitive+speed_social else: raise ('only constric, timew, globw, are allowed for speed_mech, the mechanism used is not defined') #*********************************** #Update Position based on data type #************************************ if self.datatype[i].strip() == 'float': new_particle[0][i]=particle[0][i]+new_particle[1][i] # adjust maximum position if necessary if new_particle[0][i] > self.up[i]: new_particle[0][i]=self.up[i] # adjust minimum position if neseccary if new_particle[0][i] < self.low[i]: new_particle[0][i]=self.low[i] elif self.datatype[i].strip() == 'int': move_cond=(random.random() < self._sigmoid(new_particle[1][i])) * 1 #returns binary 0/1 if move_cond: #perturb this parameter if int(self.low[i]) == int(self.up[i]): new_particle[0][i] = int(self.low[i]) else: # make a list of possiblities after excluding the current value to enforce mutation choices=list(range(self.low[i],self.up[i]+1)) choices.remove(new_particle[0][i]) # randint is NOT used here since it could re-draw the same integer value, choice is used instead new_particle[0][i] = random.choice(choices) else: raise Exception('The particle position in PSO cannot be modified, either int/float data types are allowed') return new_particle def _sigmoid(self, x): #""" #Helper method for the sigmoid function #Input: # x (scalar or numpy.ndarray): input attribute(s) #Returns: # scalar or numpy.ndarray: output sigmoid computation #""" return 1 / (1 + np.exp(-x)) def select(self, pop, k=1): #""" #Reorder the swarm and select the best `k` particles from it #Input: # -pop (dict): swarm of particles # -k (int): number of particles to survive [ k < len(pop) ] #Returns: # -best_dict (dict): dictionary of the best k particles in the swarm #""" pop=list(pop.items()) pop.sort(key=lambda e: e[1][2], reverse=True) sorted_dict=dict(pop[:k]) #Next block creates a new dict where keys are reset to 0 ... k in order to avoid unordered keys after sort best_dict=defaultdict(list) index=0 for key in sorted_dict: for j in range (3): #5 refers to the properties of each particle in order #0: current pos, 1: speed, 2: current fitness, 3: best local pos, 4: best local fitness best_dict[index].append(sorted_dict[key][j]) index+=1 sorted_dict.clear() return best_dict def GenSwarm(self, swm): #""" #Generate the new swarm (offspring) based on the old swarm, #by looping and updating all particles #Input: # swm (dict): current swarm #Return: # offspring (dict): new updated swarm #""" offspring = defaultdict(list) for i in range(len(swm)): offspring[i] = self.UpdateParticle(particle=swm[i], local_pos=self.local_pos[i], local_fit=self.local_fit[i]) offspring[i][2] = 0 #this fitness item is set to zero since it is not evaluated yet by the fitness return offspring
[docs] def evolute(self, ngen, x0=None, verbose=False, **kwargs): """ This function evolutes the PSO algorithm for number of generations. :param ngen: (int) number of generations to evolute :param x0: (list of lists) the initial position of the swarm particles :param verbose: (bool) print statistics to screen :return: (tuple) (best individual, best fitness, and a list of fitness history) """ self.pso_hist={} self.pso_hist['mean_speed']=[] self.best_scores=[] if x0: #get the initial swarm position from the user, it has to be #print('-- Using The Initial PSO Swarm from the User') assert len(x0) == self.npar, '--error: the length of x0 ({}) (initial swarm) must equal to number of particles ({})'.format(len(x0), self.npar) swarm, self.local_pos, self.local_fit=self.InitSwarm(x0=x0, verbose=verbose) else: #print('-- Using A Random Initial PSO Swarm') #generate the initial swarm internally, assign all variables swarm, self.local_pos, self.local_fit=self.InitSwarm(verbose=verbose) swm0=self.select(swarm, k=1) self.swm_pos=swm0[0][0] self.swm_fit=swm0[0][2] #----------------------------- # Begin the evolution process #----------------------------- for gen in range(1, ngen + 1): #--Vary the particles and generate new offspring/swarm offspring = self.GenSwarm(swm=swarm) #*************************** #Parallel: Evaluate the particles # with multiprocessign Pool #*************************** if self.ncores > 1: t0=time.time() core_list=[] for key in offspring: core_list.append(offspring[key][0]) with joblib.Parallel(n_jobs=self.ncores) as parallel: fitness=parallel(joblib.delayed(self.fit_worker)(item) for item in core_list) self.partime=time.time()-t0 #print('PSO:', self.partime) #append fitness data to the swarm dict for k in range(len(offspring)): offspring[k][2] = fitness[k] #check and update local best for par in range (len(fitness)): if fitness[par] > self.local_fit[par]: self.local_pos[par]= copy.deepcopy(offspring[par][0]) self.local_fit[par]= fitness[par] #check and update global/swarm best if fitness[par] > self.swm_fit: self.swm_fit = fitness[par] self.swm_pos=copy.deepcopy(offspring[par][0]) #*************************** #Serial: no Pool #*************************** else: for par in range(len(offspring)): fitness=self.fit_worker(offspring[par][0]) offspring[par][2]=fitness #check and update local best if fitness > self.local_fit[par]: self.local_pos[par]= copy.deepcopy(offspring[par][0]) self.local_fit[par]= fitness #check and update global/swarm best if fitness > self.swm_fit: self.swm_fit = fitness self.swm_pos=copy.deepcopy(offspring[par][0]) self.partime=0 if self.speed_mech=='timew': if 'w' in kwargs: #take it from external vector assert len(kwargs["w"]) == ngen, '--error: the length of `w` in kwargs must equal to ngen' self.w=kwargs["w"][gen-1] else: #w is annealed from 0.9 to 0.4 step=gen*self.npar totsteps=ngen*self.npar self.w = self.wmax - (self.wmax-self.wmin)*step/totsteps #print('timew', self.w) fit_best=np.max([offspring[item][2] for item in offspring]) #get the max fitness for this generation self.best_scores.append(fit_best) swarm = copy.deepcopy(offspring) #--mir if self.mode=='min': self.swm_fit_correct=-self.swm_fit else: self.swm_fit_correct=self.swm_fit # Print data mean_speed=[np.mean(swarm[i][1]) for i in swarm] self.pso_hist['mean_speed'].append(np.mean(mean_speed)) if verbose: print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') print('PSO step {}/{}, C1={}, C2={}, W={}, Particles={}, Ncores={}'.format(gen*self.npar, ngen*self.npar, np.round(self.c1,2), np.round(self.c2,2), np.round(self.w,2), self.npar, self.ncores)) print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') print('Statistics for generation {}'.format(gen)) print('Best Swarm Fitness:', np.round(self.swm_fit_correct,6)) if self.grid_flag: self.swm_decoded = decode_discrete_to_grid(self.swm_pos, self.orig_bounds, self.bounds_map) print('Best Swarm Position:', self.swm_decoded,6) else: print('Best Swarm Position:', self.swm_pos) print('Max Speed:', np.round(np.max(mean_speed),3)) print('Min Speed:', np.round(np.min(mean_speed),3)) print('Average Speed:', np.round(np.mean(mean_speed),3)) if self.speed_mech=='timew': print('w:', self.w) print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^') #Select and order the last population self.population=copy.deepcopy(self.select(pop=swarm, k=self.npar)) # Append local data to the final swarm population, items [3] and [4] in the dictionary for par in range(len(swarm)): swarm[par].append(self.local_pos[par]) swarm[par].append(self.local_fit[par]) #mir-grid if self.grid_flag: self.swm_pos_correct=decode_discrete_to_grid(self.swm_pos,self.orig_bounds,self.bounds_map) else: self.swm_pos_correct=self.swm_pos if verbose: print('------------------------ PSO Summary --------------------------') print('Best fitness (y) found:', self.swm_fit_correct) print('Best individual (x) found:', self.swm_pos_correct) print('--------------------------------------------------------------') #--mir return the last population for restart calculations if self.grid_flag: self.pso_hist['last_pop'] = get_population(self.population, grid_flag=True, bounds=self.orig_bounds, bounds_map=self.bounds_map) else: self.pso_hist['last_pop'] = get_population(self.population, grid_flag=False) if self.mode == 'min': self.best_scores=[-item for item in self.best_scores] self.pso_hist['global_fitness'] = np.minimum.accumulate(self.best_scores) self.pso_hist['last_pop']['fitness'] *= -1 else: self.pso_hist['global_fitness'] = np.maximum.accumulate(self.best_scores) self.pso_hist['local_fitness'] = self.best_scores return self.swm_pos_correct, self.swm_fit_correct, self.pso_hist