"""
dataCAT.database_functions
==========================
A module for holding functions related to the :class:`.Database` class.
Index
-----
.. currentmodule:: dataCAT.database_functions
.. autosummary::
mol_to_file
_get_unflattend
df_to_mongo_dict
get_nan_row
as_pdb_array
from_pdb_array
sanitize_yaml_settings
even_index
API
---
.. autofunction:: mol_to_file
.. autofunction:: _get_unflattend
.. autofunction:: df_to_mongo_dict
.. autofunction:: get_nan_row
.. autofunction:: as_pdb_array
.. autofunction:: from_pdb_array
.. autofunction:: sanitize_yaml_settings
.. autofunction:: even_index
"""
import reprlib
from typing import (Collection, Union, Sequence, Tuple, List, Generator, Dict, Any)
import numpy as np
import pandas as pd
from scm.plams import (Molecule, Settings)
import scm.plams.interfaces.molecule.rdkit as molkit
from rdkit import Chem
from rdkit.Chem import Mol
from CAT.utils import get_template
# Only ``df_to_mongo_dict`` is exported by ``from dataCAT.database_functions import *``;
# the remaining functions of this module have to be imported by name.
__all__ = ['df_to_mongo_dict']

#: Type alias grouping a number of immutable built-in types.
Immutable = Union[str, int, float, frozenset, tuple] # Immutable objects
def df_to_mongo_dict(df: pd.DataFrame,
                     as_gen: bool = True) -> Union[Generator, list]:
    """Convert a dataframe into a generator of dictionaries suitable for a MongoDB_ databases.

    Tuple-keys present in **df** (*i.e.* pd.MultiIndex) are expanded into nested dictionaries.

    .. _MongoDB: https://www.mongodb.com/

    Examples
    --------
    .. code:: python

        >>> print(df)
        index            E_solv
        sub index       Acetone Acetonitrile
        smiles  anchor
        C[O-]   O2        -56.6        -57.9
        CC[O-]  O3        -56.5        -57.6
        CCC[O-] O4        -57.1        -58.2

        >>> gen = df_to_mongo_dict(df)
        >>> print(type(gen))
        <class 'generator'>

        >>> for item in gen:
        >>>     print(item)
        {'E_solv': {'Acetone': -56.6, 'Acetonitrile': -57.9}, 'smiles': 'C[O-]', 'anchor': 'O2'}
        {'E_solv': {'Acetone': -56.5, 'Acetonitrile': -57.6}, 'smiles': 'CC[O-]', 'anchor': 'O3'}
        {'E_solv': {'Acetone': -57.1, 'Acetonitrile': -58.2}, 'smiles': 'CCC[O-]', 'anchor': 'O4'}

    Parameters
    ----------
    df : |pd.DataFrame|_
        A Pandas DataFrame whose axis and columns are instance of pd.MultiIndex.

    as_gen : bool
        If ``True``, return a generator of dictionaries rather than a list of dictionaries.

    Returns
    -------
    |Generator|_ [|dict|_] or |list|_ [|dict|_]
        A generator or list of nested dictionaries constructed from **df**.
        Each row in **df** is converted into a single dictionary.
        The to-be returned dictionaries are updated with a dictionary containing their respective
        (multi-)index in **df**.

    Raises
    ------
    TypeError
        Raised if ``df.index`` or ``df.columns`` is not an instance of pd.MultiIndex.
        The check is performed immediately, also when a generator is returned.

    """
    if not (isinstance(df.index, pd.MultiIndex) and isinstance(df.columns, pd.MultiIndex)):
        raise TypeError("DataFrame.index and DataFrame.columns should be "
                        "instances of pandas.MultiIndex")

    idx_names = df.index.names

    # ``MultiIndex.levels`` keeps every *defined* label, including those that are no longer
    # in use after slicing the columns; looking those up in a row raises a KeyError.
    # Collect the outermost labels that are actually present instead (once, not per row).
    top_keys = df.columns.get_level_values(0).unique()

    def _get_dict(idx, row: pd.Series) -> dict:
        """Turn a single row, and its (multi-)index **idx**, into a nested dictionary."""
        ret = {key: row[key].to_dict() for key in top_keys}  # Add values
        ret.update(zip(idx_names, idx))  # Add index
        return ret

    rows = (_get_dict(idx, row) for idx, row in df.iterrows())
    return rows if as_gen else list(rows)
