# Source code for dataCAT.database

"""
dataCAT.database
================

A module which holds the :class:`.Database` class.

Index
-----
.. currentmodule:: dataCAT.database
.. autosummary::
    Database

API
---
.. autoclass:: Database
    :members:
    :private-members:
    :special-members:

"""

from os import getcwd
from time import sleep
from typing import (Optional, Sequence, List, Union, Any, Dict)
from itertools import count
from collections.abc import Container

import h5py
import numpy as np
import pandas as pd
from pymongo import MongoClient
from pymongo.errors import (ServerSelectionTimeoutError, DuplicateKeyError)

from rdkit.Chem import Mol
from scm.plams import (Settings, Molecule)

from CAT.logger import logger
from CAT.mol_utils import from_rdmol

from .create_database import (_create_csv, _create_yaml, _create_hdf5, _create_mongodb)
from .context_managers import (MetaManager, OpenYaml, OpenLig, OpenQD)
from .database_functions import (
    df_to_mongo_dict, even_index, from_pdb_array, sanitize_yaml_settings, as_pdb_array
)

# Public API of this module
__all__ = ['Database']

# Union of immutable objects suitable as dictionary keys
Immutable = Union[int, float, str, frozenset, tuple]

# Aliases for pd.MultiIndex columns (2-level column keys; the second level is left empty)
# Column holding the row index of each entry in the .hdf5 datasets; -1 marks "not stored yet"
HDF5_INDEX = ('hdf5 index', '')
# Column which is compared against ``True``/``False`` to select (un-)optimized entries
OPT = ('opt', '')
# Column holding the molecules which are exported to / imported from the .hdf5 file
MOL = ('mol', '')


