"""A module for storing quantum mechanical properties in hdf5 format.
Index
-----
.. currentmodule:: dataCAT
.. autosummary::
create_prop_group
create_prop_dset
update_prop_dset
validate_prop_group
index_to_pandas
prop_to_dataframe
API
---
.. autofunction:: create_prop_group
.. autofunction:: create_prop_dset
.. autofunction:: update_prop_dset
.. autofunction:: validate_prop_group
.. autofunction:: index_to_pandas
.. autofunction:: prop_to_dataframe
"""
from __future__ import annotations
from typing import Union, Sequence, Any, Optional, Dict, TYPE_CHECKING
import h5py
import numpy as np
import pandas as pd
from assertionlib import assertion
if TYPE_CHECKING:
from numpy.typing import DTypeLike
else:
DTypeLike = 'numpy.typing.DTypeLike'
__all__ = ['create_prop_group', 'create_prop_dset', 'update_prop_dset',
'validate_prop_group', 'prop_to_dataframe', 'index_to_pandas']
PROPERTY_DOC = r"""A h5py Group containing an arbitrary number of quantum-mechanical properties.
Attributes
----------
\*args : dataset
An arbitrary user-specified property-containing dataset.
index : attribute
A reference to the dataset used as dimensional scale for all property
datasets embedded within this group.
"""
[docs]def create_prop_group(file: h5py.Group, scale: h5py.Dataset) -> h5py.Group:
r"""Create a group for holding user-specified properties.
.. testsetup:: python
>>> import os
>>> from shutil import copyfile
>>> from dataCAT.testing_utils import HDF5_READ, HDF5_TMP as hdf5_file
>>> if os.path.isfile(hdf5_file):
... os.remove(hdf5_file)
>>> _ = copyfile(HDF5_READ, hdf5_file)
.. code:: python
>>> import h5py
>>> from dataCAT import create_prop_group
>>> hdf5_file = str(...) # doctest: +SKIP
>>> with h5py.File(hdf5_file, 'r+') as f:
... scale = f.create_dataset('index', data=np.arange(10))
... scale.make_scale('index')
...
... group = create_prop_group(f, scale=scale)
... print('group', '=', group)
group = <HDF5 group "/properties" (0 members)>
.. testcleanup:: python
>>> if os.path.isfile(hdf5_file):
... os.remove(hdf5_file)
Parameters
----------
file : :class:`h5py.File` or :class:`h5py.Group`
The File or Group where the new ``"properties"`` group should be created.
scale : :class:`h5py.DataSet`
The dimensional scale which will be attached to all property datasets
created by :func:`dataCAT.create_prop_dset`.
Returns
-------
:class:`h5py.Group`
The newly created group.
"""
# Construct the group
grp = file.create_group('properties', track_order=True)
grp.attrs['index'] = scale.ref
grp.attrs['__doc__'] = np.string_(PROPERTY_DOC)
return grp
[docs]def create_prop_dset(group: h5py.Group, name: str, dtype: DTypeLike = None,
prop_names: Optional[Sequence[str]] = None,
**kwargs: Any) -> h5py.Dataset:
r"""Construct a new dataset for holding a user-defined molecular property.
Examples
--------
In the example below a new dataset is created for storing
solvation energies in water, methanol and ethanol.
.. testsetup:: python
>>> import os
>>> from shutil import copyfile
>>> from dataCAT.testing_utils import HDF5_READ, HDF5_TMP as hdf5_file
>>> if os.path.isfile(hdf5_file):
... os.remove(hdf5_file)
>>> _ = copyfile(HDF5_READ, hdf5_file)
>>> with h5py.File(hdf5_file, 'r+') as f:
... scale = f.create_dataset('index', data=np.arange(10))
... scale.make_scale('index')
... _ = create_prop_group(f, scale=scale)
.. code:: python
>>> import h5py
>>> from dataCAT import create_prop_dset
>>> hdf5_file = str(...) # doctest: +SKIP
>>> with h5py.File(hdf5_file, 'r+') as f:
... group = f['properties']
... prop_names = ['water', 'methanol', 'ethanol']
...
... dset = create_prop_dset(group, 'E_solv', prop_names=prop_names)
... dset_names = group['E_solv_names']
...
... print('group', '=', group)
... print('group["E_solv"]', '=', dset)
... print('group["E_solv_names"]', '=', dset_names)
group = <HDF5 group "/properties" (2 members)>
group["E_solv"] = <HDF5 dataset "E_solv": shape (10, 3), type "<f4">
group["E_solv_names"] = <HDF5 dataset "E_solv_names": shape (3,), type "|S8">
.. testcleanup:: python
>>> import os
>>> if os.path.isfile(hdf5_file):
... os.remove(hdf5_file)
Parameters
----------
group : :class:`h5py.Group`
The ``"properties"`` group where the new dataset will be created.
name : :class:`str`
The name of the new dataset.
prop_names : :class:`Sequence[str]<typing.Sequence>`, optional
The names of each row in the to-be created dataset.
Used for defining the length of the second axis and
will be used as a dimensional scale for aforementioned axis.
If :data:`None`, create a 1D dataset (with no columns) instead.
dtype : dtype-like
The data type of the to-be created dataset.
\**kwargs : :data:`~Any`
Further keyword arguments for the h5py :meth:`~h5py.Group.create_dataset` method.
Returns
-------
:class:`h5py.Dataset`
The newly created dataset.
"""
scale_name = f'{name}_names'
index_ref = group.attrs['index']
index = group.file[index_ref]
index_name = index.name.rsplit('/', 1)[-1]
n = len(index)
# If no prop_names are specified
if prop_names is None:
dset = group.create_dataset(name, shape=(n,), maxshape=(None,), dtype=dtype, **kwargs)
dset.dims[0].label = index_name
dset.dims[0].attach_scale(index)
return dset
# Parse the names
name_array = np.asarray(prop_names, dtype=np.string_)
if name_array.ndim != 1:
raise ValueError("'prop_names' expected None or a 1D array-like object; "
f"observed dimensionality: {name_array.ndim!r}")
# Construct the new datasets
m = len(name_array)
dset = group.create_dataset(
name,
shape=(n, m),
maxshape=(None, m),
dtype=dtype,
fillvalue=(_null_value(dtype) if dtype != object else None),
**kwargs
)
scale = group.create_dataset(scale_name, data=name_array, shape=(m,), dtype=name_array.dtype)
scale.make_scale(scale_name)
# Set the dimensional scale
dset.dims[0].label = index_name
dset.dims[0].attach_scale(index)
dset.dims[1].label = scale_name
dset.dims[1].attach_scale(scale)
return dset
def _null_value(dtype_like: DTypeLike) -> np.generic:
dtype = np.dtype(dtype_like)
generic: type[np.generic] = dtype.type
if issubclass(generic, (np.number, np.bool_)): # Numerical scalars
return generic(False)
elif not issubclass(generic, np.void): # Strings, bytes & datetime64
return generic('')
# Structured dtypes
values = (v[0] for v in dtype.fields.values()) # type: ignore[union-attr]
data = tuple(_null_value(field_dtype) for field_dtype in values)
return np.array(data, dtype=dtype).take(0) # type: ignore[no-any-return]
def _resize_prop_dset(dset: h5py.Dataset) -> None:
"""Ensure that **dset** is as long as its dimensional scale."""
scale = dset.dims[0][0]
n = len(scale)
if n > len(dset):
dset.resize(n, axis=0)
[docs]def update_prop_dset(dset: h5py.Dataset, data: np.ndarray,
index: Union[None, slice, np.ndarray] = None) -> None:
"""Update **dset** at position **index** with **data**.
Parameters
----------
dset : :class:`h5py.Dataset`
The to-be updated h5py dataset.
data : :class:`numpy.ndarray`
An array containing the to-be added data.
index : :class:`slice` or :class:`numpy.ndarray`, optional
The indices of all to-be updated elements in **dset**.
**index** either should be of the same length as **data**.
:rtype: :data:`None`
"""
idx = slice(None) if index is None else index
try:
_resize_prop_dset(dset)
dset[idx] = data
except Exception:
validate_prop_group(dset.parent)
raise
[docs]def validate_prop_group(group: h5py.Group) -> None:
"""Validate the passed hdf5 **group**, ensuring it is compatible with :func:`create_prop_group` and :func:`create_prop_group`.
This method is called automatically when an exception is raised by :func:`update_prop_dset`.
Parameters
----------
group : :class:`h5py.Group`
The to-be validated hdf5 Group.
Raises
------
:exc:`AssertionError`
Raised if the validation process fails.
""" # noqa: E501
assertion.isinstance(group, h5py.Group)
idx_ref = group.attrs['index']
idx = group.file[idx_ref]
iterator = ((k, v) for k, v in group.items() if not k.endswith('_names'))
for name, dset in iterator:
assertion.le(len(dset), len(idx), message=f'{name!r} invalid dataset length')
assertion.contains(dset.dims[0].keys(), 'index', message=f'{name!r} missing dataset scale')
assertion.eq(dset.dims[0]['index'], idx, message=f'{name!r} invalid dataset scale')
[docs]def prop_to_dataframe(dset: h5py.Dataset, dtype: DTypeLike = None) -> pd.DataFrame:
"""Convert the passed property Dataset into a DataFrame.
Examples
--------
.. testsetup:: python
>>> from dataCAT.testing_utils import HDF5_READ as hdf5_file
.. code:: python
>>> import h5py
>>> from dataCAT import prop_to_dataframe
>>> hdf5_file = str(...) # doctest: +SKIP
>>> with h5py.File(hdf5_file, 'r') as f:
... dset = f['ligand/properties/E_solv']
... df = prop_to_dataframe(dset)
... print(df) # doctest: +NORMALIZE_WHITESPACE
E_solv_names water methanol ethanol
ligand ligand anchor
O=C=O O1 -0.918837 -0.151129 -0.177396
O3 -0.221182 -0.261591 -0.712906
CCCO O4 -0.314799 -0.784353 -0.190898
Parameters
----------
dset : :class:`h5py.Dataset`
The property-containing Dataset of interest.
dtype : dtype-like, optional
The data type of the to-be returned DataFrame.
Use :data:`None` to default to the data type of **dset**.
Returns
-------
:class:`pandas.DataFrame`
A DataFrame constructed from the passed **dset**.
""" # noqa: E501
# Construct the index
dim0 = dset.dims[0]
scale0 = dim0[0]
index = index_to_pandas(scale0)
# Construct the columns
if dset.ndim == 1:
full_name = dset.name
name = full_name.rsplit('/', 1)[-1]
columns = pd.Index([name])
else:
dim1 = dset.dims[1]
scale1 = dim1[0]
columns = pd.Index(scale1[:].astype(str), name=dim1.label)
# Create and return the dataframe
if dtype is None:
return pd.DataFrame(dset[:], index=index, columns=columns)
# If possible, let h5py handle the datatype conversion
# This will often fail when dset.dtype consists of variable-length bytes-strings
try:
return pd.DataFrame(dset.astype(dtype)[:], index=index, columns=columns)
except (ValueError, TypeError):
return pd.DataFrame(dset[:].astype(dtype), index=index, columns=columns)
[docs]def index_to_pandas(dset: h5py.Dataset, fields: None | Sequence[str] = None) -> pd.MultiIndex:
"""Construct an MultiIndex from the passed ``index`` dataset.
Examples
--------
.. testsetup:: python
>>> from dataCAT.testing_utils import HDF5_READ as filename
.. code:: python
>>> from dataCAT import index_to_pandas
>>> import h5py
>>> filename = str(...) # doctest: +SKIP
# Convert the entire dataset
>>> with h5py.File(filename, "r") as f:
... dset: h5py.Dataset = f["ligand"]["index"]
... index_to_pandas(dset)
MultiIndex([('O=C=O', 'O1'),
('O=C=O', 'O3'),
( 'CCCO', 'O4')],
names=['ligand', 'ligand anchor'])
# Convert a subset of fields
>>> with h5py.File(filename, "r") as f:
... dset = f["ligand"]["index"]
... index_to_pandas(dset, fields=["ligand"])
MultiIndex([('O=C=O',),
('O=C=O',),
( 'CCCO',)],
names=['ligand'])
Parameters
----------
dset : :class:`h5py.Dataset`
The relevant ``index`` dataset.
fields : :class:`Sequence[str]<collections.abc.Sequence>`
The names of the ``index`` fields that are to-be included in the
returned MultiIndex. If :data:`None`, include all fields.
Returns
-------
:class:`pandas.MultiIndex`
A multi-index constructed from the passed dataset.
"""
# Fast-path for non-void-based datasets
if dset.dtype.fields is None:
if h5py.check_string_dtype(dset.dtype):
ar = dset[:].astype(str)
elif h5py.check_vlen_dtype(dset.dtype):
ar = _vlen_to_tuples(dset[:])
else:
ar = dset[:]
return pd.MultiIndex.from_arrays([ar])
# Parse the `fields` parameter
if fields is None:
field_names = list(dset.dtype.fields.keys())
iterator = ((name, f_dtype) for name, (f_dtype, *_) in dset.dtype.fields.items())
else:
field_names = list(fields)
iterator = ((name, dset.dtype.fields[name][0]) for name in fields)
if len(field_names) == 0:
raise ValueError("At least one field is required")
fields_lst = []
index_ar = dset[:]
for name, field_dtype in iterator:
# It's a bytes-string; decode it
if h5py.check_string_dtype(field_dtype):
ar = index_ar[name].astype(str)
# It's a h5py `vlen` dtype; convert it into a list of tuples
elif h5py.check_vlen_dtype(field_dtype):
ar = _vlen_to_tuples(index_ar[name])
else:
ar = index_ar[name]
fields_lst.append(ar)
return pd.MultiIndex.from_arrays(fields_lst, names=field_names)
def _vlen_to_tuples(array: np.ndarray) -> np.ndarray:
"""Convert an (object) array consisting of arrays into an (object) array of tuples."""
cache: Dict[bytes, tuple] = {}
ret = np.empty_like(array, dtype=object)
for i, ar in enumerate(array):
byte = ar.tobytes()
try:
tup = cache[byte]
except KeyError:
cache[byte] = tup = tuple(ar)
ret[i] = tup
return ret