#: A dictionary with NumPy dtypes as keys and matching ``None``-esque items as values
DTYPE_DICT: Dict[np.dtype, Any] = {
    np.dtype('int'): -1,
    np.dtype('float'): np.nan,
    np.dtype('O'): None,
    np.dtype('bool'): False,
}


def get_nan_row(df: pd.DataFrame) -> list:
    """Build a row of placeholder ("None-esque") values, one per column of **df**.

    The placeholder is picked from :data:`DTYPE_DICT` based on the dtype of the column;
    columns whose dtype is not a key of :data:`DTYPE_DICT` get ``None``:

    * ``np.dtype('int')``: ``-1``
    * ``np.dtype('float')``: ``np.nan``
    * ``np.dtype('O')``: ``None``
    * ``np.dtype('bool')``: ``False``

    Parameters
    ----------
    df : |pd.DataFrame|_
        A dataframe.

    Returns
    -------
    |list|_ [|int|_, |float|_, |bool|_ and/or |None|_]
        A list of none-esque objects, one for each column in **df**.

    """
    # ``dict.get`` falls back to ``None`` for dtypes without an entry
    return [DTYPE_DICT.get(column_dtype) for column_dtype in df.dtypes]
def as_pdb_array(mol_list: Collection[Molecule],
                 min_size: int = 0) -> np.ndarray:
    """Convert a list of PLAMS molecule into an array of (partially) de-serialized .pdb files.

    Parameters
    ----------
    mol_list: :math:`m` |list|_ [|plams.Molecule|_]
        A list of :math:`m` PLAMS molecules.

    min_size : int
        The minimum length of the pdb_array.
        The array is padded with empty strings if required.

    Returns
    -------
    :math:`m*n` |np.ndarray|_ [|np.bytes|_ *|S80*]
        An array with :math:`m` partially deserialized .pdb files with up to :math:`n` lines each.
        An empty **mol_list** yields an array of shape ``(0, min_size)``.

    """
    # Convert every molecule into a .pdb block and split the block into its lines
    pdb_list = [Chem.MolToPDBBlock(molkit.to_rdmol(mol)).splitlines() for mol in mol_list]

    # ``default=0`` keeps ``max()`` from raising when **mol_list** is empty
    line_count = max((len(lines) for lines in pdb_list), default=0)

    # Construct, fill and return the pdb array; unfilled elements remain empty bytes
    shape = len(pdb_list), max(min_size, line_count)
    ret = np.zeros(shape, dtype='S80')
    for i, lines in enumerate(pdb_list):
        ret[i, :len(lines)] = lines
    return ret
def from_pdb_array(array: np.ndarray,
                   rdmol: bool = True) -> Union[Molecule, Mol]:
    """Convert an array with a (partially) de-serialized .pdb file into a molecule.

    Parameters
    ----------
    array : :math:`n` |np.ndarray|_ [|np.bytes|_ / S80]
        A (partially) de-serialized .pdb file with :math:`n` lines.
        Empty elements (*i.e.* padding) are skipped.

    rdmol : |bool|_
        If ``True``, return an RDKit molecule instead of a PLAMS molecule.

    Returns
    -------
    |plams.Molecule|_ or |rdkit.Chem.Mol|_
        A PLAMS or RDKit molecule build from **array**.

    """
    # Re-assemble the .pdb block: one line per non-empty element, each newline-terminated
    pdb_str = ''.join(f'{item.decode()}\n' for item in array if item)
    ret = Chem.MolFromPDBBlock(pdb_str, removeHs=False, proximityBonding=False)
    return ret if rdmol else molkit.from_rdmol(ret)
