Source code for neorl.evolu.xnes

#    This file is part of NEORL.

#    Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
#    NEORL is free software: you can redistribute it and/or modify
#    it under the terms of the MIT LICENSE

#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#    SOFTWARE.

# -*- coding: utf-8 -*-
#"""
#Created on Thu Dec  3 14:42:29 2020
#
#@author: Majdi
#"""


from numpy import dot, eye, asarray, array, trace, exp 
from numpy import mean, sum, argsort, arange
from scipy.stats import multivariate_normal, norm
from scipy.linalg import det, expm
import joblib
import random
import numpy as np
import copy
from neorl.utils.seeding import set_neorl_seed
from neorl.utils.tools import get_population

class XNES(object):
    """
    Exponential Natural Evolution Strategies

    :param mode: (str) problem type, either "min" for minimization problem or "max" for maximization
    :param bounds: (dict) input parameter type and lower/upper bounds in dictionary form. Example: ``bounds={'x1': ['int', 1, 4], 'x2': ['float', 0.1, 0.8], 'x3': ['float', 2.2, 6.2]}``
    :param fit: (function) the fitness function
    :param npop: (int) total number of individuals in the population (default: if None, it will make an approximation, see **Notes** below)
    :param A: (np.array) initial guess of the covariance matrix A (default: identity matrix, see **Notes** below)
    :param eta_mu: (float) learning rate for updating the center of the search distribution ``mu`` (see **Notes** below)
    :param eta_sigma: (float) learning rate for updating the step size ``sigma`` (default: if None, it will make an approximation, see **Notes** below)
    :param eta_Bmat: (float) learning rate for updating the normalized transformation matrix ``B`` (default: if None, it will make an approximation, see **Notes** below)
    :param adapt_sampling: (bool) activate the adaptation sampling option
    :param ncores: (int) number of parallel processors
    :param seed: (int) random seed for sampling
    """
    def __init__(self, mode, bounds, fit, A=None, npop=None,
                 eta_mu=1.0, eta_sigma=None, eta_Bmat=None,
                 adapt_sampling=False, ncores=1, seed=None):
        set_neorl_seed(seed)
        patience = 100
        self.fitness_hom = -np.inf

        #--mir
        self.mode = mode
        if mode == 'max':
            self.f = fit
        elif mode == 'min':
            def fitness_wrapper(*args, **kwargs):
                return -fit(*args, **kwargs)
            self.f = fitness_wrapper
        else:
            raise ValueError('--error: The mode entered by user is invalid, use either `min` or `max`')

        self.eta_mu = eta_mu
        self.use_adasam = adapt_sampling
        self.ncores = ncores
        self.bounds = bounds

        dim = len(bounds)
        A = np.eye(dim) if A is None else A
        sigma = abs(det(A))**(1.0/dim)
        bmat = A*(1.0/sigma)
        self.dim = dim
        self.sigma = sigma
        self.bmat = bmat

        # default population size and learning rates
        npop = int(4 + 3*np.log(dim)) if npop is None else npop
        eta_sigma = 3*(3+np.log(dim))*(1.0/(5*dim*np.sqrt(dim))) if eta_sigma is None else eta_sigma
        eta_Bmat = 3*(3+np.log(dim))*(1.0/(5*dim*np.sqrt(dim))) if eta_Bmat is None else eta_Bmat
        self.npop = npop
        self.eta_sigma = eta_sigma
        self.eta_bmat = eta_Bmat

        use_fshape = True
        # compute utilities if using fitness shaping
        if use_fshape:
            a = np.log(1 + 0.5*npop)
            utilities = array([max(0, a - np.log(k)) for k in range(1, npop+1)])
            utilities /= sum(utilities)
            utilities -= 1.0/npop        # broadcast
            utilities = utilities[::-1]  # ascending order
        else:
            utilities = None
        self.use_fshape = use_fshape
        self.utilities = utilities

        # stuff for adasam
        self.eta_sigma_init = eta_sigma
        self.sigma_old = None

        # logging
        self.fitness_best = -np.inf
        self.mu_best = None
        self.done = False
        self.counter = 0
        self.patience = patience
        self.history = {'eta_sigma': [], 'sigma': [], 'fitness': []}

        # do not use options below when one individual in population is used
        if npop == 1:
            self.use_fshape = False
            self.use_adasam = False

    def init_sample(self, bounds):
        indv = []
        for key in bounds:
            if bounds[key][0] == 'int':
                indv.append(random.randint(bounds[key][1], bounds[key][2]))
            elif bounds[key][0] == 'float':
                indv.append(random.uniform(bounds[key][1], bounds[key][2]))
            elif bounds[key][0] == 'grid':
                indv.append(random.sample(bounds[key][1], 1)[0])
            else:
                raise Exception('unknown data type is given, either int, float, or grid are allowed for parameter bounds')
        return indv

    def ensure_bounds(self, vec, bounds):
        vec_new = []
        # cycle through each variable in the vector
        for i, (key, val) in enumerate(bounds.items()):
            # variable exceeds the minimum boundary
            if vec[i] < bounds[key][1]:
                vec_new.append(bounds[key][1])
            # variable exceeds the maximum boundary
            elif vec[i] > bounds[key][2]:
                vec_new.append(bounds[key][2])
            # the variable is fine
            else:
                vec_new.append(vec[i])
        return vec_new
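
    # Illustrative sketch (added commentary, not part of the original module):
    # for dim=5, the defaults above give npop = int(4 + 3*ln(5)) = 8 and
    # eta_sigma = eta_Bmat = 3*(3 + ln(5)) / (5*5*sqrt(5)) ~= 0.247. The
    # fitness-shaping utilities for npop=8, after normalization, the -1/npop
    # shift, and the reversal to ascending order, are approximately:
    #     [-0.125, -0.125, -0.125, -0.125, -0.057, 0.032, 0.156, 0.369]
    # They sum to zero, penalize the worst-ranked samples equally, and reward
    # only the top few, which makes the update depend only on fitness ranks
    # rather than on raw fitness values.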
    def evolute(self, ngen, x0=None, verbose=False):
        """
        This function evolutes the XNES algorithm for a number of generations.

        :param ngen: (int) number of generations to evolute
        :param x0: (list) initial guess for the search (must be of same size as ``len(bounds)``)
        :param verbose: (bool) print statistics to screen

        :return: (tuple) (best individual, best fitness, and dictionary containing major search results)
        """
        f = self.f
        self.verbose = verbose
        if x0:
            assert len(x0) == self.dim, 'the length of x0 ({}) MUST equal the number of parameters in bounds ({})'.format(len(x0), self.dim)
            self.mu = asarray(x0, dtype=float)   # cast to array so the in-place update of mu below works
        else:
            self.mu = asarray(self.init_sample(self.bounds), dtype=float)

        mu, sigma, bmat = self.mu, self.sigma, self.bmat
        eta_mu, eta_sigma, eta_bmat = self.eta_mu, self.eta_sigma, self.eta_bmat
        npop = self.npop
        dim = self.dim
        sigma_old = self.sigma_old

        eyemat = eye(dim)

        with joblib.Parallel(n_jobs=self.ncores) as parallel:

            for i in range(ngen):
                s_try = np.random.randn(npop, dim)
                z_try = mu + sigma * dot(s_try, bmat)  # broadcast

                for k in range(len(z_try)):
                    z_try[k] = self.ensure_bounds(vec=z_try[k], bounds=self.bounds)
                #print(z_try)
                f_try = parallel(joblib.delayed(f)(z) for z in z_try)
                f_try = asarray(f_try)

                # save if best
                fitness = mean(f_try)

                isort = argsort(f_try)
                f_try = f_try[isort]
                s_try = s_try[isort]
                z_try = z_try[isort]

                for m in range(len(f_try)):
                    if f_try[m] > self.fitness_best:
                        self.fitness_best = f_try[m]
                        self.x_best = copy.deepcopy(z_try[m])

                self.last_pop = z_try.copy()
                self.last_fit = np.array(f_try).copy()

                if fitness - 1e-8 > self.fitness_best:
                    self.mu_best = mu.copy()
                    self.counter = 0
                else:
                    self.counter += 1

                #if self.counter > self.patience:
                #    self.done = True
                #    return

                u_try = self.utilities if self.use_fshape else f_try

                if self.use_adasam and sigma_old is not None:  # sigma_old must be available
                    eta_sigma = self.adasam(eta_sigma, mu, sigma, bmat, sigma_old, z_try)

                dj_delta = dot(u_try, s_try)
                dj_mmat = dot(s_try.T, s_try*u_try.reshape(npop, 1)) - sum(u_try)*eyemat
                dj_sigma = trace(dj_mmat)*(1.0/dim)
                dj_bmat = dj_mmat - dj_sigma*eyemat

                sigma_old = sigma

                # update
                mu += eta_mu * sigma * dot(bmat, dj_delta)
                sigma *= exp(0.5 * eta_sigma * dj_sigma)
                bmat = dot(bmat, expm(0.5 * eta_bmat * dj_bmat))

                # logging
                self.history['fitness'].append(self.fitness_best)
                self.history['sigma'].append(sigma)
                self.history['eta_sigma'].append(eta_sigma)

                #--mir
                if self.mode == 'min':
                    self.fitness_best_correct = -self.fitness_best
                else:
                    self.fitness_best_correct = self.fitness_best

                # Print data
                if self.verbose and i % self.npop:
                    print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^')
                    print('XNES step {}/{}, NPOP={}, ETA_MU={}, ETA_SIGMA={}, ETA_BMAT={}, Ncores={}'.format((i+1)*self.npop, ngen*self.npop, self.npop, np.round(self.eta_mu,2), np.round(self.eta_sigma,2), np.round(self.eta_bmat,2), self.ncores))
                    print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^')
                    print('Best XNES Fitness:', np.round(self.fitness_best_correct, 6))
                    print('Best XNES Position:', np.round(self.x_best, 6))
                    print('MU:', np.round(mu, 3))
                    print('Sigma:', np.round(sigma, 3))
                    print('BMAT:', np.round(bmat, 3))
                    print('^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^')

                # keep last results
                self.mu, self.sigma, self.bmat = mu, sigma, bmat
                self.eta_sigma = eta_sigma
                self.sigma_old = sigma_old

        if self.verbose:
            print('------------------------ XNES Summary --------------------------')
            print('Best fitness (y) found:', self.fitness_best_correct)
            print('Best individual (x) found:', self.x_best)
            print('--------------------------------------------------------------')

        #--mir
        if self.mode == 'min':
            self.history['fitness'] = [-item for item in self.history['fitness']]
            self.last_fit = -self.last_fit

        self.history['last_pop'] = get_population(self.last_pop, fits=self.last_fit)

        return self.x_best, self.fitness_best_correct, self.history
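
    # For reference (added commentary, not part of the original module), the
    # loop in evolute() implements the standard xNES natural-gradient updates
    # introduced by Glasmachers et al. (2010). With utilities u_k and
    # standard-normal samples s_k sorted by fitness:
    #     grad_delta = sum_k u_k * s_k
    #     grad_M     = sum_k u_k * (s_k s_k^T - I)
    #     grad_sigma = trace(grad_M) / dim
    #     grad_B     = grad_M - grad_sigma * I
    #     mu    <- mu + eta_mu * sigma * B * grad_delta
    #     sigma <- sigma * exp(0.5 * eta_sigma * grad_sigma)
    #     B     <- B * expm(0.5 * eta_B * grad_B)
    # Note that dj_mmat above subtracts sum(u_try)*I in one step instead of
    # subtracting I inside the sum; the two forms are algebraically identical.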
    def adasam(self, eta_sigma, mu, sigma, bmat, sigma_old, z_try):
        # Adaptation sampling
        eta_sigma_init = self.eta_sigma_init
        dim = self.dim
        c = 0.1
        rho = 0.5 - 1.0/(3*(dim+1))  # empirical

        bbmat = dot(bmat.T, bmat)
        cov = sigma**2 * bbmat
        sigma_ = sigma * np.sqrt(sigma*(1.0/sigma_old))  # increase by 1.5
        cov_ = sigma_**2 * bbmat

        p0 = multivariate_normal.logpdf(z_try, mean=mu, cov=cov)
        p1 = multivariate_normal.logpdf(z_try, mean=mu, cov=cov_)
        w = exp(p1 - p0)

        # Mann-Whitney. It is assumed z_try was in ascending order.
        n = self.npop
        n_ = sum(w)
        u_ = sum(w * (arange(n) + 0.5))

        u_mu = n*n_*0.5
        u_sigma = np.sqrt(n*n_*(n + n_ + 1)/12.0)
        cum = norm.cdf(u_, loc=u_mu, scale=u_sigma)

        if cum < rho:
            return (1-c)*eta_sigma + c*eta_sigma_init
        else:
            return min(1, (1+c)*eta_sigma)
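
# A minimal usage sketch (added for illustration; not part of the original
# module). Running this file as a script optimizes a simple sphere function
# with the XNES class defined above; the fitness function and the two-variable
# bounds below are hypothetical examples chosen only to show the call pattern.
if __name__ == '__main__':

    def sphere(individual):
        # convex test function with global minimum 0 at the origin
        return float(np.sum([x**2 for x in individual]))

    bounds = {'x1': ['float', -5.0, 5.0],
              'x2': ['float', -5.0, 5.0]}

    xnes = XNES(mode='min', bounds=bounds, fit=sphere,
                npop=20, ncores=1, seed=1)
    x_best, y_best, history = xnes.evolute(ngen=50, verbose=False)
    print('best individual:', x_best)
    print('best fitness:', y_best)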