Source code for prms_python.optimizer

# -*- coding: utf-8 -*-
'''
optimizer.py -- holds the ``Optimizer`` and ``OptimizationResult`` classes
for optimization routines and management of PRMS parameters.
'''
from __future__ import print_function
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os, sys, json, re, shutil
import multiprocessing as mp
from copy import copy
from copy import deepcopy
from numpy import log10
from datetime import datetime
from .data import Data
from .parameters import Parameters
from .simulation import Simulation, SimulationSeries
from .util import load_statvar, nash_sutcliffe, percent_bias, rmse

OPJ = os.path.join


class Optimizer:
    '''
    Container for PRMS parameter optimization and related routines.
    Currently the ``monte_carlo`` method provides random parameter
    resampling routines using uniform and normal random variables.

    Example:
        >>> from prms_python import Data, Optimizer, Parameters
        >>> params = Parameters('path/to/parameters')
        >>> data = Data('path/to/data')
        >>> control = 'path/to/control'
        >>> work_directory = 'path/to/create/simulations'
        >>> optr = Optimizer(
        ...     params, data, control, work_directory,
        ...     title='the title', description='desc')
        >>> measured = 'path/to/measured/csv'
        >>> statvar_name = 'basin_cfs'  # or any other valid statvar
        >>> params_to_resample = ['dday_intcp', 'dday_slope']  # list of params
        >>> optr.monte_carlo(measured, params_to_resample, statvar_name)
    '''

    # dict of min/max allowable parameter ranges, add more when needed
    param_ranges = {
        'dday_intcp': (-60.0, 10.0),
        'dday_slope': (0.2, 0.9),
        'jh_coef': (0.005, 0.06),
        'pt_alpha': (1.0, 2.0),
        'potet_coef_hru_mo': (1.0, 2.0),
        'tmax_index': (-10.0, 110.0),
        'tmin_lapse': (-10.0, 10.0),
        'soil_moist_max': (0.001, 10.0),
        'rain_adj': (0.5, 2.0),
        'ppt_rad_adj': (0.0, 0.5),
        'radadj_intcp': (0.0, 1.0),
        'radadj_slope': (0.0, 1.0),
        'radj_sppt': (0.0, 1.0),
        'radj_wppt': (0.0, 1.0),
        'radmax': (0.1, 1.0)
    }

    def __init__(self, parameters, data, control_file, working_dir,
                 title, description=None):
        if isinstance(parameters, Parameters):
            self.parameters = parameters
        else:
            raise TypeError('parameters must be instance of Parameters')
        if isinstance(data, Data):
            self.data = data
        else:
            raise TypeError('data must be instance of Data')

        input_dir = os.path.dirname(control_file)
        if not os.path.isfile(OPJ(input_dir, 'statvar.dat')):
            print('You have no statvar.dat file in your model directory')
            print('Running PRMS on original data in {} for later comparison'
                  .format(input_dir))
            sim = Simulation(input_dir)
            sim.run()
        if not os.path.isdir(working_dir):
            os.mkdir(working_dir)

        self.input_dir = input_dir
        self.control_file = control_file
        self.working_dir = working_dir
        self.title = title
        self.description = description
        self.measured_arb = None  # for arbitrary output methods
        self.statvar_name = None
        self.arb_outputs = []
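    # Illustrative sketch (not part of the library): parameters missing from
    # ``Optimizer.param_ranges`` can be registered at runtime before
    # resampling; the parameter name and bounds below are placeholders,
    # not vetted PRMS limits.
    #
    # >>> from prms_python import Optimizer
    # >>> Optimizer.param_ranges['tmax_allsnow'] = (25.0, 40.0)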
    def monte_carlo(self, reference_path, param_names, statvar_name,
                    stage, n_sims=10, method='uniform', mu_factor=1,
                    noise_factor=0.1, nproc=None):
        '''
        The ``monte_carlo`` method of ``Optimizer`` applies random
        resampling techniques to a set of PRMS parameters and executes
        and manages the corresponding simulations.

        Arguments:
            reference_path (str): path to measured data for optimization
            param_names (list): list of parameter names to resample
            statvar_name (str): name of the statistical variable output
                used for optimization
            stage (str): custom name of optimization stage, e.g. 'ddsolrad'

        Keyword Arguments:
            n_sims (int): number of simulations to conduct for parameter
                optimization/uncertainty analysis
            method (str): resampling method for parameters
                ('normal' or 'uniform')
            mu_factor (float): coefficient to scale the mean of the
                parameter(s) to resample when using the normal
                distribution, i.e. a value of 1.5 will sample from a
                normal rv with a mean 50% higher than the original
                parameter mean
            noise_factor (float): scales the variance of noise added to
                parameter values when using a normal rv (method='normal')
            nproc (int): number of processors available to run PRMS
                simulations

        Returns:
            None
        '''
        if '_' in stage:
            raise ValueError('stage name cannot contain an underscore')
        # assign the optimization object a copy of measured data for plots
        self.measured_arb = pd.Series.from_csv(reference_path,
                                               parse_dates=True)
        # statistical variable output name
        self.statvar_name = statvar_name
        start_time = datetime.now().isoformat()
        # resample params for all simulations- potential place to serialize
        params = []
        for name in param_names:
            # create list of lists of resampled params
            tmp = []
            for idx in range(n_sims):
                tmp.append(resample_param(self.parameters, name, how=method,
                                          mu_factor=mu_factor,
                                          noise_factor=noise_factor))
            params.append(list(tmp))
        # SimulationSeries comprised of each resampled param set
        series = SimulationSeries(
            Simulation.from_data(
                self.data,
                _mod_params(self.parameters,
                            [params[n][i] for n in range(len(params))],
                            param_names),
                self.control_file,
                OPJ(
                    self.working_dir,
                    # name of sim: first param and mean value
                    '{0}_{1:.10f}'.format(param_names[0],
                                          np.mean(params[0][i]))
                )
            )
            for i in range(n_sims)
        )
        if not nproc:
            nproc = mp.cpu_count() // 2
        # run
        outputs = list(series.run(nproc=nproc).outputs_iter())
        self.arb_outputs.extend(outputs)  # for current instance- add outputs
        end_time = datetime.now().isoformat()
        # json metadata for Monte Carlo run
        meta = {
            'params_adjusted': param_names,
            'statvar_name': self.statvar_name,
            'optimization_title': self.title,
            'optimization_description': self.description,
            'start_datetime': start_time,
            'end_datetime': end_time,
            'measured': reference_path,
            'method': 'Monte Carlo',
            'mu_factor': mu_factor,
            'noise_factor': noise_factor,
            'resample': method,
            'sim_dirs': [],
            'stage': stage,
            'original_params': self.parameters.base_file,
            'nproc': nproc,
            'n_sims': n_sims
        }
        for output in outputs:
            meta['sim_dirs'].append(output['simulation_dir'])
        json_outfile = OPJ(self.working_dir,
                           _create_metafile_name(self.working_dir,
                                                 self.title, stage))
        with open(json_outfile, 'w') as outf:
            json.dump(meta, outf, sort_keys=True, indent=4,
                      ensure_ascii=False)
        print('{0}\nOutput information sent to {1}\n'
              .format('-' * 80, json_outfile))
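    # Illustrative sketch (not library code): a normal-resampling run that
    # reuses the names from the class docstring above; the stage name 'flow'
    # and the processor count are hypothetical.
    #
    # >>> optr.monte_carlo(measured, params_to_resample, statvar_name,
    # ...                  stage='flow', n_sims=100, method='normal',
    # ...                  mu_factor=1.2, noise_factor=0.05, nproc=4)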
    def plot_optimization(self, freq='daily', method='time_series',
                          plot_vars='both', plot_1to1=True,
                          return_fig=False, n_plots=4):
        """
        Basic plotting of current optimization results with limited
        options. Plots the measured, original simulated, and optimization
        simulated variables. Not recommended for plotting results when
        n_sims is very large; instead use options from an
        OptimizationResult object, or employ a user-defined method using
        the result data.

        Keyword Arguments:
            freq (str): frequency of time series plots; value can be
                'daily' or 'monthly'
            method (str): 'time_series' for a time series subplot of each
                simulation alongside the measured variable. The other
                choice is 'correlation', which plots each measured value
                against the corresponding simulated variable, one subplot
                per simulation in the optimization, annotated with
                coefficients of determination, i.e. the square of the
                Pearson correlation coefficient.
            plot_vars (str): what to plot alongside the simulated variable:
                'meas': plot simulated along with measured values
                'orig': plot simulated along with the original simulation
                'both': plot simulated with the original simulation and
                    measured values
            plot_1to1 (bool): if True plot the one to one line on the
                correlation scatter plot, otherwise exclude it
            return_fig (bool): flag whether to return matplotlib figure
            n_plots (int): maximum number of simulations to plot
                (default 4)

        Returns:
            f (:obj:`matplotlib.figure.Figure`): if return_fig=True,
                return the figure that is generated to the user
        """
        if not self.arb_outputs:
            raise ValueError('You have not run any optimizations')
        var_name = self.statvar_name
        X = self.measured_arb
        idx = X.index.intersection(
            load_statvar(self.arb_outputs[0]['statvar'])[var_name].index)
        X = X[idx]
        orig = load_statvar(OPJ(self.input_dir, 'statvar.dat'))[var_name][idx]
        meas = self.measured_arb[idx]
        sims = [load_statvar(out['statvar'])[var_name][idx]
                for out in self.arb_outputs]
        simdirs = [out['simulation_dir'].split(os.sep)[-1].replace('_', ' ')
                   for out in self.arb_outputs]
        n = len(self.arb_outputs)  # number of simulations to plot
        # user defined number of subplots from first n_plots results
        if n > n_plots:
            n = n_plots
        # styles for each plot
        ms = 4  # markersize for all points
        orig_sty = dict(linestyle='none', markersize=ms,
                        markerfacecolor='none', marker='s',
                        markeredgecolor='royalblue', color='royalblue')
        meas_sty = dict(linestyle='none', markersize=ms + 1,
                        markerfacecolor='none', marker='1',
                        markeredgecolor='k', color='k')
        sim_sty = dict(linestyle='none', markersize=ms,
                       markerfacecolor='none', marker='o',
                       markeredgecolor='r', color='r')
        # number of subplots and rows (two plots per row)
        nrow = n // 2  # round down if odd n
        ncol = 2
        odd_n = False
        if n / 2. - nrow == 0.5:
            nrow += 1  # odd number needs an extra row
            odd_n = True
        ########
        # Start plots depending on keyword arguments
        ########
        if freq == 'daily' and method == 'time_series':
            fig, ax = plt.subplots(n, sharex=True, sharey=True,
                                   figsize=(12, n * 3.5))
            axs = ax.ravel()
            for i, sim in enumerate(sims[:n]):
                if plot_vars in ('meas', 'both'):
                    axs[i].plot(meas, label='Measured', **meas_sty)
                if plot_vars in ('orig', 'both'):
                    axs[i].plot(orig, label='Original sim.', **orig_sty)
                axs[i].plot(sim, **sim_sty)
                axs[i].set_ylabel('sim: {}'.format(simdirs[i]), fontsize=10)
                if i == 0:
                    axs[i].legend(markerscale=5, loc='best')
            fig.subplots_adjust(hspace=0)
            fig.autofmt_xdate()
        # monthly means
        elif freq == 'monthly' and method == 'time_series':
            # compute monthly means
            meas = meas.groupby(meas.index.month).mean()
            orig = orig.groupby(orig.index.month).mean()
            # change line styles for monthly plots to lines not points
            for d in (orig_sty, meas_sty, sim_sty):
                d['linestyle'] = '-'
                d['marker'] = None
            fig, ax = plt.subplots(nrows=nrow, ncols=ncol,
                                   figsize=(12, n * 3.5))
            axs = ax.ravel()
            for i, sim in enumerate(sims[:n]):
                if plot_vars in ('meas', 'both'):
                    axs[i].plot(meas, label='Measured', **meas_sty)
                if plot_vars in ('orig', 'both'):
                    axs[i].plot(orig, label='Original sim.', **orig_sty)
                sim = sim.groupby(sim.index.month).mean()
                axs[i].plot(sim, **sim_sty)
                axs[i].set_ylabel('sim: {}\nmean'.format(simdirs[i]),
                                  fontsize=10)
                axs[i].set_xlim(0.5, 12.5)
                if i == 0:
                    axs[i].legend(markerscale=5, loc='best')
            if odd_n:  # empty subplot if odd number of simulations
                fig.delaxes(axs[n])
            fig.text(0.5, 0.1, 'month')
        # x-y scatter
        elif method == 'correlation':
            # figure
            fig, ax = plt.subplots(nrows=nrow, ncols=ncol,
                                   figsize=(12, n * 3))
            axs = ax.ravel()
            # subplot dimensions
            meas_min = min(X)
            meas_max = max(X)
            for i, sim in enumerate(sims[:n]):
                Y = sim
                sim_max = max(Y)
                sim_min = min(Y)
                m = max(meas_max, sim_max)
                if plot_1to1:
                    axs[i].plot([0, m], [0, m], 'k--', lw=2)  # one to one line
                axs[i].set_xlim(meas_min, meas_max)
                axs[i].set_ylim(sim_min, sim_max)
                axs[i].plot(X, Y, **sim_sty)
                axs[i].set_ylabel('sim: {}'.format(simdirs[i]))
                axs[i].set_xlabel('Measured {}'.format(var_name))
                axs[i].text(0.05, 0.95,
                            r'$R^2 = {0:.2f}$'.format(X.corr(Y)**2),
                            fontsize=16, ha='left', va='center',
                            transform=axs[i].transAxes)
            if odd_n:  # empty subplot if odd number of simulations
                fig.delaxes(axs[n])
        if return_fig:
            return fig
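# Illustrative sketch (not library code): after ``monte_carlo`` has populated
# ``arb_outputs``, the same instance can plot monthly means against the
# measured series; the output file name is hypothetical.
#
# >>> fig = optr.plot_optimization(freq='monthly', method='time_series',
# ...                              plot_vars='both', return_fig=True)
# >>> fig.savefig('monthly_means.png')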
def _create_metafile_name(out_dir, opt_title, stage):
    """
    Search the output directory where simulations are conducted, look
    for all simulation metadata JSON files, and find out if the current
    simulation is a replicate. Then use that information to build the
    correct file name for the output JSON file. The series are typically
    run in parallel, which is why this step has to be done after running
    multiple simulations from an optimization stage.

    Arguments:
        out_dir (str): path to directory with model results, i.e.
            location where simulation series outputs and optimization
            json files are located, aka Optimizer.working_dir
        opt_title (str): optimization instance title for file search
        stage (str): stage of optimization, e.g. 'swrad', 'pet'

    Returns:
        name (str): file name for the current optimization simulation
            series metadata json file, e.g. 'dry_creek_swrad_opt.json';
            if this is the second time you have run an optimization
            titled 'dry_creek' the next json file will be returned as
            'dry_creek_swrad_opt1.json' and so on with integer increments
    """
    meta_re = re.compile(r'^{}_{}_opt(\d*)\.json'.format(opt_title, stage))
    reps = []
    for f in os.listdir(out_dir):
        match = meta_re.match(f)
        if match:
            nrep = match.group(1)
            if nrep == '':
                reps.append(0)
            else:
                reps.append(nrep)
    if not reps:
        name = '{}_{}_opt.json'.format(opt_title, stage)
    else:
        # this is the nth optimization done under the same title
        n = max(map(int, reps)) + 1
        name = '{}_{}_opt{}.json'.format(opt_title, stage, n)
    return name
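# Sketch of the naming behavior documented above, with a hypothetical
# working directory: the first call returns the base name, and a second
# call after that file exists appends an integer.
#
# >>> _create_metafile_name('path/to/work_dir', 'dry_creek', 'swrad')
# 'dry_creek_swrad_opt.json'
# >>> # once dry_creek_swrad_opt.json exists in work_dir:
# >>> _create_metafile_name('path/to/work_dir', 'dry_creek', 'swrad')
# 'dry_creek_swrad_opt1.json'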
def resample_param(params, param_name, how='uniform', mu_factor=1,
                   noise_factor=0.1):
    """
    Resample a PRMS parameter by shifting all values by a constant drawn
    from a uniform distribution, where the range of the uniform values
    equals the difference between the min and max of the allowable range.
    The parameter min and max are set in ``Optimizer.param_ranges``. If
    the resampling method (``how`` argument) is set to 'normal', randomly
    sample a normal distribution with mean = mean(parameter) x
    ``mu_factor`` and sigma = parameter allowable range multiplied by
    ``noise_factor``. If a parameter has array length <= 366 then
    individual parameter values are resampled; otherwise all parameter
    values are resampled at once, e.g. by taking a single random value
    from the uniform distribution. If all values are taken at once using
    the normal method then the original values are scaled by
    ``mu_factor`` and a normal random variable with mean = 0 and
    standard deviation = parameter range x ``noise_factor`` is added.

    Arguments:
        params (:class:`prms_python.Parameters`): ``Parameters`` object
        param_name (str): name of PRMS parameter to resample

    Keyword Arguments:
        how (str): distribution to resample parameters from in the case
            that each parameter element can be resampled (len <= 366).
            Currently works for uniform and normal distributions.
        mu_factor (float): coefficient to scale the parameter mean when
            resampling from the normal distribution
        noise_factor (float): factor to multiply the parameter range by;
            the result is used as the standard deviation of the normal
            random variable that adds element-wise noise, i.e. a higher
            noise_factor results in higher variance. Must be > 0.

    Returns:
        ret (:obj:`numpy.ndarray`): ndarray of the parameter after
            resampling

    Raises:
        KeyError: if ``param_name`` is not a valid parameter name
        ValueError: if the parameter range has not been set in
            ``Optimizer.param_ranges``
    """
    p_min, p_max = Optimizer.param_ranges.get(param_name, (-1, -1))
    # create dictionary of parameter basic info (not values)
    param_dic = {param['name']: param for param in params.base_params}
    if not param_dic.get(param_name):
        raise KeyError('{} is not a valid parameter'.format(param_name))
    if p_min == p_max == -1:
        raise ValueError("""{} has not been added to the dictionary of
parameters to resample, add its allowable min and max value to the
Optimizer.param_ranges attribute in optimizer.py""".format(param_name))

    dim_case = None
    nhru = params.dimensions['nhru']
    ndims = param_dic.get(param_name)['ndims']
    dimnames = param_dic.get(param_name)['dimnames']
    length = param_dic.get(param_name)['length']
    param = deepcopy(params[param_name])
    # could expand list and check parameter name also, e.g. cascade_flg
    # is a parameter that should not be changed
    dims_to_not_change = set(['ncascade', 'ncascdgw', 'nreach', 'nsegment'])
    if len(set.intersection(dims_to_not_change, set(dimnames))) > 0:
        raise ValueError("""{} should not be resampled as it relates to the
location of cascade flow parameters.""".format(param_name))
    # use param info to get dimension info, e.g. if multidimensional
    if ndims == 1 and length <= 366:
        dim_case = 'resample_each_value'  # for smaller param dimensions
    elif ndims == 1 and length > 366:
        dim_case = 'resample_all_values_once'  # covers nssr, ngw, etc.
    elif (ndims == 2 and dimnames[1] == 'nmonths' and
          nhru == params.dimensions[dimnames[0]]):
        dim_case = 'nhru_nmonths'
    elif not dim_case:
        raise ValueError('The {} parameter is not set for resampling'
                         .format(param_name))

    # # testing purposes
    # print('name: ', param_name)
    # print('max_val: ', p_max)
    # print('min_val: ', p_min)
    # print('ndims: ', ndims)
    # print('dimnames: ', dimnames)
    # print('length: ', length)
    # print('resample_method: ', dim_case)

    s = (p_max - p_min) * noise_factor  # std dev (s) default: param_range/10
    # do resampling based on param dimensions and sampling distribution
    if dim_case == 'resample_all_values_once':
        if how == 'uniform':
            ret = np.random.uniform(low=p_min, high=p_max, size=param.shape)
        elif how == 'normal':
            # scale parameter mean if mu_factor given
            param *= mu_factor
            tmp = np.random.normal(0, s, size=param.shape)
            ret = tmp + param
    elif dim_case == 'resample_each_value':
        ret = param
        if how == 'uniform':
            ret = np.random.uniform(low=p_min, high=p_max, size=param.shape)
        elif how == 'normal':
            # the original value is considered the mean
            if len(ret.shape) != 0:
                for i, el in enumerate(param):
                    mu = el * mu_factor
                    ret[i] = np.random.normal(mu, s)
            else:  # single value parameter
                mu = param * mu_factor
                ret = np.random.normal(mu, s)
    # nhru by nmonth dimensional params
    elif dim_case == 'nhru_nmonths':
        ret = param
        if how == 'uniform':
            for month in range(12):
                ret[month] = np.random.uniform(low=p_min, high=p_max,
                                               size=param[0].shape)
        elif how == 'normal':
            for i, el in enumerate(param):
                el *= mu_factor
                tmp = np.random.normal(0, s, size=el.shape)
                ret[i] = tmp + el
    return ret
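# Illustrative sketch (not library code): element-wise uniform resampling
# of a monthly degree-day parameter; the parameter file path is
# hypothetical. ``dday_intcp`` has ndims == 1 and length 12 (<= 366), so
# it falls in the 'resample_each_value' case and the output keeps the
# input's shape.
#
# >>> from prms_python import Parameters
# >>> p = Parameters('path/to/parameters')
# >>> new_vals = resample_param(p, 'dday_intcp', how='uniform')
# >>> new_vals.shape == p['dday_intcp'].shape
# True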
def _mod_params(parameters, params, param_names):
    # loop through list of params and assign their values to a copy of
    # the Parameters instance
    ret = copy(parameters)
    for idx, param in enumerate(params):
        ret[param_names[idx]] = np.array(param)
    return ret
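# Sketch of how ``monte_carlo`` pairs resampled values with parameter
# names for a single simulation; ``p`` is a Parameters instance as in
# the example above.
#
# >>> new_dday = resample_param(p, 'dday_intcp')
# >>> mod = _mod_params(p, [new_dday], ['dday_intcp'])
# >>> mod['dday_intcp']  # a copy of p with the resampled values assigned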
class OptimizationResult:
    """
    The ``OptimizationResult`` object serves to collect and manage output
    from an ``Optimizer`` method. Upon initialization with the
    optimization stage that was used when running the ``Optimizer``
    method, e.g. ``monte_carlo``, the class gathers all JSON metadata
    that was produced for the given stage.

    The ``OptimizationResult`` has three main user methods. First,
    ``result_table`` returns the top n simulations according to four
    model performance metrics (Nash-Sutcliffe efficiency (NSE), root
    mean squared error (RMSE), percent bias (PBIAS), and the coefficient
    of determination (COEF_DET)) as calculated against measured data.
    For example the table may look like:

    >>> ddsolrad_res = OptimizationResult(work_directory, stage=stage)
    >>> top10 = ddsolrad_res.result_table(freq='monthly', top_n=10)
    >>> top10
    ======================== ======== ======= ========= ========
    ddsolrad parameters      NSE      RMSE    PBIAS     COEF_DET
    ======================== ======== ======= ========= ========
    orig_params              0.956267 39.4725 -0.885715 0.963116
    tmax_index_54.2224631748 0.921626 47.6092 -0.849256 0.94402
    tmax_index_44.8823940703 0.879965 58.9194 5.79603   0.922021
    tmax_index_47.6835387480 0.764133 82.5918 -4.78896  0.837582
    ======================== ======== ======= ========= ========

    Second, ``get_top_ranked_sims`` returns a dictionary that maps key
    information about the top n ranked simulations; an example returned
    dictionary may look like:

    >>> {
            'dir_name': ['pathToSim1', 'pathToSim2'],
            'param_path': ['pathToSim1/input/parameters',
                           'pathToSim2/input/parameters'],
            'statvar_path': ['pathToSim1/output/statvar.dat',
                             'pathToSim2/output/statvar.dat'],
            'params_adjusted': [[param_names_sim1], [param_names_sim2]]
        }

    The third method of ``OptimizationResult`` is ``archive``, which
    opens all parameter and statvar files from each simulation of the
    given stage and archives the parameters that were modified, their
    modified values, and the statistical variable (PRMS time series
    output) that is associated with the optimization stage. Other
    ``Optimizer`` simulation metadata is also gathered, and new JSON
    metadata containing only this information is created and written to
    a newly created "archived" subdirectory within the same directory
    where the ``Optimizer`` routine managed simulations. The
    ``OptimizationResult.archive`` method then recursively deletes the
    simulation data for each simulation of the given stage.
    """

    def __init__(self, working_dir, stage):
        """
        Create an ``OptimizationResult`` instance to manage output and
        analyse parameter-output relationships as produced by the use of
        an ``Optimizer`` method for a user-defined optimization stage.
        """
        self.working_dir = working_dir
        self.stage = stage
        self.metadata_json_paths = self._get_optr_jsons(working_dir, stage)
        self.total_sims = self._count_total_sims()
        self.statvar_name = self._get_statvar_name(stage)
        self.measured = self._get_measured(stage)
        self.input_dir = self._get_input_dir(stage)
        self.input_params = self._get_input_params(stage)
        # if more than one input param set was used for the given stage
        if len(self.input_params) > 1:
            print("""Warning: more than one initial parameter set was used
for the optimization for stage: {}. Make sure to compare the correct
input params with their corresponding output sims.""".format(stage))
            print('\nThis optimization stage used the following input '
                  'parameter files:\n{}'.format('\n'.join(self.input_params)))

    def _count_total_sims(self):
        # total number of simulations of given stage in working directory
        tracked_dirs = []
        for f in self.metadata_json_paths[self.stage]:
            with open(f) as fh:
                json_data = json.load(fh)
            tracked_dirs.extend(json_data.get('sim_dirs'))
        return len(tracked_dirs)

    def _get_optr_jsons(self, work_dir, stage):
        """
        Retrieve locations of optimization output JSONs which contain
        metadata needed to understand optimization results. Create a
        dictionary with the stage as key and a list of the corresponding
        JSON file paths as value.

        Arguments:
            work_dir (str): path to directory with model results, i.e.
                location where simulation series outputs and optimization
                json files are located, aka Optimizer.working_dir
            stage (str): the stage ('ddsolrad', 'jhpet', 'flow', etc.) of
                the optimization in which to gather the jsons

        Returns:
            ret (dict): dictionary of stage (key) and list of json file
                paths for that stage (value)
        """
        ret = {}
        optr_metafile_re = re.compile(r'^.*_{}_opt(\d*)\.json'.format(stage))
        ret[stage] = [OPJ(work_dir, f) for f in os.listdir(work_dir)
                      if optr_metafile_re.match(f)]
        return ret

    def _get_input_dir(self, stage):
        # retrieve the input directory from the first json file of stage
        json_file = self.metadata_json_paths[stage][0]
        with open(json_file) as jf:
            meta_dic = json.load(jf)
        return os.path.dirname(meta_dic['original_params'])

    def _get_input_params(self, stage):
        json_files = self.metadata_json_paths[stage]
        param_paths = []
        for json_file in json_files:
            with open(json_file) as jf:
                meta_dic = json.load(jf)
            param_paths.append(meta_dic['original_params'])
        return list(set(param_paths))

    def _get_sim_dirs(self, stage):
        jsons = self.metadata_json_paths[stage]
        json_files = []
        sim_dirs = []
        for inf in jsons:
            with open(inf) as json_file:
                json_files.append(json.load(json_file))
        for json_file in json_files:
            sim_dirs.extend(json_file['sim_dirs'])
        # list of all simulation directory paths for stage
        return sim_dirs

    def _get_measured(self, stage):
        # only need to open one json file to get this information
        if not self.metadata_json_paths.get(stage):
            return  # no optimization json files exist for given stage
        first_json = self.metadata_json_paths[stage][0]
        with open(first_json) as json_file:
            json_data = json.load(json_file)
        measured_series = pd.Series.from_csv(json_data.get('measured'),
                                             parse_dates=True)
        return measured_series

    def _get_statvar_name(self, stage):
        # only need to open one json file to get this information
        try:
            first_json = self.metadata_json_paths[stage][0]
        except (KeyError, IndexError):
            raise ValueError('No optimization has been run for stage: '
                             '{}'.format(stage))
        with open(first_json) as json_file:
            json_data = json.load(json_file)
        var_name = json_data.get('statvar_name')
        return var_name

    def result_table(self, freq='daily', top_n=5, latex=False):
        # TODO: add stats for freq options annual (means or sum)
        sim_dirs = self._get_sim_dirs(self.stage)
        if top_n >= len(sim_dirs):
            top_n = len(sim_dirs) + 1  # for returning inclusive last sim
        sim_names = [path.split(os.sep)[-1] for path in sim_dirs]
        meas_var = self._get_measured(self.stage)
        statvar_name = self._get_statvar_name(self.stage)
        orig_statvar = load_statvar(
            OPJ(self.input_dir, 'statvar.dat'))[statvar_name]
        result_df = pd.DataFrame(
            columns=['NSE', 'RMSE', 'PBIAS', 'COEF_DET', 'ABS(PBIAS)'])
        orig_results = pd.DataFrame(
            index=['orig_params'],
            columns=['NSE', 'RMSE', 'PBIAS', 'COEF_DET'])
        # get datetime indices that overlap from measured and simulated
        sim_out = load_statvar(
            OPJ(sim_dirs[0], 'outputs', 'statvar.dat'))[statvar_name]
        idx = meas_var.index.intersection(sim_out.index)
        meas_var = copy(meas_var[idx])
        orig_statvar = orig_statvar[idx]
        if freq == 'monthly':
            meas_mo = meas_var.groupby(meas_var.index.month).mean()
            orig_mo = orig_statvar.groupby(orig_statvar.index.month).mean()
        for i, sim in enumerate(sim_dirs):
            try:
                sim_out = load_statvar(
                    OPJ(sim, 'outputs', 'statvar.dat'))[statvar_name]
            except (IOError, OSError):
                # simulation might have been removed or missing
                continue
            sim_out = sim_out[idx]
            if freq == 'daily':
                result_df.loc[sim_names[i]] = [
                    nash_sutcliffe(meas_var, sim_out),
                    rmse(meas_var, sim_out),
                    percent_bias(meas_var, sim_out),
                    meas_var.corr(sim_out)**2,
                    np.abs(percent_bias(meas_var, sim_out))
                ]
                orig_results.loc['orig_params'] = [
                    nash_sutcliffe(orig_statvar, meas_var),
                    rmse(orig_statvar, meas_var),
                    percent_bias(orig_statvar, meas_var),
                    orig_statvar.corr(meas_var)**2
                ]
            elif freq == 'monthly':
                sim_out = sim_out.groupby(sim_out.index.month).mean()
                result_df.loc[sim_names[i]] = [
                    nash_sutcliffe(meas_mo, sim_out),
                    rmse(meas_mo, sim_out),
                    percent_bias(meas_mo, sim_out),
                    meas_mo.corr(sim_out)**2,
                    np.abs(percent_bias(meas_mo, sim_out))
                ]
                orig_results.loc['orig_params'] = [
                    nash_sutcliffe(orig_mo, meas_mo),
                    rmse(orig_mo, meas_mo),
                    percent_bias(orig_mo, meas_mo),
                    orig_mo.corr(meas_mo)**2
                ]
        sorted_result = result_df.sort_values(
            by=['NSE', 'RMSE', 'ABS(PBIAS)', 'COEF_DET'],
            ascending=[False, True, True, False])
        sorted_result.columns.name = '{} parameters'.format(self.stage)
        sorted_result = sorted_result[['NSE', 'RMSE', 'PBIAS', 'COEF_DET']]
        sorted_result = pd.concat([orig_results, sorted_result])
        if latex:
            return sorted_result[:top_n].to_latex(escape=False)
        return sorted_result[:top_n]

    def get_top_ranked_sims(self, sorted_df):
        # use result table to make a dict with best param and statvar
        # paths; the index of the table holds simulation directory names
        ret = {
            'dir_name': [],
            'param_path': [],
            'statvar_path': [],
            'params_adjusted': []
        }
        json_paths = self.metadata_json_paths[self.stage]
        for el in sorted_df.drop('orig_params').index:
            ret['dir_name'].append(el)
            ret['param_path'].append(
                OPJ(self.working_dir, el, 'inputs', 'parameters'))
            ret['statvar_path'].append(
                OPJ(self.working_dir, el, 'outputs', 'statvar.dat'))
            for f in json_paths:
                with open(f) as fh:
                    json_data = json.load(fh)
                if OPJ(self.working_dir, el) in json_data.get('sim_dirs'):
                    ret['params_adjusted'].append(
                        json_data.get('params_adjusted'))
        return ret
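    # Illustrative sketch (not library code): ranking simulations for a
    # stage and retrieving paths to the best parameter sets; the working
    # directory and stage name are hypothetical.
    #
    # >>> res = OptimizationResult('path/to/work_dir', stage='ddsolrad')
    # >>> top5 = res.result_table(freq='daily', top_n=5)
    # >>> best = res.get_top_ranked_sims(top5)
    # >>> best['param_path'][0]  # parameters of the best-ranked simulation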
    def archive(self, remove_sims=True, remove_meta=False,
                metric_freq='daily'):
        """
        Create an archive directory to hold JSON files that contain
        information on adjusted parameters, model output, and performance
        metrics for each Optimizer simulation of the
        OptimizationResult.stage in the OptimizationResult.working_dir.

        Keyword Arguments:
            remove_sims (bool): if True, recursively delete all folders
                and files associated with the original simulations of the
                OptimizationResult.stage in the
                OptimizationResult.working_dir; if False, do not delete
                simulations
            remove_meta (bool): whether to delete the original Optimizer
                JSON metadata files, default is False
            metric_freq (str): frequency of output metric computation for
                recording of model performance; can be 'daily' (default)
                or 'monthly'. Note, other results can be computed later
                with archived results.

        Returns:
            None
        """
        # create output archive directory
        archive_dir = OPJ(self.working_dir, '{}_archived'.format(self.stage))
        if not os.path.isdir(archive_dir):
            os.mkdir(archive_dir)
        # create table and use it to make a mapping dict
        table = self.result_table(freq=metric_freq,
                                  top_n=self.total_sims, latex=False)
        map_dic = self.get_top_ranked_sims(table)
        metadata_json_paths = self.metadata_json_paths[self.stage]
        # get measured optimization variable path
        first_json = metadata_json_paths[0]
        with open(first_json) as json_file:
            json_data = json.load(json_file)
        measured_path = json_data.get('measured')
        # record info for each simulation and archive to JSONs;
        # pandas series and numpy arrays are converted to Python lists
        # for JSON serialization
        for i, sim in enumerate(map_dic.get('dir_name')):
            json_path = OPJ(archive_dir, '{sim}.json'.format(sim=sim))
            try:
                output_series = load_statvar(
                    OPJ(self.working_dir, sim, 'outputs',
                        'statvar.dat'))[self.statvar_name]
            except (IOError, OSError):
                # simulation directory was already removed
                continue
            # look for resampling method info for the particular simulation
            for f in metadata_json_paths:
                with open(f) as tmp_file:
                    tmp = json.load(tmp_file)
                if OPJ(self.working_dir, sim) in tmp.get('sim_dirs'):
                    resample = tmp.get('resample')
                    noise_factor = tmp.get('noise_factor')
                    mu_factor = tmp.get('mu_factor')
            json_data = {
                'param_names': [],
                'param_values': [],
                'original_param_path': self.input_params,
                'measured_path': measured_path,
                'output_name': self.statvar_name,
                'output_date_index': output_series.index.astype(str).tolist(),
                'output_values': output_series.values.tolist(),
                'metric_freq': metric_freq,
                'resample': resample,
                'stage': self.stage,
                'mu_factor': mu_factor,
                'noise_factor': noise_factor,
                'NSE': table.loc[sim, 'NSE'],
                'RMSE': table.loc[sim, 'RMSE'],
                'PBIAS': table.loc[sim, 'PBIAS'],
                'COEF_DET': table.loc[sim, 'COEF_DET']
            }
            for param in map_dic.get('params_adjusted')[i]:
                json_data['param_names'].append(param)
                json_data['param_values'].append(
                    Parameters(map_dic.get('param_path')[i])[param].tolist())
            # save JSON file into archive directory
            with open(json_path, 'w') as outf:
                json.dump(json_data, outf, sort_keys=True, indent=4,
                          ensure_ascii=False)
            # recursively delete the simulation directory after archiving
            if remove_sims:
                shutil.rmtree(OPJ(self.working_dir, sim), ignore_errors=True)
        # optionally delete the original JSON metadata
        if remove_meta:
            for meta_file in self.metadata_json_paths[self.stage]:
                try:
                    os.remove(meta_file)
                except OSError:
                    continue
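# Illustrative sketch (not library code): archiving a completed stage to
# compact JSON summaries and reclaiming disk space; the working directory
# and stage name are hypothetical.
#
# >>> res = OptimizationResult('path/to/work_dir', stage='ddsolrad')
# >>> res.archive(remove_sims=True, remove_meta=False, metric_freq='monthly')
# >>> # archived JSONs are now under path/to/work_dir/ddsolrad_archived/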