def sanitize_yaml_settings(settings: Settings,
                           job_type: str) -> Settings:
    """Remove a predetermined set of unwanted keys and values from a settings object.

    The keys which are to-be removed are read from the ``settings_blacklist.yaml`` template:
    the content of its ``'generic'`` section updated with the section named **job_type**.

    Parameters
    ----------
    settings : |plams.Settings|_
        A settings instance with, potentially, undesired keys and values.
        It is not modified by this function.

    job_type: |str|_
        The name of key in the settings blacklist.

    Returns
    -------
    |plams.Settings|_
        A new Settings instance with all unwanted keys and values removed.
        If **job_type** is not a key of the blacklist then an unaltered
        copy of **settings** is returned (no exception is raised).

    """
    # Prepare a blacklist of specific keys
    blacklist = get_template('settings_blacklist.yaml')
    if job_type not in blacklist:
        return settings.copy()

    # Merge into a copy such that the 'generic' section of the loaded template is left untouched
    settings_del = blacklist['generic'].copy()
    settings_del.update(blacklist[job_type])

    # Recursively delete all keys from the copy if aforementioned keys are present in settings_del
    ret = settings.copy()
    del_nested(settings, ret, settings_del)
    return ret
def del_nested(s_ref: Settings, s_ret: dict, del_item: dict) -> None:
    """Remove all keys in **del_item** from **s_ret**: a (nested) dictionary and/or list.

    Parameters
    ----------
    s_ref : |plams.Settings|_
        The reference collection which is iterated over; it is only read.
        Iterating over **s_ref** rather than **s_ret** allows for the removal of items from
        **s_ret** during the iteration.

    s_ret : |dict|_
        The collection which is pruned in-place.
        NOTE(review): presumably a copy of **s_ref** with identical keys - confirm with callers.

    del_item : |dict|_
        A (nested) collection with the to-be removed keys.
        If the value belonging to a key is a dictionary or list then the matching item in
        **s_ret** is pruned recursively; for any other value the key itself is removed.

    """
    empty = Settings()
    iterator = s_ref.items() if isinstance(s_ref, dict) else enumerate(s_ref)
    # NOTE(review): for list input a deletion shifts the indices of all subsequent items in
    # s_ret while the indices taken from s_ref do not change - verify whether lists occur at all.
    for key, value in iterator:
        if key in del_item:
            value_del = del_item[key]
            if isinstance(value_del, (dict, list)):
                del_nested(value, s_ret[key], value_del)
            else:
                del s_ret[key]
                continue  # The key is gone: a second deletion would raise a KeyError

        # Inspect s_ret, not s_ref: the latter is never pruned and thus cannot reveal leftovers
        if s_ret[key] == empty:  # An empty (leftover) Settings instance: delete it
            del s_ret[key]
def even_index(df1: pd.DataFrame,
               df2: pd.DataFrame) -> pd.DataFrame:
    """Ensure that ``df2.index`` is a subset of ``df1.index``.

    Parameters
    ----------
    df1 : |pd.DataFrame|_
        A DataFrame whose index is to-be a superset of ``df2.index``.

    df2 : |pd.DataFrame|_
        A DataFrame whose index is to-be a subset of ``df1.index``.

    Returns
    -------
    |pd.DataFrame|_
        A new dataframe containing all rows of **df1**, followed by a row of placeholder values
        (see :func:`get_nan_row`) for every element of ``df2.index`` absent from ``df1.index``.
        Returns **df1** itself if ``df2.index`` is already a subset of ``df1.index``.

    """
    # Figure out if ``df2.index`` is a subset of ``df1.index``
    bool_ar = df2.index.isin(df1.index)
    if bool_ar.all():
        return df1

    # Make ``df2.index`` a subset of ``df1.index``: one placeholder row per missing index
    nan_row = get_nan_row(df1)
    idx = df2.index[~bool_ar]
    df_tmp = pd.DataFrame(len(idx) * [nan_row], index=idx, columns=df1.columns)

    # ``DataFrame.append`` has been removed in pandas 2.0; ``pd.concat`` is its replacement
    return pd.concat([df1, df_tmp], sort=True)