Source code for neorl.tune.gridtune

#    This file is part of NEORL.

#    Copyright (c) 2021 Exelon Corporation and MIT Nuclear Science and Engineering
#    NEORL is free software: you can redistribute it and/or modify
#    it under the terms of the MIT LICENSE

#    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
#    SOFTWARE.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#"""
#Created on Wed Mar  4 11:51:22 2020
#
#@author: majdi
#"""

import logging
import numpy as np
import pandas as pd
import itertools
from multiprocessing import Pool
import joblib
import csv

class GRIDTUNE:
    """
    A module for grid search for hyperparameter tuning

    :param param_grid: (dict) the grid (list of possible values) for each hyperparameter provided in a dictionary form.
                       Example: {'x1': [40, 50, 60, 80, 100], 'x2': [0.2, 0.4, 0.8], 'x3': ['blend', 'cx2point']}
    :param fit: (function) the self-defined fitness function that includes the hyperparameters as input and algorithm score as output
    """

    def __init__(self, param_grid, fit):
        self.param_grid = param_grid
        self.fit = fit
        # Run settings are normally supplied by tune(). They are initialised
        # here (with tune()'s defaults) so that worker() can also be called
        # directly on a fresh instance without hitting missing attributes.
        self.ncores = 1
        self.csvlogger = None
        self.verbose = True
        self.full_grid()

    def full_grid(self):
        """
        Build the full multi-dimensional grid.

        Sets ``param_names`` (hyperparameter names in ``param_grid`` order),
        ``param_lst`` (the matching lists of candidate values) and
        ``hyperparameter_cases`` (every combination of values, as tuples).
        """
        self.param_names = list(self.param_grid)
        self.param_lst = [self.param_grid[name] for name in self.param_names]
        # Unpacking with * passes each value list as a separate argument to
        # product, so the number of hyperparameters need not be known here.
        self.hyperparameter_cases = list(itertools.product(*self.param_lst))

    def worker(self, x):
        """
        Evaluate a single grid case; this is the unit of work given to the parallel pool.

        :param x: (list) a pair ``[caseid, param_vals]`` where ``param_vals`` holds one
                  value per hyperparameter, ordered as ``param_names``
        :return: the value returned by ``fit(*param_vals)``, or the string
                 ``'case{caseid}:failed'`` if an exception was raised while
                 running, printing or logging the case
        """
        caseid, param_vals = x[0], x[1]

        assert len(param_vals) == len(self.param_names), (
            '--error: it seems the length of the param_names ({}) and param_values ({}) '
            'are not equal, cannot proceed'.format(len(self.param_names), len(param_vals))
        )

        # Record of this case: id, then one entry per hyperparameter; the
        # score is appended once the fitness function has returned.
        case_dict = {'id': caseid}
        case_dict.update(zip(self.param_names, param_vals))

        try:
            obj = self.fit(*param_vals)
            case_dict['score'] = obj

            if self.verbose:
                print('-------------------------------------------------------------------------------------------')
                print('TUNE Case {}/{} is completed'.format(caseid, len(self.hyperparameter_cases)))
                print(case_dict)
                print('-------------------------------------------------------------------------------------------')

            if self.csvlogger:
                # Append immediately so results of finished cases are on disk
                # even if a later case fails or the run is interrupted.
                with open(self.csvlogger, 'a') as csvfile:
                    csvwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
                    csvwriter.writerow(list(case_dict.values()))

            return obj

        except Exception as e:
            # Best effort: a failing case is reported and marked, but does not
            # abort the remaining cases of the grid.
            print(e)
            logging.exception("message")
            print('--error: case {} failed during execution'.format(caseid))
            print('--error: {} failed'.format(case_dict))
            return 'case{}:failed'.format(caseid)

    def tune(self, ncores=1, csvname=None, verbose=True):
        """
        This function starts the tuning process with specified number of processors

        :param ncores: (int) number of parallel processors (see the **Notes** section below for an important note about parallel execution)
        :param csvname: (str) the name of the csv file name to save the tuning results (useful for expensive cases as the csv file is updated directly after the case is done)
        :param verbose: (bool) whether to print updates to the screen or not

        :return: (pandas.DataFrame) one row per grid case, indexed by case id starting at 1,
                 with one column per hyperparameter plus a ``score`` column holding the
                 ``fit`` result (or ``'case{id}:failed'`` for cases that raised)
        """
        self.ncores = ncores
        self.csvlogger = csvname
        self.verbose = verbose

        if self.verbose:
            print('***************************************************************')
            print('****************Grid Search is Running*************************')
            print('***************************************************************')

            if self.ncores > 1:
                print('--- Running in parallel with {} cores'.format(self.ncores))

        if self.csvlogger:
            # Start a fresh file containing only the header row; worker()
            # appends one row per completed case.
            headers = ['id'] + self.param_names + ['score']
            with open(self.csvlogger, 'w') as csvfile:
                csvwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
                csvwriter.writerow(headers)

        # Case ids are 1-based so they line up with the DataFrame index below.
        core_lst = [[caseid, vals] for caseid, vals in enumerate(self.hyperparameter_cases, start=1)]

        if self.ncores > 1:
            with joblib.Parallel(n_jobs=self.ncores) as parallel:
                results = parallel(joblib.delayed(self.worker)(item) for item in core_lst)
        else:
            results = [self.worker(item) for item in core_lst]

        # Rows are kept in grid order; they are not sorted by score.
        gridres = pd.DataFrame(self.hyperparameter_cases, columns=self.param_names)
        gridres.index += 1
        gridres['score'] = results

        return gridres