# -*- mode:python; coding: utf-8 -*-
#
# Copyright (©) 2016-2024 EPFL (École Polytechnique Fédérale de Lausanne),
# Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
# Copyright (©) 2020-2024 Lucas Frérot
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Dumpers for the class :py:class:`Model <tamaas._tamaas.Model>`."""

from pathlib import Path
import glob
import json
import io
import typing as ts
from collections.abc import Collection

import numpy as np

from .. import __version__, type_traits
from .._tamaas import (
    ModelDumper,
    model_type,
    mpi,
    ModelFactory,
    Model,
)
from ._helper import (
    step_dump,
    directory_dump,
    local_slice,
    _is_surface_field,
    _basic_types,
    file_handler,
    FileType,
    NameType,
)

__all__ = [
    "JSONDumper",
    "FieldDumper",
    "NumpyDumper",
]

_reverse_trait_map = {
    'model_type.' + t.__name__: mtype
    for mtype, t in type_traits.items()
}


def _get_attributes(model: Model):
    """Get model attributes."""
    return {
        'model_type': str(model.type),
        'system_size': model.system_size,
        'discretization': model.global_shape,
        'boundary_fields': model.boundary_fields,
        'program': f"Tamaas {__version__}, DOI:10.21105/joss.02121",
    }


def _create_model(attrs: ts.MutableMapping):
    """Create a model from attribute dictionary."""
    mtype = _reverse_trait_map[attrs['model_type']]

    # netCDF4 converts length-1 list attributes to scalars
    for attr in ['system_size', 'discretization']:
        if not isinstance(attrs[attr], Collection):
            attrs[attr] = [attrs[attr]]
    return ModelFactory.createModel(mtype, attrs['system_size'],
                                    attrs['discretization'])
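
# Round-trip sketch for the two helpers above. The model type, sizes and
# discretization are illustrative; any valid combination works:
#
#   model = ModelFactory.createModel(
#       model_type.basic_2d, [1., 1.], [64, 64])
#   attrs = _get_attributes(model)      # JSON-serializable metadata
#   clone = _create_model(dict(attrs))  # same type/size/discretization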


class MPIIncompatibilityError(RuntimeError):
    """Raised when code is not meant to be executed in MPI environment."""


class ModelError(ValueError):
    """Raised when unexpected model is passed to a dumper with a state."""


class ComponentsError(ValueError):
    """Raised when an unexpected number of components is encountred."""


class _ModelJSONEncoder(json.JSONEncoder):
    """Encode a model to JSON."""

    def default(self, obj):
        """Encode model."""
        if isinstance(obj, Model):
            model = obj
            attrs = _get_attributes(model)
            model_dict = {
                'attrs': attrs,
                'fields': {},
                'operators': [],
            }

            for field in model:
                model_dict['fields'][field] = model[field].tolist()

            for op in model.operators:
                model_dict['operators'].append(op)

            return model_dict

        return json.JSONEncoder.default(self, obj)
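
# Usage sketch for the encoder (internal; JSONDumper below is the public
# entry point):
#
#   document = json.dumps(model, cls=_ModelJSONEncoder)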


class JSONDumper(ModelDumper):
    """Dumper to JSON."""

    def __init__(self, file_descriptor: FileType):
        """Construct with file handle."""
        super(JSONDumper, self).__init__()
        self.fd = file_descriptor

    @file_handler('w')
    def _dump_to_file(self, fd: ts.IO[str], model: Model):
        json.dump(model, fd, cls=_ModelJSONEncoder)

    def dump(self, model: Model):
        """Dump model."""
        self._dump_to_file(self.fd, model)

    @classmethod
    @file_handler('r')
    def read(cls, fd: ts.IO[str]):
        """Read model from file."""
        properties = json.load(fd)
        model = _create_model(properties['attrs'])

        for name, field in properties['fields'].items():
            v = np.asarray(field)
            if model.type in _basic_types:
                v = v.reshape(list(v.shape) + [1])
            model[name] = v

        return model
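
# Example use of JSONDumper (sketch; 'model.json' is an illustrative
# filename). The file_handler decorator lets dump() and read() accept
# either a path or an open file handle:
#
#   JSONDumper('model.json').dump(model)
#   restored = JSONDumper.read('model.json')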


class FieldDumper(ModelDumper):
    """Abstract dumper for python classes using fields."""

    postfix = ""
    extension = ""
    name_format = "{basename}{postfix}.{extension}"

    def __init__(self, basename: NameType, *fields, **kwargs):
        """Construct with desired fields."""
        super(FieldDumper, self).__init__()
        self.basename = basename
        self.fields: ts.List[str] = list(fields)
        self.all_fields: bool = kwargs.get('all_fields', False)

    def add_field(self, field: str):
        """Add another field to the dump."""
        if field not in self.fields:
            self.fields.append(field)

    def _dump_to_file(self, file_descriptor: FileType, model: Model):
        """Dump to a file (path-like or file handle)."""
        raise NotImplementedError()

    def get_fields(self, model: Model):
        """Get the desired fields."""
        if not self.all_fields:
            requested_fields = self.fields
        else:
            requested_fields = list(model)
        return {field: model[field] for field in requested_fields}

    def dump(self, model: Model):
        """Dump model."""
        self._dump_to_file(self.file_path, model)

    @classmethod
    def read(cls, file_descriptor: FileType):
        """Read model from file."""
        raise NotImplementedError(
            f'read() method not implemented in {cls.__name__}')

    @classmethod
    def read_sequence(cls, glob_pattern):
        """Read models from a file sequence."""
        return map(cls.read, glob.iglob(glob_pattern))

    @property
    def file_path(self):
        """Get the default filename."""
        return self.name_format.format(basename=self.basename,
                                       postfix=self.postfix,
                                       extension=self.extension)
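
# Subclassing sketch: a concrete dumper only needs an extension and a
# _dump_to_file() implementation. CSVDumper and its output layout are
# illustrative, not part of Tamaas:
#
#   class CSVDumper(FieldDumper):
#       extension = 'csv'
#
#       def _dump_to_file(self, file_descriptor, model):
#           for name, field in self.get_fields(model).items():
#               np.savetxt(f'{file_descriptor}.{name}',
#                          np.asarray(field).reshape(-1))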


@directory_dump('numpys')
@step_dump
class NumpyDumper(FieldDumper):
    """Dumper to compressed numpy files."""

    extension = 'npz'

    def _dump_to_file(self, file_descriptor: FileType, model: Model):
        """Save to compressed multi-field Numpy format."""
        if mpi.size() > 1:
            raise MPIIncompatibilityError("NumpyDumper does not function "
                                          "at all in parallel")

        np.savez_compressed(file_descriptor,
                            attrs=json.dumps(_get_attributes(model)),
                            **self.get_fields(model))

    @classmethod
    def read(cls, file_descriptor: FileType):
        """Create model from Numpy file."""
        data = np.load(file_descriptor, mmap_mode='r')
        model = _create_model(json.loads(str(data['attrs'])))

        for k, v in filter(lambda k: k[0] != 'attrs', data.items()):
            if model.type in _basic_types:
                v = v.reshape(list(v.shape) + [1])
            model[k] = v

        return model
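
# Example use of NumpyDumper (sketch; field names depend on the model).
# The directory_dump/step_dump decorators route successive dumps to
# numbered .npz files in a 'numpys' directory:
#
#   dumper = NumpyDumper('simulation', 'traction', 'displacement')
#   dumper.dump(model)                      # one numbered file per call
#   restored = NumpyDumper.read(path_to_npz)  # path of a dumped file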


try:
    import h5py

    __all__.append("H5Dumper")

    @directory_dump('hdf5')
    @step_dump
    class H5Dumper(FieldDumper):
        """Dumper to HDF5 file format."""

        extension = 'h5'

        def __init__(self, basename: NameType, *fields, **kwargs):
            super(H5Dumper, self).__init__(basename, *fields, **kwargs)
            self.chunks = kwargs.get('chunks')

        @staticmethod
        def _hdf5_args():
            if mpi.size() > 1:
                from mpi4py import MPI  # noqa
                mpi_args = dict(driver='mpio', comm=MPI.COMM_WORLD)
                comp_args = {}  # compression does not work in parallel
            else:
                mpi_args = {}
                comp_args = dict(compression='gzip', compression_opts=7)
            return mpi_args, comp_args

        def _dump_to_file(self, file_descriptor: FileType, model: Model):
            """Save to HDF5 with metadata about the model."""
            # Setup for MPI
            if not h5py.get_config().mpi and mpi.size() > 1:
                raise MPIIncompatibilityError(
                    "HDF5 does not have MPI support")

            mpi_args, comp_args = self._hdf5_args()

            with h5py.File(file_descriptor, 'w', **mpi_args) as handle:
                # Writing data
                for name, field in self.get_fields(model).items():
                    shape = list(field.shape)

                    if mpi.size() > 1:
                        xdim = 0 if _is_surface_field(field, model) else 1
                        shape[xdim] = mpi_args['comm'].allreduce(shape[xdim])

                    dset = handle.create_dataset(name, shape, field.dtype,
                                                 chunks=self.chunks,
                                                 **comp_args)
                    s = local_slice(field, model,
                                    name in model.boundary_fields)
                    dset[s] = field

                # Writing metadata
                for name, attr in _get_attributes(model).items():
                    handle.attrs[name] = attr

        @classmethod
        def read(cls, file_descriptor: FileType):
            """Create model from HDF5 file."""
            mpi_args, _ = cls._hdf5_args()

            with h5py.File(file_descriptor, 'r', **mpi_args) as handle:
                model = _create_model(handle.attrs)

                for k, v in handle.items():
                    v = np.asanyarray(v)
                    if model.type in _basic_types:
                        v = v.reshape(list(v.shape) + [1])

                    surface_field = \
                        k in handle.attrs.get('boundary_fields', {}) \
                        or _is_surface_field(v, model)
                    s = local_slice(v, model, surface_field)

                    if (surface_field
                            and v.ndim == len(model.boundary_shape)) \
                       or (not surface_field
                           and v.ndim == len(model.shape)):
                        s = s + (np.newaxis, )

                    model[k] = v[s].copy()

            return model
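
    # Example use of H5Dumper (sketch). In serial it writes gzip-compressed
    # datasets; with an MPI-enabled h5py, all ranks write collectively to a
    # single file. 'chunks' is forwarded to create_dataset:
    #
    #   dumper = H5Dumper('simulation', 'traction', all_fields=False)
    #   dumper.dump(model)               # numbered .h5 files under 'hdf5'
    #   restored = H5Dumper.read(path_to_h5)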
except ImportError:
    pass

try:
    import uvw  # noqa

    __all__ += [
        "UVWDumper",
        "UVWGroupDumper",
    ]

    @directory_dump('paraview')
    @step_dump
    class UVWDumper(FieldDumper):
        """Dumper to VTK files for elasto-plastic calculations."""

        extension = 'vtr'

        def _dump_to_file(self, file_descriptor: FileType, model: Model):
            """Dump displacements, plastic deformations and stresses."""
            if mpi.size() > 1:
                raise MPIIncompatibilityError("UVWDumper does not function "
                                              "properly in parallel")

            bdim = len(model.boundary_shape)

            # Local MPI size
            lsize = model.shape
            gsize = mpi.global_shape(model.boundary_shape)
            gshape = gsize

            if len(lsize) > bdim:
                gshape = [model.shape[0]] + gshape

            # Space coordinates
            coordinates = [
                np.linspace(0, L, N, endpoint=False)
                for L, N in zip(model.system_size, gshape)
            ]

            # If model has subsurface domain, z-coordinate is always first
            dimension_indices = np.arange(bdim)
            if len(lsize) > bdim:
                dimension_indices += 1
                dimension_indices = np.concatenate(
                    (dimension_indices, np.asarray([0])))
                coordinates[0] = \
                    np.linspace(0, model.system_size[0], gshape[0])

            offset = np.zeros_like(dimension_indices)
            offset[0] = mpi.local_offset(gsize)

            rectgrid = uvw.RectilinearGrid if mpi.size() == 1 \
                else uvw.parallel.PRectilinearGrid

            # Creating rectilinear grid with correct order for components
            coordlist = [
                coordinates[i][o:o + lsize[i]]
                for i, o in zip(dimension_indices, offset)
            ]

            grid = rectgrid(
                file_descriptor,
                coordlist,
                compression=True,
                offsets=offset,
            )

            fields = self.get_fields(model).items()

            # Iterator over fields we want to dump on system geometry
            if model.type in {model_type.volume_1d, model_type.volume_2d}:
                fields_it = filter(
                    lambda t: not t[0] in model.boundary_fields, fields)
            else:
                fields_it = iter(fields)

            for name, field in fields_it:
                array = uvw.DataArray(field, dimension_indices, name)
                grid.addPointData(array)

            grid.write()

    @directory_dump('paraview')
    class UVWGroupDumper(FieldDumper):
        """Dumper to ParaViewData files."""

        extension = 'pvd'

        def __init__(self, basename: NameType, *fields, **kwargs):
            """Construct with desired fields."""
            super(UVWGroupDumper, self).__init__(basename, *fields, **kwargs)

            subdir = Path('paraview') / f'{basename}-VTR'
            subdir.mkdir(parents=True, exist_ok=True)

            self.uvw_dumper = UVWDumper(
                Path(f'{basename}-VTR') / basename, *fields, **kwargs)

            self.group = uvw.ParaViewData(self.file_path, compression=True)

        def _dump_to_file(self, file_descriptor, model):
            self.group.addFile(
                self.uvw_dumper.file_path.replace('paraview/', ''),
                timestep=self.uvw_dumper.count,
            )
            self.group.write()
            self.uvw_dumper.dump(model)
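
    # Example use of the UVW dumpers (sketch; field names are illustrative).
    # UVWDumper writes one .vtr file per call, while UVWGroupDumper also
    # maintains a .pvd index that ParaView opens as a time series:
    #
    #   dumper = UVWGroupDumper('simulation', 'displacement', 'stress')
    #   for step in range(steps):
    #       ...  # advance the simulation
    #       dumper.dump(model)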
except ImportError:
    pass

try:
    from netCDF4 import Dataset

    __all__.append("NetCDFDumper")

    @directory_dump('netcdf')
    class NetCDFDumper(FieldDumper):
        """Dumper to netCDF4 files."""

        extension = "nc"
        time_dim = 'frame'
        format = 'NETCDF4'

        def _file_setup(self, grp, model: Model):
            grp.createDimension(self.time_dim, None)

            # Attributes
            for k, v in _get_attributes(model).items():
                grp.setncattr(k, v)

            # Local dimensions
            voigt_dim = type_traits[model.type].voigt
            components = type_traits[model.type].components
            self._vec = grp.createDimension('spatial', components)
            self._tens = grp.createDimension('Voigt', voigt_dim)
            self.model_info = model.global_shape, model.type

            global_boundary_shape = mpi.global_shape(model.boundary_shape)

            # Create boundary dimensions
            for label, size, length in zip(
                    "xy", global_boundary_shape, model.boundary_system_size):
                grp.createDimension(label, size)
                coord = grp.createVariable(label, 'f8', (label, ))
                coord[:] = np.linspace(0, length, size, endpoint=False)

            self._create_variables(
                grp, model,
                lambda f: _is_surface_field(f[1], model),
                global_boundary_shape, "xy")

            # Create volume dimension
            if model.type in {model_type.volume_1d, model_type.volume_2d}:
                size = model.shape[0]
                grp.createDimension("z", size)
                coord = grp.createVariable("z", 'f8', ("z", ))
                coord[:] = np.linspace(0, model.system_size[0], size)

                self._create_variables(
                    grp, model,
                    lambda f: not _is_surface_field(f[1], model),
                    model.global_shape, "zxy")

            self.has_setup = True

        def _set_collective(self, rootgrp):
            if mpi.size() == 1:
                return

            for v in rootgrp.variables.values():
                if self.time_dim in v.dimensions:
                    v.set_collective(True)

        def _dump_to_file(self, file_descriptor: FileType, model: Model):
            if not isinstance(file_descriptor, io.IOBase):
                mode = 'a' if Path(file_descriptor).is_file() \
                    and getattr(self, 'has_setup', False) else 'w'

            try:
                with Dataset(file_descriptor, mode, format=self.format,
                             parallel=mpi.size() > 1) as rootgrp:
                    if rootgrp.dimensions == {}:
                        self._file_setup(rootgrp, model)

                    self._set_collective(rootgrp)

                    if self.model_info != (model.global_shape, model.type):
                        raise ModelError(f"Unexpected model {mode}")

                    self._dump_generic(rootgrp, model)
            except ValueError:
                raise MPIIncompatibilityError("NetCDF4 has no MPI support")

        def _create_variables(self, grp, model, predicate, shape,
                              dimensions):
            field_dim = len(shape)
            fields = list(filter(predicate, self.get_fields(model).items()))
            dim_labels = list(dimensions[:field_dim])

            for label, data in fields:
                local_dim = []

                # If we have an extra component
                if data.ndim > field_dim:
                    if data.shape[-1] == self._tens.size:
                        local_dim = [self._tens.name]
                    elif data.shape[-1] == self._vec.size:
                        local_dim = [self._vec.name]
                    else:
                        raise ComponentsError(
                            f"{label} has unexpected number of components "
                            f"({data.shape[-1]})")

                # Downcasting in case of 128 bit float
                dtype = data.dtype if data.dtype.str[1:] != 'f16' else 'f8'
                grp.createVariable(label, dtype,
                                   [self.time_dim] + dim_labels + local_dim,
                                   zlib=mpi.size() == 0)

        def _dump_generic(self, grp, model):
            fields = self.get_fields(model).items()
            new_frame = len(grp.dimensions[self.time_dim])

            for label, data in fields:
                var = grp[label]
                slice_in_global = (new_frame, ) + local_slice(data, model)
                var[slice_in_global] = np.array(data, dtype=var.dtype)

        @classmethod
        def _open_read(cls, fd):
            return Dataset(fd, 'r', format=cls.format,
                           parallel=mpi.size() > 1)

        @staticmethod
        def _create_model(rootgrp):
            attrs = {k: rootgrp.getncattr(k) for k in rootgrp.ncattrs()}
            return _create_model(attrs)

        @staticmethod
        def _set_model_fields(rootgrp, model, frame):
            dims = rootgrp.dimensions.keys()

            for k, v in filter(lambda k: k[0] not in dims,
                               rootgrp.variables.items()):
                v = v[frame, :]
                if model.type in _basic_types:
                    v = np.asarray(v).reshape(list(v.shape) + [1])
                model[k] = v[local_slice(v, model)].copy()

        @classmethod
        def read(cls, file_descriptor: FileType):
            """Create model with last frame."""
            with cls._open_read(file_descriptor) as rootgrp:
                model = cls._create_model(rootgrp)
                cls._set_model_fields(rootgrp, model, -1)
                return model

        @classmethod
        def read_sequence(cls, file_descriptor: FileType):
            """Generate a model for each frame in the file."""
            with cls._open_read(file_descriptor) as rootgrp:
                model = cls._create_model(rootgrp)

                for frame in range(len(rootgrp.dimensions[cls.time_dim])):
                    cls._set_model_fields(rootgrp, model, frame)
                    yield model

except ImportError:
    pass
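
# Example use of NetCDFDumper (sketch; the path follows from the 'netcdf'
# directory_dump and the 'nc' extension). Each dump() appends a frame to
# the same file; read_sequence() yields one snapshot per frame, updating a
# single Model instance in place:
#
#   dumper = NetCDFDumper('simulation', all_fields=True)
#   dumper.dump(model)   # frame 0
#   dumper.dump(model)   # frame 1
#   last = NetCDFDumper.read('netcdf/simulation.nc')
#   for snapshot in NetCDFDumper.read_sequence('netcdf/simulation.nc'):
#       ...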