class Database(Container):
    """The Database class.

    .. _pymongo.MongoClient: http://api.mongodb.com/python/current/api/pymongo/mongo_client.html

    Parameters
    ----------
    path : str
        Optional: The path+directory name of the directory which is to contain all
        database components (see :attr:`Database.dirname`).
        Defaults to the current working directory if ``None`` (or empty).

    host : str
        Hostname or IP address or Unix domain socket path of a single mongod or
        mongos instance to connect to, or a mongodb URI, or a list of hostnames mongodb URIs.
        If **host** is an IPv6 literal it must be enclosed in ``"["`` and ``"]"`` characters
        following the RFC2732 URL syntax (e.g. ``"[::1]"`` for localhost).
        Multihomed and round robin DNS addresses are not supported.
        See :attr:`Database.mongodb`.

    port : int
        Port number on which to connect.
        See :attr:`Database.mongodb`.

    **kwargs
        Optional keyword argument for `pymongo.MongoClient <http://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_.
        See :attr:`Database.mongodb`.

    Attributes
    ----------
    dirname : str
        The path+filename of the directory containing all database components.

    csv_lig : |dataCAT.MetaManager|_
        A dataclass for accessing the context manager for opening
        the .csv file containing all ligand related results.

    csv_qd : |dataCAT.MetaManager|_
        A dataclass for accessing the context manager for opening
        the .csv file containing all quantum dot related results.

    yaml : |dataCAT.MetaManager|_
        A dataclass for accessing the context manager for opening
        the .yaml file containing all job settings.

    hdf5 : |dataCAT.MetaManager|_
        A dataclass for accessing the context manager for opening
        the .hdf5 file containing all structures (as partially de-serialized .pdb files).

    mongodb : dict
        Optional: A dictionary with keyword arguments for pymongo.MongoClient_.
        Defaults to ``None`` if a :exc:`ServerSelectionTimeoutError` is raised when failing to
        contact the host. See the **host**, **port** and **kwargs** parameter.

    """  # noqa

    def __init__(self, path: Optional[str] = None,
                 host: str = 'localhost',
                 port: int = 27017,
                 **kwargs) -> None:
        """Initialize :class:`Database`."""
        self.dirname = path or getcwd()

        # Create the database components and return the filename
        lig_path = _create_csv(self.dirname, database='ligand')
        qd_path = _create_csv(self.dirname, database='QD')
        yaml_path = _create_yaml(self.dirname)
        hdf5_path = _create_hdf5(self.dirname)

        # Populate attributes with MetaManager instances
        self.csv_lig = MetaManager(lig_path, OpenLig)
        self.csv_qd = MetaManager(qd_path, OpenQD)
        self.yaml = MetaManager(yaml_path, OpenYaml)
        self.hdf5 = MetaManager(hdf5_path, h5py.File)

        # Try to create or access the mongodb database
        try:
            self.mongodb = _create_mongodb(host, port, **kwargs)
        except ServerSelectionTimeoutError:
            self.mongodb = None

    def __str__(self) -> str:
        """Return a human string representation of this instance."""
        def _dict_to_str(value: dict) -> str:
            iterator = sorted(value.items(), key=str)
            return '{' + newline.join(f'{repr(k)}: {repr(v)}' for k, v in iterator) + '}'

        def _get_str(key: str, value: Any) -> str:
            func = _dict_to_str if isinstance(value, dict) else repr
            return f'{indent}{key:{offset}} = {func(value)}'

        # NOTE(review): the whitespace inside the original string literal was lost;
        # a 2-space indent is the value which matches the ``6 + offset`` alignment below
        indent = '  '
        offset = max((len(k) for k in vars(self)), default=0)

        # Continuation lines of a dict are aligned with its first key:
        # len(indent) + offset + len(' = ') + len('{') == 6 + offset
        newline = ',\n' + ' ' * (len(indent) + offset + 4)

        ret = ',\n'.join(_get_str(k, v) for k, v in vars(self).items())
        return f'Database(\n{ret}\n)'

    def __repr__(self) -> str:
        """Return a machine string representation of this instance."""
        return self.__str__()

    def __eq__(self, value: Any) -> bool:
        """Check if this instance is equivalent to **value**.

        Two objects are considered equivalent if their instance attributes are identical.
        Objects without instance attributes (*e.g.* integers) are never equivalent.

        """
        try:
            other = vars(value)
        except TypeError:  # **value** has no __dict__
            return False
        return vars(self) == other

    def __contains__(self, value: Any) -> bool:
        """Check if **value** is the name of an attribute of this instance."""
        try:
            return value in vars(self)
        except TypeError:  # **value** is unhashable and thus cannot be an attribute name
            return False

    # ################################ Updating the database ############################## #

    def _parse_database(self, database: str) -> MetaManager:
        """Operate on either the ligand or quantum dot database.

        Returns :attr:`Database.csv_lig` for ``"ligand"`` and ``"ligand_no_opt"`` and
        :attr:`Database.csv_qd` for ``"QD"`` and ``"QD_no_opt"``;
        raises a :exc:`ValueError` otherwise.

        """
        if database in ('ligand', 'ligand_no_opt'):
            return self.csv_lig
        elif database in ('QD', 'QD_no_opt'):
            return self.csv_qd
        else:
            raise ValueError(f"database={database}; accepted values are 'ligand' and 'QD'")

    def update_mongodb(self, database: Union[str, Dict[str, pd.DataFrame]] = 'ligand',
                       overwrite: bool = False) -> None:
        """Export ligand or qd results to the MongoDB database.

        Examples
        --------
        .. code:: python

            >>> from CAT import Database

            >>> db = Database(**kwargs)

            # Update from db.csv_lig
            >>> db.update_mongodb('ligand')

            # Update from a lig_df, a user-provided DataFrame
            >>> db.update_mongodb({'ligand': lig_df})
            >>> print(type(lig_df))
            <class 'pandas.core.frame.DataFrame'>

        Parameters
        ----------
        database : |str|_ or |dict|_ [|str|_, |pd.DataFrame|_]
            The type of database.
            Accepted values are ``"ligand"`` and ``"QD"``,
            opening :attr:`Database.csv_lig` and :attr:`Database.csv_qd`, respectively.
            Alternatively, a dictionary with the database name and a matching DataFrame
            can be passed directly.

        overwrite : bool
            Whether or not previous entries can be overwritten or not.

        Raises
        ------
        ValueError
            Raised if :attr:`Database.mongodb` is ``None`` or
            if **database** is a string other than ``"ligand"`` or ``"QD"``.

        """
        # ``Collection.insert_many`` reports duplicates as a BulkWriteError,
        # which is not a subclass of DuplicateKeyError
        from pymongo.errors import BulkWriteError

        if self.mongodb is None:
            raise ValueError('Database.Mongodb is None')

        # Open the MongoDB database
        client = MongoClient(**self.mongodb)
        try:
            mongo_db = client.cat_database

            if isinstance(database, dict):
                database, df = next(iter(database.items()))
                documents = list(df_to_mongo_dict(df))
                idx_keys = df.index.names
                if database == 'ligand':
                    collection = mongo_db.ligand_database
                else:
                    collection = mongo_db.qd_database

            else:
                # Operate on either the ligand or quantum dot database
                if database == 'ligand':
                    idx_keys = ('smiles', 'anchor')
                    collection = mongo_db.ligand_database
                    manager = self.csv_lig
                elif database == 'QD':
                    idx_keys = ('core', 'core anchor', 'ligand smiles', 'ligand anchor')
                    collection = mongo_db.qd_database
                    manager = self.csv_qd
                else:
                    raise ValueError(f"database={database!r}; "
                                     "accepted values are 'ligand' and 'QD'")

                # Parse the ligand or qd dataframe.
                # The documents are materialized while the .csv file is still open,
                # as they have to be iterated over twice if the bulk insertion fails
                with manager.open(write=False) as db:
                    documents = list(df_to_mongo_dict(db))

            # Nothing to export; pymongo refuses to insert an empty batch
            if not documents:
                return

            # Update the collection
            # Try to insert all keys at once
            try:
                collection.insert_many(documents)
            except (DuplicateKeyError, BulkWriteError):
                pass
            else:
                return

            # Simultaneous insertion failed, resort to plan B
            for item in documents:
                try:
                    collection.insert_one(item)
                except DuplicateKeyError:  # An item is already present in the collection
                    if not overwrite:
                        continue
                    filter_ = {i: item[i] for i in idx_keys}
                    # pymongo adds a fresh ``_id`` to every document it tries to insert;
                    # MongoDB rejects replacements which alter the (immutable) ``_id`` field
                    replacement = {k: v for k, v in item.items() if k != '_id'}
                    collection.replace_one(filter_, replacement)
        finally:
            client.close()

    def update_csv(self, df: pd.DataFrame,
                   database: str = 'ligand',
                   columns: Optional[Sequence] = None,
                   overwrite: bool = False,
                   job_recipe: Optional[Settings] = None,
                   opt: bool = False) -> None:
        """Update :attr:`Database.csv_lig` or :attr:`Database.csv_qd` with new settings.

        Parameters
        ----------
        df : |pd.DataFrame|_
            A dataframe of new (potential) database entries.

        database : str
            The type of database; accepted values are ``"ligand"`` (:attr:`Database.csv_lig`)
            and ``"QD"`` (:attr:`Database.csv_qd`).

        columns : |Sequence|_
            Optional: A list of column keys in **df** which
            (potentially) are to be added to this instance.
            If ``None`` (or empty): Add all columns.
            The passed sequence is not modified.

        overwrite : |bool|_
            Whether or not previous entries can be overwritten or not.

        job_recipe : |plams.Settings|_
            Optional: A :class:`.Settings` instance with settings specific to a job.

        opt : |bool|_
            If ``True``, also export the ``("opt", "")`` column of **df** and
            pass **opt** on to :meth:`Database.update_hdf5`.

        """
        # Operate on either the ligand or quantum dot database
        manager = self._parse_database(database)

        # Update **self.yaml**
        if job_recipe is not None:
            job_settings = self.update_yaml(job_recipe)
            for key, value in job_settings.items():
                df[('settings', key)] = value

        with manager.open(write=True) as db:
            # Update **db.index**
            db.df = even_index(db.df, df)

            # Filter columns; work on a local list such that ``columns=None`` is
            # supported throughout and the sequence of the caller is left untouched
            if columns is None or len(columns) == 0:
                export_columns = list(df.columns)
            else:
                export_columns = list(columns)
            df_columns = pd.Index(export_columns)

            # Update **db.columns**
            bool_ar = df_columns.isin(db.columns)
            for i in df_columns[~bool_ar]:
                if 'job_settings' in i[0]:
                    # Job settings are stored in the .hdf5 file rather than the .csv file
                    self._update_hdf5_settings(df, i[0])
                    del df[i]
                    export_columns.remove(i)
                    continue
                try:
                    db[i] = np.array((None), dtype=df[i].dtype)
                except TypeError:  # e.g. if csv[i] consists of the datatype np.int64
                    db[i] = -1

            # Update **self.hdf5**; returns a new series of indices
            hdf5_series = self.update_hdf5(df, database=database, overwrite=overwrite, opt=opt)

            # Update **db.values**
            db.update(df[export_columns], overwrite=overwrite)
            db.update(hdf5_series, overwrite=True)
            if opt:
                db.update(df[OPT], overwrite=True)

    def update_yaml(self, job_recipe: Settings) -> dict:
        """Update :attr:`Database.yaml` with (potentially) new user provided settings.

        Parameters
        ----------
        job_recipe : |plams.Settings|_
            A settings object with one or more settings specific to a job.

        Returns
        -------
        |dict|_
            A dictionary with the column names as keys and
            the key for :attr:`Database.yaml` as matching values.

        """
        ret = {}
        with self.yaml.open() as db:
            for item in job_recipe:
                recipe = job_recipe[item]

                # Unpack and sanitize keys
                key = recipe.key
                if isinstance(key, type):
                    key = key.__name__

                # Unpack and sanitize values
                value = recipe.value
                if isinstance(value, dict):
                    value = sanitize_yaml_settings(value, key)

                # Check if the appropriate key is available in **self.yaml**
                if key not in db:
                    db[key] = []

                # Check if the appropriate value is available in **self.yaml**;
                # append it if it is not
                if value in db[key]:
                    idx = db[key].index(value)
                else:
                    db[key].append(value)
                    idx = len(db[key]) - 1
                ret[item] = '{} {:d}'.format(key, idx)
        return ret

    def update_hdf5(self, df: pd.DataFrame,
                    database: str = 'ligand',
                    overwrite: bool = False,
                    opt: bool = False) -> pd.Series:
        """Export molecules (see the ``"mol"`` column in **df**) to the structure database.

        Returns a series with the :attr:`Database.hdf5` indices of all new entries.

        Parameters
        ----------
        df : |pd.DataFrame|_
            A dataframe of new (potential) database entries.

        database : str
            The type of database; accepted values are ``"ligand"`` and ``"QD"``.

        overwrite : bool
            Whether or not previous entries can be overwritten or not.

        opt : bool
            If ``True``, identify new and preexisting entries based on the ``("opt", "")``
            column of **df** rather than on the ``("hdf5 index", "")`` column.
            All exported entries are subsequently flagged as ``True`` in ``("opt", "")``.

        Returns
        -------
        |pd.Series|_
            A series with the indices of all new molecules in :attr:`Database.hdf5`.

        """
        # Identify new and preexisting entries
        if opt:
            new = df[HDF5_INDEX][df[OPT] == False]  # noqa
            old = df[HDF5_INDEX][df[OPT] == True]  # noqa
        else:
            new = df[HDF5_INDEX][df[HDF5_INDEX] == -1]
            old = df[HDF5_INDEX][df[HDF5_INDEX] >= 0]

        # Add new entries to the database
        self.hdf5_availability()
        with self.hdf5.open('r+') as f:
            dset = f[database]
            i, j = dset.shape

            # NOTE: check for emptiness rather than truthiness of the values;
            # ``Series.any()`` is False for a series which only contains the index 0
            if not new.empty:
                pdb_array = as_pdb_array(df[MOL][new.index], min_size=j)

                # Reshape and update **self.hdf5**
                k = i + pdb_array.shape[0]
                dset.shape = k, pdb_array.shape[1]
                dset[i:k] = pdb_array

                ret = pd.Series(np.arange(i, k), index=new.index, name=HDF5_INDEX)
                df.update(ret, overwrite=True)
                if opt:
                    df.loc[new.index, OPT] = True
            else:
                ret = pd.Series(name=HDF5_INDEX, dtype=int)

            # If **overwrite** is *True*
            if overwrite and not old.empty:
                # The dataset may have been widened above; pad to its current width
                # (assumes **min_size** pads the second axis, as in the branch above)
                ar = as_pdb_array(df[MOL][old.index], min_size=dset.shape[1])
                if ar.shape[1] > dset.shape[1]:
                    dset.shape = dset.shape[0], ar.shape[1]

                # Ensure that the hdf5 indices are sorted (required by h5py)
                old_idx = np.asarray(old.values)
                order = np.argsort(old_idx)
                dset[old_idx[order]] = ar[order]
                if opt:
                    df.loc[old.index, OPT] = True

        return ret

    def _update_hdf5_settings(self, df: pd.DataFrame, column: str) -> None:
        """Export all files in **df[column]** to hdf5 dataset **column**."""
        # Add new entries to the database
        self.hdf5_availability()
        with self.hdf5.open('r+') as f:
            i, j, k = f[column].shape

            # Create a 3D array of input files
            try:
                job_ar = self._read_inp(df[column], j, k)
            except ValueError:  # df[column] consists of empty lists, abort
                return None

            # Reshape **self.hdf5**
            n_rows = max(i, 1 + int(df[HDF5_INDEX].max()))
            f[column].shape = n_rows, job_ar.shape[1], job_ar.shape[2]

            # Update the hdf5 dataset; the hdf5 indices have to be sorted
            idx = np.asarray(df[HDF5_INDEX], dtype=int)
            order = np.argsort(idx)
            f[column][idx[order]] = job_ar[order]
        return None

    @staticmethod
    def _read_lines(filename: str) -> List[bytes]:
        """Return all non-empty lines of **filename** as bytes, stripped of their line ending."""
        with open(filename, 'rb') as f:
            stripped = (line.rstrip(b'\r\n') for line in f)
            return [line for line in stripped if line]

    @staticmethod
    def _read_inp(job_paths: Sequence[str], ax2: int = 0, ax3: int = 0) -> np.ndarray:
        """Convert all files in **job_paths** (nested sequence of filenames) into a 3D array.

        The returned array is of dtype ``"S120"`` and is padded with empty strings;
        its second and third axes are at least of length **ax2** and **ax3**, respectively.
        Raises a :exc:`ValueError` if **job_paths** does not contain any filenames.

        """
        # Read every file once; newline cannot be used as delimiter in
        # recent versions of ``np.loadtxt()``, so the lines are split manually
        contents = [[Database._read_lines(name) for name in row] for row in job_paths]

        # Determine the minimum size of the to-be returned 3D array.
        # NOTE: ``max()`` deliberately raises a ValueError for empty input,
        # the caller relies on it
        ax1 = len(contents)
        ax2 = max(ax2, max(len(row) for row in contents))
        ax3 = max(ax3, max(len(lines) for row in contents for lines in row))

        # Create and return a padded 3D array of strings
        ret = np.zeros((ax1, ax2, ax3), dtype='S120')
        for i, row in enumerate(contents):
            for j, lines in enumerate(row):
                if lines:
                    ret[i, j, :len(lines)] = lines
        return ret

    @staticmethod
    def _get_line_count(filename: str) -> int:
        """Return the total number of non-empty lines in **filename**.

        Lines consisting of only a newline character are not counted;
        an empty file yields ``0``.

        """
        with open(filename, 'r') as f:
            return sum(1 for line in f if line != '\n')

    # ####################### Pulling results from the database ########################### #

    def from_csv(self, df: pd.DataFrame,
                 database: str = 'ligand',
                 get_mol: bool = True,
                 inplace: bool = True) -> Optional[pd.Series]:
        """Pull results from :attr:`Database.csv_lig` or :attr:`Database.csv_qd`.

        Performs an inplace update of **df** if **inplace** = ``True``, thus returning ``None``.

        Parameters
        ----------
        df : |pd.DataFrame|_
            A dataframe of new (potential) database entries.

        database : str
            The type of database; accepted values are ``"ligand"`` and ``"QD"``.

        get_mol : bool
            Attempt to pull preexisting molecules from the database.
            See the **inplace** argument for more details.

        inplace : bool
            If ``True`` perform an inplace update of the ``"mol"`` column in **df**.
            Otherwise return a new series of PLAMS molecules.

        Returns
        -------
        |pd.Series|_ [|plams.Molecule|_]
            Optional: A Series of PLAMS molecules if **get_mol** = ``True``
            and **inplace** = ``False``.

        """
        # Operate on either the ligand or quantum dot database
        manager = self._parse_database(database)

        # Update the *hdf5 index* column in **df**
        with manager.open(write=False) as db:
            df.update(db.df, overwrite=True)
            df[HDF5_INDEX] = df[HDF5_INDEX].astype(int, copy=False)

        # **df** has been updated and **get_mol** = *False*
        if not get_mol:
            return None
        return self._get_csv_mol(df, database, inplace)

    def _get_csv_mol(self, df: pd.DataFrame,
                     database: str = 'ligand',
                     inplace: bool = True) -> Optional[pd.Series]:
        """A method which handles the retrieval and subsequent formatting of molecules.

        Called internally by :meth:`.Database.from_csv`.
        Note that **df** is sorted inplace by its ``("hdf5 index", "")`` column.

        Parameters
        ----------
        df : |pd.DataFrame|_
            A dataframe of new (potential) database entries.

        database : str
            The type of database; accepted values are ``"ligand"`` and ``"QD"``.

        inplace : bool
            If ``True`` perform an inplace update of the ``("mol", "")`` column in **df**.
            Otherwise return a new series of PLAMS molecules.

        Returns
        -------
        |pd.Series|_ [|plams.Molecule|_]
            Optional: A Series of PLAMS molecules if **inplace** is ``False``.

        """
        # Sort and find all valid HDF5 indices
        df.sort_values(by=[HDF5_INDEX], inplace=True)
        if 'no_opt' in database:
            df_slice = df[HDF5_INDEX] >= 0
        else:
            df_slice = df[OPT] == True  # noqa
        idx = df[HDF5_INDEX][df_slice].values

        # If no HDF5 indices are available in **df** then abort the function
        if not df_slice.any():
            if inplace:
                return None
            return pd.Series(None, name=MOL, dtype=object)

        # Update **df** with preexisting molecules from **self**, returning *None*
        if inplace:
            rdmol_list = self.from_hdf5(idx, database=database)
            for mol, rdmol in zip(df.loc[df_slice, MOL], rdmol_list):
                mol.from_rdmol(rdmol)
            return None

        # Create and return a new series of PLAMS molecules
        mol_list = self.from_hdf5(idx, database=database, rdmol=False)
        return pd.Series(mol_list, index=df[df_slice].index, name=MOL)

    def from_hdf5(self, index: Sequence[int],
                  database: str = 'ligand',
                  rdmol: bool = True) -> List[Union[Molecule, Mol]]:
        """Import structures from the hdf5 database as RDKit or PLAMS molecules.

        Parameters
        ----------
        index : |list|_ [|int|_]
            The indices of the to be retrieved structures.

        database : str
            The type of database; accepted values are ``"ligand"`` and ``"QD"``.

        rdmol : bool
            If ``True``, return an RDKit molecule instead of a PLAMS molecule.

        Returns
        -------
        |list|_ [|plams.Molecule|_ or |rdkit.Chem.Mol|_]
            A list of PLAMS or RDKit molecules.

        """
        # Convert **index** to a list if it is an array, series or dataframe
        if hasattr(index, '__array__'):
            index = np.asarray(index).tolist()

        # Open the database and pull entries
        self.hdf5_availability()
        with self.hdf5.open('r') as f:
            pdb_array = f[database][index]

        # Return a list of RDKit or PLAMS molecules
        return [from_pdb_array(mol, rdmol=rdmol) for mol in pdb_array]

    def hdf5_availability(self, timeout: float = 5.0,
                          max_attempts: Optional[int] = None) -> None:
        """Check if a .hdf5 file is opened by another process; return once it is not.

        If two processes attempt to simultaneously open a single hdf5 file then
        h5py will raise an :class:`OSError`.

        The purpose of this method is ensure that a .hdf5 file is actually closed,
        thus allowing the :meth:`Database.from_hdf5` method to safely access
        :attr:`Database.hdf5` without the risk of raising an :class:`OSError`.

        Parameters
        ----------
        timeout : float
            Time timeout, in seconds, between subsequent attempts of opening the .hdf5 file.

        max_attempts : int
            Optional: The maximum number attempts for opening the .hdf5 file.
            If the maximum number of attempts is exceeded, raise an ``OSError``.
            The number of attempts is unlimited if ``None`` (or ``0``).

        Raises
        ------
        OSError
            Raised if **max_attempts** is exceeded.

        """
        warning = (f"h5py.File('{self.hdf5.filename}') is currently unavailable; "
                   f"repeating attempt in {timeout:1.1f} seconds")

        for attempt in count(1):
            try:
                with self.hdf5.open('r+'):
                    return None  # the .hdf5 file can safely be opened
            except OSError:  # the .hdf5 file cannot be safely opened yet
                # Out of attempts: re-raise right away rather than sleeping first
                if max_attempts and attempt >= max_attempts:
                    raise
                logger.warning(warning)
                sleep(timeout)