# Source code for pyrolite_meltsutil.automation

"""
This submodule contains functions for automated execution of alphamelts 1.9.

Issues
------

    * names are truncated for modifychem melts files?
    * need a timeout so processes can keep going, add unfinished experiments to failed list
"""
import datetime
import itertools
import json
import logging
import time
from pathlib import Path

import numpy as np
from pyrolite.comp.codata import renormalise
from pyrolite.geochem.ind import common_elements, common_oxides
from pyrolite.util.log import ToLogger
from pyrolite.util.multip import combine_choices
from tqdm import tqdm

from ..env import MELTS_Env
from ..meltsfile import dict_to_meltsfile
from ..parse import read_envfile, read_meltsfile
from ..util.log import Handle
from .naming import exp_hash, exp_name
from .org import make_meltsfolder
from .process import MeltsProcess
from .timing import estimate_experiment_duration

# Module-level logger, created through the package's ``Handle`` helper; it is
# also the default logger handed to :class:`MeltsBatch`.
logger = Handle(__name__)

# Set of recognised chemical component names: the union of the sets returned
# by ``common_elements(as_set=True)`` and ``common_oxides(as_set=True)``.
# Used by ``process_modifications`` to pick out the compositional keys of an
# experiment configuration.
__chem__ = common_elements(as_set=True) | common_oxides(as_set=True)


class MeltsExperiment(object):
    """
    Melts Experiment Object. For a single call to melts, with one set of outputs.
    Automatically creates the experiment folder, meltsfile and environment file, runs
    alphaMELTS and collects the results.

    Parameters
    ----------
    name : :class:`str`
        Name of the experiment folder.
    title : :class:`str`
        Title of the meltsfile; also used as the stem of the ``.melts`` filename.
    fromdir : :class:`str` | :class:`pathlib.Path`
        Directory in which the experiment folder is created.
    meltsfile : :class:`pandas.Series` | :class:`str` | :class:`pathlib.Path`
        Meltsfile specification, passed to :meth:`set_meltsfile` when not
        :code:`None`.
    env : :class:`str` | :class:`pathlib.Path`
        Environment specification, passed to :meth:`set_envfile`. A default
        :class:`MELTS_Env` is used when this is :code:`None`.
    timeout
        Forwarded unchanged to :class:`MeltsProcess` in :meth:`run`.
    executable
        Forwarded unchanged to :class:`MeltsProcess` in :meth:`run`.

    Todo
    ----
        * Automated creation of folders for experiment results (see :func:`make_meltsfolder`)
        * Being able to run melts in an automated way (see :class:`MeltsProcess`)
        * Compressed export/save function
        * Post-processing functions for i) validation and ii) plotting
    """

    def __init__(
        self,
        name="MeltsExperiment",
        title="MeltsExperiment",
        fromdir="./",
        meltsfile=None,
        env=None,
        timeout=None,
        executable=None,
    ):
        self.name = name  # folder name
        self.title = title  # meltsfile title
        self.fromdir = fromdir  # create an experiment directory here
        self.log = []
        self.timeout = timeout
        self.executable = executable

        # Define these up front: _make_folder() reads self.meltsfile, which
        # previously did not exist at all when no meltsfile was supplied.
        # NOTE(review): whether make_meltsfolder accepts meltsfile=None is not
        # visible from this module - confirm.
        self.meltsfile = None
        self.meltsfilepath = None
        if meltsfile is not None:
            self.set_meltsfile(meltsfile)

        self.set_envfile(env if env is not None else MELTS_Env())

        self._make_folder()

    def set_meltsfile(self, meltsfile, **kwargs):
        """
        Set the meltsfile for the experiment.

        Parameters
        ------------
        meltsfile : :class:`pandas.Series` | :class:`str` | :class:`pathlib.Path`
            Either a path to a valid melts file, a :class:`pandas.Series`, or a
            multiline string representation of a melts file object.
        """
        # **kwargs is accepted for interface compatibility but is not used.
        self.meltsfile, self.meltsfilepath = read_meltsfile(meltsfile)

    def set_envfile(self, env):
        """
        Set the environment for the experiment.

        Parameters
        ------------
        env : :class:`str` | :class:`pathlib.Path`
            Either a path to a valid environment file, a :class:`pandas.Series`, or a
            multiline string representation of a environment file object.
        """
        self.envfile, self.envfilepath = read_envfile(env)

    def _make_folder(self):
        """
        Create the experiment folder.

        Delegates to :func:`make_meltsfolder`, then points the meltsfile and
        environment file paths at ``<title>.melts`` and ``environment.txt``
        inside the returned folder.
        """
        self.folder = make_meltsfolder(
            name=self.name,
            title=self.title,
            meltsfile=self.meltsfile,
            indir=self.fromdir,
            env=self.envfile,
        )
        self.meltsfilepath = self.folder / (self.title + ".melts")
        self.envfilepath = self.folder / "environment.txt"

    def run(self, log=False, superliquidus_start=True):
        """
        Call 'run_alphamelts.command'.

        Parameters
        ----------
        log : :class:`bool`
            Forwarded to :meth:`MeltsProcess.write`.
        superliquidus_start : :class:`bool`
            Selects whether :code:`1` (truthy) or :code:`0` (falsy) is sent as
            the second input written to the process.
        """
        self.mp = MeltsProcess(
            meltsfile=str(self.title) + ".melts",
            env="environment.txt",
            fromdir=str(self.folder),
            timeout=self.timeout,
            executable=self.executable,
        )
        try:
            # Inputs written to the process in order: 3, then 1/0, then 4
            # (presumably alphaMELTS menu selections - confirm against
            # MeltsProcess).
            self.mp.write([3, 1 if superliquidus_start else 0, 4], wait=True, log=log)
        finally:
            # Always release the process, even if writing to it failed.
            self.mp.terminate()

    def cleanup(self):
        """
        Placeholder for post-run cleanup; currently does nothing.
        """
        pass
def process_modifications(cfg):
    """
    Process modifications to a configuration composition.

    If the configuration contains a :code:`"modifychem"` entry, it is removed
    and its non-NaN values are written into the configuration. All other
    chemical components already present are then scaled by
    :code:`(100 - offset) / 100`, where :code:`offset` is the sum of the applied
    values, and rounded to four decimal places.

    Parameters
    -----------
    cfg : :class:`dict`
        Configuration dictionary. This is modified in place.

    Returns
    --------
    cfg : :class:`dict`
        Configuration dictionary (the same object that was passed in).
    """
    if "modifychem" in cfg:
        # remove modifychem; a None entry is treated as "no modifications"
        modifications = cfg.pop("modifychem", None) or {}
        # NaN-valued entries request no change, so they must not count as
        # modified; otherwise those components would escape the rescaling
        # below and the composition would no longer close.
        applied = {k: v for k, v in modifications.items() if not np.isnan(v)}
        existing = set(cfg.keys())  # keys present before applying changes
        cfg.update(applied)
        unmodified = (existing - set(applied)) & __chem__
        offset = np.nansum(np.array(list(applied.values())))
        for uk in unmodified:
            cfg[uk] = np.round(cfg[uk] * (100.0 - offset) / 100, 4)
    return cfg
class MeltsBatch(object):
    """
    Batch of :class:`MeltsExperiment`, which may represent evaluation over a grid of
    parameters or configurations.

    Parameters
    -----------
    comp_df : :class:`pandas.DataFrame`
        Dataframe of compositions.
    default_config : :class:`dict`
        Dictionary of default parameters.
    config_grid : :class:`dict`
        Dictionary of parameters to systematically vary.
    fromdir : :class:`str` | :class:`pathlib.Path`
        Directory to run the set of experiments from, and where the results of
        each of the experiments will be saved.
    env : :class:`str` | :class:`pathlib.Path` | :class:`pyrolite_meltsutil.env.MELTS_Env`
        Environment file to use, if not the default.
    executable : :class:`str` | :class:`pathlib.Path`
        Path to an executable to use, if not the default (specifically the path to
        :code:`run_alphamelts.command` or :code:`run_alphamelts.bat`).
    timeout : :class:`int`
        Timeout in seconds after which to try and terminate an experiment.
    logger : :class:`logging.Logger`
        Logger to use for logging output, if not the default.

    Attributes
    -----------
    compositions : :class:`list` of :class:`dict`
        Compositions to use for the experiments.
    configs : :class:`list` of :class:`dict`
        Set of experiment configurations to use.
    experiments : :class:`dict`
        Combination of compositions and configurations to generate a 'grid' of
        experiments, keyed by experiment hash. Each value is a tuple of
        :code:`(name, configuration, environment)`.

    Todo
    ------
        * Can start with a single composition or multiple compositions in a dataframe
        * Enable grid search for individual parameters
        * Improved output logging/reporting
        * Calculate relative number of calculations to be performed for the est duration

            This is currently about correct for an isobaric calculation at 10 degree
            temperature steps over few hundred degrees - but won't work for different
            T steps.
        * Does number precision make a difference?
    """

    def __init__(
        self,
        comp_df,
        default_config=None,
        config_grid=None,
        fromdir=Path("./"),
        env=None,
        executable=None,
        timeout=None,
        logger=logger,
    ):
        self.timeout = timeout
        self.logger = logger
        self.fromdir = Path(fromdir)
        self.fromdir.mkdir(parents=True, exist_ok=True)
        self.executable = executable

        # make a file logger
        fh = logging.FileHandler(self.fromdir / "autolog.log")
        fh.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

        # None stands in for "empty" so that no dict is shared between instances
        self.default = {} if default_config is None else default_config
        config_grid = {} if config_grid is None else config_grid
        self.env = env or MELTS_Env()

        # let's establish the grid of configurations
        self.configs = []
        for choice in combine_choices(config_grid):  # unique configurations
            _cfg = {**self.default, **choice}
            if _cfg not in self.configs:
                self.configs.append(_cfg)

        self.compositions = comp_df.fillna(0).to_dict("records")

        # combine these to create full experiment configs
        exprs = [
            process_modifications({**cfg, **cmp})
            for (cfg, cmp) in itertools.product(self.configs, self.compositions)
        ]
        exphashes = np.array([exp_hash(i) for i in exprs])
        _, cnts = np.unique(exphashes, return_counts=True)
        if (cnts > 1).any():
            self.logger.debug("Duplicate experiments detected.")

        # keying on the hash ensures that no duplicates are preserved
        self.experiments = {
            hsh: (exp_name(expr), expr, self.env)
            for hsh, expr in zip(exphashes, exprs)
        }

        # flat allowance of 15 s per experiment
        self.est_duration = str(
            datetime.timedelta(seconds=len(self.experiments) * 15)
        )
        self.logger.info("Estimated Calculation Time: {}".format(self.est_duration))

    def dump(self, experiments=None, to_dir=None):
        """
        Serialize the configuration to a json file.

        The file is written as UTF-8 to :code:`meltsBatchConfig.json` in the
        target directory, which is created if it does not exist.

        Parameters
        -----------
        experiments : :class:`dict`
            Dictionary of experiments to be serialized. Defaults to all
            experiments of the batch.
        to_dir : :class:`str` | :class:`pathlib.Path`
            Directory to export file to. Defaults to the batch directory.
        """
        to_dir = to_dir or self.fromdir
        experiments = experiments or self.experiments
        data = json.dumps(
            {
                h: (t, exp, env.dump(unset_variables=False))
                for (h, (t, exp, env)) in experiments.items()
            },
            sort_keys=False,
            ensure_ascii=False,
        ).encode("utf8")
        target = Path(to_dir) / "meltsBatchConfig.json"
        target.parent.mkdir(parents=True, exist_ok=True)  # may not exist yet?
        # consider reading old data and leaving updated version here
        target.write_bytes(data)

    def run(
        self,
        overwrite=False,
        exclude=None,
        superliquidus_start=True,
        timeout=None,
        log=False,
    ):
        """
        Run the experiments of the batch one after another.

        The batch configuration is serialized first (see :meth:`dump`).
        Experiments which raise an :class:`OSError` are collected and reported
        through the logger once all experiments have been attempted.

        Parameters
        -----------
        overwrite : :class:`bool`
            If :code:`False`, experiments whose folder already exists within the
            batch directory are skipped.
        exclude : :class:`list`
            Exclusions passed to :func:`dict_to_meltsfile` for every experiment,
            in addition to any :code:`"exclude"` entry of the individual
            experiment configuration.
        superliquidus_start : :class:`bool`
            Forwarded to :meth:`MeltsExperiment.run`.
        timeout : :class:`int`
            Timeout used only when the batch itself has no timeout set.
        log : :class:`bool`
            Forwarded to :meth:`MeltsExperiment.run`.
        """
        self.dump()  # Serialize the config first
        # NOTE(review): the batch-level timeout takes precedence over the value
        # passed to this call - confirm that this priority is intended.
        timeout = self.timeout or timeout
        self.started = time.time()
        # Copy so that neither the caller's list nor a shared default is
        # mutated; exclusions from one experiment must not leak into the next.
        base_exclude = [] if exclude is None else list(exclude)

        experiments = self.experiments
        if not overwrite:
            experiments = {
                h: (t, exp, env)
                for h, (t, exp, env) in experiments.items()
                if not (self.fromdir / h).exists()
            }

        self.logger.info("Starting {} Calculations.".format(len(experiments)))
        failed = []
        for hsh, (title, exp, env) in tqdm(
            experiments.items(), file=ToLogger(self.logger), mininterval=2
        ):
            # Work on a copy so the stored experiment configuration keeps its
            # "exclude" entry for later runs/dumps.
            exp = dict(exp)
            exp_exclude = base_exclude + list(exp.pop("exclude", []))

            self.logger.debug("Start {}.".format(title))
            meltsfile = dict_to_meltsfile(exp, modes=exp["modes"], exclude=exp_exclude)
            M = MeltsExperiment(
                name=hsh,
                title=title,
                meltsfile=meltsfile,
                env=env,
                fromdir=self.fromdir,
                timeout=timeout,
                executable=self.executable,
            )
            try:
                M.run(superliquidus_start=superliquidus_start, log=log)
                self.logger.debug("Finished {}.".format(title))
            except OSError:
                # The process (or its call string) may not exist if the
                # failure happened while it was being created.
                callstring = getattr(getattr(M, "mp", None), "callstring", None)
                self.logger.warning(
                    "Errored @ {}.".format(
                        callstring if callstring is not None else title
                    )
                )
                failed.append(title)
            # should check if it actually ran here (e.g. timeouts)

        self.duration = datetime.timedelta(seconds=time.time() - self.started)
        self.logger.info("Calculations Complete after {}".format(self.duration))
        if failed:
            self.logger.warning("Some calculations errored:")
            for f in failed:
                self.logger.warning(f)

    def cleanup(self):
        """
        Placeholder for post-run cleanup; currently does nothing.
        """
        pass