from __future__ import annotations

import re
import os
import sys
import copy
import types
import hashlib
import operator
import subprocess
import tempfile
import warnings
import contextlib
import functools
import multiprocessing
from typing import Any, ContextManager, cast, overload, TYPE_CHECKING, TypeVar
from pathlib import Path
from itertools import chain, repeat
from collections.abc import Iterable, Mapping, Callable, Iterator, Sequence, Hashable

import numpy as np
import pandas as pd
from more_itertools import chunked
from qmflows import InitRestart
from scm.plams import CRSJob, CRSResults, Settings, KFFile
from rdkit.Chem import CanonSmiles
from CAT.utils import get_template

# Typing-only helpers: everything below is referenced exclusively from
# (lazily evaluated) annotations, so it is only defined for static type checkers.
if TYPE_CHECKING:
    if sys.version_info >= (3, 8):
        from typing import Literal, TypedDict, SupportsIndex
    else:
        from typing_extensions import Literal, TypedDict, SupportsIndex

    _SCT = TypeVar("_SCT", bound=np.generic)
    _NDArray = np.ndarray[Any, np.dtype[_SCT]]

    class _LogOptions(TypedDict, total=False):
        """Verbosity of log messages: 0:none  1:minimal  3:normal  5:verbose  7:extremely talkative."""  # noqa: E501

        #: Verbosity of the log printed to .log file in the main working folder
        file: Literal[0, 1, 3, 5, 7]

        #: Verbosity of the log printed to the standard output
        stdout: Literal[0, 1, 3, 5, 7]

        #: Print time for each log event
        time: bool

        #: Print date for each log event
        date: bool

# The public API of this module: every function without a leading underscore.
__all__ = [
    "get_compkf",
    "get_fast_sigma_properties",
    "run_fast_sigma",
    "read_csv",
    "sanitize_smiles_df",
]

# Template settings for the LogP calculation
LOGP_SETTINGS = get_template('qd.yaml')['COSMO-RS logp']
LOGP_SETTINGS.runscript.nproc = 1
LOGP_SETTINGS.update(get_template('crs.yaml')['ADF combi2005'])
# NOTE(review): the assignment target of ``6.766`` was lost in the source (the value was
# fused onto the ``update()`` call above). ``input.property.volumequotient`` is presumed,
# 6.766 being the usual octanol/water volume quotient -- confirm against upstream.
LOGP_SETTINGS.input.property.volumequotient = 6.766

# Template settings for the activity coefficient / solvation energy calculation
GAMMA_E_SETTINGS = get_template('qd.yaml')['COSMO-RS activity coefficient']
GAMMA_E_SETTINGS.runscript.nproc = 1
GAMMA_E_SETTINGS.update(get_template('crs.yaml')['ADF combi2005'])
del GAMMA_E_SETTINGS.input.compound[0]

# Template settings for a pure-compound solubility calculation
SOL_SETTINGS = copy.deepcopy(GAMMA_E_SETTINGS)
# NOTE(review): the assignment target of ``"puresolubility"`` was lost in the source;
# the ``input.property`` block header (``_h``) is presumed -- confirm against upstream.
SOL_SETTINGS.input.property._h = "puresolubility"
SOL_SETTINGS.input.temperature = "298.15 298.15 1"
SOL_SETTINGS.input.pressure = "1.01325"
SOL_SETTINGS.input.compound = [Settings({"_h": None, "_1": "compkffile"})]

# Template settings for a pure-compound boiling point calculation
BP_SETTINGS = copy.deepcopy(GAMMA_E_SETTINGS)
# NOTE(review): both assignment targets below were lost in the source; the header (``_h``)
# and first entry (``_1``) of the ``input.property`` block are presumed -- confirm.
BP_SETTINGS.input.property._h = "pureboilingpoint"
BP_SETTINGS.input.property._1 = "Pure"
BP_SETTINGS.input.temperature = "298.15"
BP_SETTINGS.input.pressure = "1.01325 1.01325 1"
del BP_SETTINGS.input.compound

# The default PLAMS `config.log` options
# Wrapped in a read-only mapping proxy so the shared default cannot be mutated by accident
LOG_DEFAULT: _LogOptions = types.MappingProxyType({    # type: ignore[assignment]
    "file": 5,
    "stdout": 3,
    "time": True,
    "date": False,
})

def get_compkf(
    smiles: str,
    directory: None | str | os.PathLike[str] = None,
    name: None | str = None,
) -> str:
    """Estimate the sigma profile of a SMILES string using the COSMO-RS fast-sigma method.

    See the COSMO-RS documentation for more details.

    Parameters
    ----------
    smiles : :class:`str`
        The SMILES string of the molecule of interest.
    directory : :class:`str`, optional
        The directory wherein the resulting ``.compkf`` file should be stored.
        If :data:`None`, use the current working directory.
    name : :class:`str`, optional
        The name of the to-be created .compkf file (excluding extensions).
        If :data:`None`, use **smiles**.

    Returns
    -------
    :class:`str`
        The path to the (to-be created) ``.compkf`` file, *i.e.* **directory** joined
        with ``"{name}.compkf"``.
        The path is returned even if the ``fast_sigma`` call fails; failures are
        reported by ``_run`` via a :exc:`RuntimeWarning` rather than an exception.

    """  # noqa: E501
    filename = smiles if name is None else name
    if directory is None:
        directory = os.getcwd()
    kf_file = os.path.join(directory, f'{filename}.compkf')

    # ``$AMSBIN`` is expanded by the shell that `_run` launches
    command = f'"$AMSBIN"/fast_sigma --smiles "{smiles}" -o "{kf_file}"'
    _run(command, smiles, err_msg="Failed to compute the sigma profile of {!r}")
    return kf_file
def get_fast_sigma_properties(
    smiles: str,
    directory: None | str | os.PathLike[str] = None,
    name: None | str = None,
) -> None:
    """Calculate various pure-compound properties with the COSMO-RS property prediction program.

    See the COSMO-RS documentation for more details.

    Parameters
    ----------
    smiles : :class:`str`
        The SMILES string of the molecule of interest.
    directory : :class:`str`, optional
        The directory wherein the resulting ``.compkf`` file should be stored.
        If :data:`None`, use the current working directory.
    name : :class:`str`, optional
        The name of the to-be created .compkf file (excluding extensions).
        If :data:`None`, use **smiles**.

    """  # noqa: E501
    # NOTE(review): the original body of this function was lost (the source is truncated
    # right after the parameter documentation). The implementation below is reconstructed
    # by analogy with ``get_compkf``; the ``prop_prediction`` executable name, its flags and
    # the error message are assumptions -- confirm against upstream / the AMS documentation.
    filename = smiles if name is None else name
    if directory is None:
        directory = os.getcwd()
    kf_file = os.path.join(directory, f'{filename}.compkf')

    command = f'"$AMSBIN"/prop_prediction --smiles "{smiles}" -o "{kf_file}"'
    _run(command, smiles, err_msg="Failed to compute the pure compound properties of {!r}")


def _run(command: str, smiles: str, err_msg: str) -> None | subprocess.CompletedProcess[str]:
    """Run **command** in a shell and return its status.

    Failures never propagate as exceptions: a non-zero exit code, any output on stderr
    or a ``"WARNING"`` in stdout is converted into a :exc:`RuntimeWarning` whose message
    is ``err_msg.format(smiles)`` and whose ``__cause__`` is the underlying error.

    Returns
    -------
    :class:`subprocess.CompletedProcess`, optional
        The completed process, or :data:`None` if :func:`subprocess.run` itself raised
        (*e.g.* due to a non-zero exit code).

    """
    status = None
    try:
        status = subprocess.run(command, shell=True, check=True, text=True, capture_output=True)
        stderr = status.stderr.strip()
        stdout = status.stdout.strip()
        if stderr:
            raise RuntimeError(stderr)
        elif "WARNING" in stdout:
            raise RuntimeError(stdout)
    except (RuntimeError, subprocess.SubprocessError) as ex:
        warn = RuntimeWarning(err_msg.format(smiles))
        warn.__cause__ = ex
        warnings.warn(warn, stacklevel=1)
    return status


def _hash_smiles(smiles: str) -> str:
    """Hash **smiles** with sha256 and return the digest as a hexadecimal string."""
    digest = hashlib.sha256()
    digest.update(smiles.encode())
    return digest.hexdigest()


def _get_compkf(
    smiles_iter: Iterable[str],
    directory: str | os.PathLike[str],
) -> list[str]:
    """Call :func:`get_compkf` for every SMILES string and collect the returned paths.

    The sha256 hash of each SMILES string is used as the name of its file.
    """
    compkf_files: list[str] = []
    for smiles in smiles_iter:
        hashed_name = _hash_smiles(smiles)
        compkf_files.append(get_compkf(smiles, directory, name=hashed_name))
    return compkf_files


def _get_fast_sigma_properties(
    smiles_iter: Iterable[str],
    directory: str | os.PathLike[str],
) -> None:
    """Call :func:`get_fast_sigma_properties` for every SMILES string.

    The sha256 hash of each SMILES string is used as the name of its file.
    """
    for smiles in smiles_iter:
        hashed_name = _hash_smiles(smiles)
        get_fast_sigma_properties(smiles, directory, name=hashed_name)


def _set_properties(
    df: pd.DataFrame,
    solutes: list[str],
    solvents: Mapping[str, str],
) -> None:
    """Fill **df** in-place with the LogP, the per-solvent and the .compkf-based properties."""
    # Solvent-independent: the LogP
    df["LogP", None] = _get_logp(solutes)

    # Two columns per solvent: the activity coefficient and the solvation energy
    for solvent_name, solvent_file in solvents.items():
        gamma_and_energy = _get_gamma_e(solutes, solvent_file, solvent_name)
        solvent_columns = [
            ("Activity Coefficient", solvent_name),
            ("Solvation Energy", solvent_name),
        ]
        df[solvent_columns] = gamma_and_energy

    # One column for each field of the structured array
    compkf_props = _get_compkf_prop(solutes)
    for field in compkf_props.dtype.fields:
        df[field, None] = compkf_props[field]


def _get_compkf_prop(solutes: list[str]) -> _NDArray[np.void]:
    """Extract all (potentially) interesting properties from the compkf file.

    Parameters
    ----------
    solutes : :class:`list[str]`
        The paths to the ``.compkf`` files.

    Returns
    -------
    :class:`numpy.ndarray`
        A structured array with one element per file and one field per property.
        Properties that could not be read keep their fill value (an empty string,
        ``0`` or ``nan``); each such failure is reported via a :exc:`RuntimeWarning`.

    """
    # (kf section, fill value, kf variable / field name, field dtype)
    prop_iter: list[tuple[str, Any, str, type | str]] = [
        ("Compound Data", "", "Formula", "U160"),
        ("Compound Data", np.nan, "Molar Mass", np.float64),
        ("Compound Data", 0, "Nring", np.int64),
        ("PROPPREDICTION", np.nan, "boilingpoint", np.float64),
        ("PROPPREDICTION", np.nan, "criticalpressure", np.float64),
        ("PROPPREDICTION", np.nan, "criticaltemp", np.float64),
        ("PROPPREDICTION", np.nan, "criticalvol", np.float64),
        ("PROPPREDICTION", np.nan, "density", np.float64),
        ("PROPPREDICTION", np.nan, "dielectricconstant", np.float64),
        ("PROPPREDICTION", np.nan, "entropygas", np.float64),
        ("PROPPREDICTION", np.nan, "flashpoint", np.float64),
        ("PROPPREDICTION", np.nan, "gidealgas", np.float64),
        ("PROPPREDICTION", np.nan, "hcombust", np.float64),
        ("PROPPREDICTION", np.nan, "hformstd", np.float64),
        ("PROPPREDICTION", np.nan, "hfusion", np.float64),
        ("PROPPREDICTION", np.nan, "hidealgas", np.float64),
        ("PROPPREDICTION", np.nan, "hsublimation", np.float64),
        ("PROPPREDICTION", np.nan, "meltingpoint", np.float64),
        ("PROPPREDICTION", np.nan, "molarvol", np.float64),
        ("PROPPREDICTION", np.nan, "parachor", np.float64),
        ("PROPPREDICTION", np.nan, "solubilityparam", np.float64),
        ("PROPPREDICTION", np.nan, "tpt", np.float64),
        ("PROPPREDICTION", np.nan, "vdwarea", np.float64),
        ("PROPPREDICTION", np.nan, "vdwvol", np.float64),
        ("PROPPREDICTION", np.nan, "vaporpressure", np.float64),
    ]
    dtype = np.dtype([i[2:] for i in prop_iter])
    fill_value = np.array(tuple(fill for _, fill, _, _ in prop_iter), dtype=dtype)
    ret = np.full(len(solutes), fill_value, dtype=dtype)

    for i, file in enumerate(solutes):
        kf = KFFile(file)
        if kf.reader is None:
            # The file could not be opened: report it and keep the fill values
            warnings.warn(RuntimeWarning(f"No such file or directory: {file!r}"))
            continue

        for section, _, variable, _ in prop_iter:
            try:
                ret[variable][i] = kf.read(section, variable)
            except Exception as ex:
                smiles = kf.read("Compound Data", "SMILES").strip("\x00")
                warn = RuntimeWarning(
                    f'Failed to extract the "{section}%{variable}" property of {smiles!r}'
                )
                warn.__cause__ = ex
                warnings.warn(warn)
    return ret


def _get_logp(solutes: list[str]) -> _NDArray[np.float64]:
    """Run a LogP calculation for all **solutes** (paths to .compkf files)."""
    settings = copy.deepcopy(LOGP_SETTINGS)

    # Fill the ``$AMSRESOURCES`` directory into the headers of the first two compounds
    for compound in settings.input.compound[:2]:
        compound._h = compound._h.format(os.environ["AMSRESOURCES"])

    # Append one (quoted) compound entry per solute
    solute_entries = []
    for solute in solutes:
        solute_entries.append(Settings({"_h": f'"{solute}"', "_1": "compkffile"}))
    settings.input.compound += solute_entries

    def _extract_logp(results: CRSResults) -> Any:
        # The first two entries are skipped; the remaining ones belong to the solutes
        return results.readkf('LOGP', 'logp')[2:]

    return _run_crs(settings, len(solutes), logp=_extract_logp)


def _get_gamma_e(
    solutes: list[str],
    solvent: str,
    solvent_name: str,
) -> _NDArray[np.float64]:
    """Run an activity coefficient and solvation energy calculation in the given solvent."""
    settings = copy.deepcopy(GAMMA_E_SETTINGS)

    # The first compound is the (quoted) solvent; all solutes are appended after it
    settings.input.compound[0]._h = f'"{solvent}"'
    solute_entries = []
    for solute in solutes:
        solute_entries.append(Settings({"_h": f'"{solute}"', "_1": "compkffile"}))
    settings.input.compound += solute_entries

    def _extract_gamma(results: CRSResults) -> Any:
        # The first entry (the solvent) is skipped
        return results.readkf('ACTIVITYCOEF', 'gamma')[1:]

    def _extract_deltag(results: CRSResults) -> Any:
        # The first entry (the solvent) is skipped
        return results.readkf('ACTIVITYCOEF', 'deltag')[1:]

    return _run_crs(
        settings,
        len(solutes),
        solvent_name,
        activity_coefficient=_extract_gamma,
        solvation_energy=_extract_deltag,
    )


def _run_crs(
    settings: Settings,
    count: int,
    solvent: None | str = None,
    **callbacks: Callable[[CRSResults], float | Sequence[float]],
) -> _NDArray[np.float64]:
    """Perform all COSMO-RS calculations.

    Parameters
    ----------
    settings : :class:`Settings`
        The settings of the COSMO-RS job.
    count : :class:`int`
        The number of solutes, *i.e.* the number of values each callback produces.
    solvent : :class:`str`, optional
        The name of the solvent; only used for making warning messages more descriptive.
    \**callbacks : :class:`Callable[[CRSResults], Any]`
        Named callbacks for extracting the properties from the job results.

    Returns
    -------
    :class:`numpy.ndarray`
        An array of shape ``(count,)`` if a single callback is passed and
        ``(count, len(callbacks))`` otherwise.
        Entries are ``nan`` if the job failed/crashed or if the respective callback
        raised; the latter is reported via a :exc:`RuntimeWarning`.

    """
    job = CRSJob(settings=settings)
    results = job.run()

    # One row per property; rows that cannot be extracted remain `nan`
    ret = np.full((len(callbacks), count), np.nan, dtype=np.float64)
    if job.status not in ('failed', 'crashed'):
        for i, (prop, callback) in enumerate(callbacks.items()):
            try:
                value = callback(results)
            except Exception as ex:
                msg = f"Failed to extract the {prop!r} property"
                if solvent is not None:
                    msg += f" in {solvent!r}"
                warn = RuntimeWarning(msg)
                warn.__cause__ = ex
                warnings.warn(warn)
            else:
                ret[i] = value

    # Return a 1D array for a single property and one column per property otherwise
    return ret.T if ret.shape[0] != 1 else np.squeeze(ret, 0)


def _abspath(path: str | bytes | os.PathLike[Any], isfile: bool = False) -> str:
    """Sanitize **path**: decode it, expand environment variables and make it absolute.

    Parameters
    ----------
    path : :term:`path-like object`
        The to-be sanitized path.
    isfile : :class:`bool`
        If :data:`True`, ensure that the path points to an existing file.

    Returns
    -------
    :class:`str`
        The absolute path.

    Raises
    ------
    :exc:`OSError`
        Raised if **isfile** is :data:`True` and the path is not an (openable) file,
        *e.g.* :exc:`FileNotFoundError` or :exc:`IsADirectoryError`.

    """
    ret = os.path.abspath(os.path.expandvars(os.fsdecode(path)))
    if isfile and not os.path.isfile(ret):
        # Let `open` raise the appropriate `OSError` subclass;
        # the `with` statement guards against leaking a handle if it unexpectedly succeeds
        with open(ret, "r"):
            pass
    return ret


def _inner_loop(
    args: tuple[int, pd.Index],
    columns: pd.MultiIndex,
    output_dir: Path,
    ams_dir: None | str,
    solvents: Mapping[str, str],
    log: _LogOptions = LOG_DEFAULT,
) -> tuple[int, pd.DataFrame]:
    """Compute all properties for a single chunk of SMILES strings.

    This is the worker function of :func:`run_fast_sigma`: **args** is a
    ``(chunk number, SMILES index)`` pair and the result is the chunk number
    together with the dataframe of that chunk.
    The dataframe is also written to ``{chunk number}.temp.csv`` in **output_dir**;
    if that file already exists it is read instead of recomputing the chunk.
    """
    chunk_id, smiles_index = args

    # Nothing to compute for an empty chunk
    if len(smiles_index) == 0:
        empty_df = pd.DataFrame(index=smiles_index, columns=columns)
        empty_df.sort_index(axis=1, inplace=True)
        return chunk_id, empty_df

    # Reuse the .csv file of a previous run if it exists
    cache_file = output_dir / f"{chunk_id}.temp.csv"
    if os.path.isfile(cache_file):
        return chunk_id, read_csv(cache_file)

    # Use the user-specified working directory or a temporary one inside `output_dir`
    if ams_dir is not None:
        workdir_cm: ContextManager[str] = contextlib.nullcontext(ams_dir)
    else:
        workdir_cm = tempfile.TemporaryDirectory(dir=output_dir)

    # Calculate the properties of this chunk
    chunk_df = pd.DataFrame(index=smiles_index, columns=columns)
    with workdir_cm as workdir, InitRestart(*os.path.split(workdir)):
        from scm.plams import config
        config.log.update(log)
        config.job.pickle = False

        compkf_files = _get_compkf(smiles_index, workdir)
        _get_fast_sigma_properties(smiles_index, workdir)
        _set_properties(chunk_df, compkf_files, solvents)

    chunk_df.sort_index(axis=1, inplace=True)
    chunk_df.to_csv(cache_file)
    return chunk_id, chunk_df


# Typing overload for ``return_df=False`` (the default): nothing is returned
@overload
def run_fast_sigma(
    input_smiles: Iterable[str],
    solvents: Mapping[str, str | bytes | os.PathLike[Any]],
    *,
    output_dir: str | bytes | os.PathLike[Any] = ...,
    ams_dir: None | str | bytes | os.PathLike[Any] = ...,
    chunk_size: int = ...,
    processes: None | int = ...,
    return_df: Literal[False] = ...,
    log_options: _LogOptions = ...,
) -> None:
    ...


# Typing overload for ``return_df=True``: the concatenated dataframe is returned
@overload
def run_fast_sigma(  # noqa: E302
    input_smiles: Iterable[str],
    solvents: Mapping[str, str | bytes | os.PathLike[Any]],
    *,
    output_dir: str | bytes | os.PathLike[Any] = ...,
    ams_dir: None | str | bytes | os.PathLike[Any] = ...,
    chunk_size: int = ...,
    processes: None | int = ...,
    return_df: Literal[True],
    log_options: _LogOptions = ...,
) -> pd.DataFrame:
    ...


def run_fast_sigma(  # noqa: E302
    input_smiles: Iterable[str],
    solvents: Mapping[str, str | bytes | os.PathLike[Any]],
    *,
    output_dir: str | bytes | os.PathLike[Any] = "crs",
    ams_dir: None | str | bytes | os.PathLike[Any] = None,
    chunk_size: int = 100,
    processes: None | int = None,
    return_df: bool = False,
    log_options: _LogOptions = LOG_DEFAULT,
) -> None | pd.DataFrame:
    """Perform (fast-sigma) COSMO-RS property calculations on the passed SMILES and solvents.

    The output is exported to the ``cosmo-rs.csv`` file.

    Includes the following properties:

    * LogP
    * Activity Coefficient
    * Solvation Energy
    * Formula
    * Molar Mass
    * Nring
    * boilingpoint
    * criticalpressure
    * criticaltemp
    * criticalvol
    * density
    * dielectricconstant
    * entropygas
    * flashpoint
    * gidealgas
    * hcombust
    * hformstd
    * hfusion
    * hidealgas
    * hsublimation
    * meltingpoint
    * molarvol
    * parachor
    * solubilityparam
    * tpt
    * vdwarea
    * vdwvol
    * vaporpressure

    Jobs are performed in parallel, with chunks of a given size being
    distributed to a user-specified number of processes and subsequently cached.
    After all COSMO-RS calculations have been performed, the temporary
    .csv files are concatenated into ``cosmo-rs.csv``.

    Examples
    --------
    .. code-block:: python

        >>> import os
        >>> import pandas as pd

        >>> output_dir: str = ...
        >>> smiles_list = ["CO[H]", "CCO[H]", "CCCO[H]"]
        >>> solvent_dict = {
        ...     "water": "$AMSRESOURCES/ADFCRS/Water.coskf",
        ...     "octanol": "$AMSRESOURCES/ADFCRS/1-Octanol.coskf",
        ... }

        >>> run_fast_sigma(smiles_list, solvent_dict, output_dir=output_dir)

        >>> csv_file = os.path.join(output_dir, "cosmo-rs.csv")
        >>> pd.read_csv(csv_file, header=[0, 1], index_col=0)
        property Activity Coefficient             ... Solvation Energy
        solvent               octanol      water  ...          octanol     water
        smiles                                    ...
        CO[H]                1.045891   4.954782  ...        -2.977354 -3.274420
        CCO[H]               0.980956  12.735228  ...        -4.184214 -3.883986
        CCCO[H]              0.905952  47.502557  ...        -4.907177 -3.779867
        <BLANKLINE>
        [3 rows x 8 columns]

    Parameters
    ----------
    input_smiles : :class:`Iterable[str] <collections.abc.Iterable>`
        The input SMILES strings.
    solvents : :class:`Mapping[str, path-like] <collections.abc.Mapping>`
        A mapping with solvent-names as keys and paths to their respective
        .coskf files as values.

    Keyword Arguments
    -----------------
    output_dir : :term:`path-like object`
        The directory wherein the .csv files will be stored.
        A new directory will be created if it does not yet exist.
    ams_dir : :term:`path-like <path-like object>`, optional
        The directory wherein all COSMO-RS computations will be performed.
        If :data:`None`, use a temporary directory inside **output_dir**.
    chunk_size : :class:`int`
        The (maximum) number of entries to-be stored in a single .csv file.
    processes : :class:`int`, optional
        The number of worker processes to use.
        If :data:`None`, use the number returned by :func:`os.cpu_count()`.
    return_df : :class:`bool`
        If :data:`True`, return a dataframe with the content of ``cosmo-rs.csv``.
    log_options : :class:`Mapping[str, Any] <collections.abc.Mapping>`
        Alternative settings for :data:`plams.config.log`.
        See the PLAMS documentation for more details.

    Returns
    -------
    :class:`pd.DataFrame <pandas.DataFrame>`, optional
        The dataframe with all computed properties if **return_df** is :data:`True`;
        :data:`None` otherwise.

    Raises
    ------
    :exc:`KeyError`
        Raised if **log_options** contains invalid keys.
    :exc:`ValueError`
        Raised if **chunk_size** or **processes** is smaller than one or
        if **solvents** is empty.

    """  # noqa: E501
    # Validate `log_options`
    log_options = dict(log_options)  # type: ignore[assignment]
    illegal_keys = log_options.keys() - {"file", "stdout", "time", "date"}
    if illegal_keys:
        key_str = ", ".join(repr(i) for i in sorted(illegal_keys))
        raise KeyError(f"Invalid `log_options` keys: {key_str}")

    # Parse the `chunk_size`
    chunk_size = operator.index(chunk_size)
    if chunk_size < 1:
        raise ValueError(f"`chunk_size` must be larger than zero; observed value: {chunk_size}")

    # Parse `processes`
    if processes is not None:
        processes = operator.index(processes)
        if processes < 1:
            raise ValueError(f"`processes` must be larger than zero; observed value {processes}")

    # Parse `output_dir`
    output_dir = Path(_abspath(output_dir))
    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)

    # Parse `ams_dir`
    if ams_dir is not None:
        ams_dir = _abspath(ams_dir)

    # Parse the `solvents`
    if len(solvents) == 0:
        raise ValueError("`solvents` requires at least one solvent")
    solvents = cast("dict[str, str]", {k: _abspath(v, True) for k, v in solvents.items()})

    # Construct the dataframe columns
    prop_names = ["Activity Coefficient", "Solvation Energy"]
    _columns: list[tuple[str, None | str]] = [
        ("LogP", None),
        ("Formula", None),
        ("Molar Mass", None),
        ("Nring", None),
        ('boilingpoint', None),
        ('criticalpressure', None),
        ('criticaltemp', None),
        ('criticalvol', None),
        ('density', None),
        ('dielectricconstant', None),
        ('entropygas', None),
        ('flashpoint', None),
        ('gidealgas', None),
        ('hcombust', None),
        ('hformstd', None),
        ('hfusion', None),
        ('hidealgas', None),
        ('hsublimation', None),
        ('meltingpoint', None),
        ('molarvol', None),
        ('parachor', None),
        ('solubilityparam', None),
        ('tpt', None),
        ('vdwarea', None),
        ('vdwvol', None),
        ('vaporpressure', None),
    ]
    for solv in solvents:
        _columns += [(prop, solv) for prop in prop_names]
    columns = pd.MultiIndex.from_tuples(_columns, names=["property", "solvent"])

    # Run the workflow
    with multiprocessing.Pool(processes) as pool:
        enumerator = enumerate(
            pd.Index(lst, name="smiles") for lst in chunked(input_smiles, chunk_size)
        )
        func = functools.partial(
            _inner_loop,
            columns=columns,
            output_dir=output_dir,
            solvents=solvents,
            ams_dir=ams_dir,
            log=log_options,
        )
        if not return_df:
            ret = None
            # Exhaust the iterator; the results are only written to the .csv files
            for _ in pool.imap_unordered(func, enumerator):
                pass
        else:
            # The chunks arrive in arbitrary order: restore the input order first
            df_idx_list = sorted(pool.imap_unordered(func, enumerator), key=operator.itemgetter(0))
            ret = pd.concat(df for _, df in df_idx_list)

    _concatenate_csv(output_dir)
    return ret
def _concatenate_csv(output_dir: Path) -> None:
    """Concatenate all ``{i}.temp.csv`` files into ``cosmo-rs.csv``.

    The temporary files are appended in order of their integer prefix and
    removed afterwards.

    Raises
    ------
    :exc:`FileNotFoundError`
        Raised if **output_dir** does not contain any ``{i}.temp.csv`` files.

    """
    pattern = re.compile(r"[0-9]+\.temp\.csv")
    csv_files = [output_dir / i for i in os.listdir(output_dir) if pattern.fullmatch(i) is not None]

    # Sort numerically by the integer prefix (a lexicographic sort would put "10" before "2")
    csv_files.sort(key=lambda n: int(n.name.split(".", 1)[0]))
    if not len(csv_files):
        raise FileNotFoundError(f"Failed to identify any files with the {pattern.pattern!r} "
                                f"pattern in {str(output_dir)!r}")

    # Construct the final .csv file;
    # only write a header if the file does not exist yet, and only once
    output_csv = output_dir / "cosmo-rs.csv"
    if not os.path.isfile(output_csv):
        header_iter: Iterator[bool] = chain([True], repeat(False))
    else:
        header_iter = repeat(False)

    # Append its content using that of all other .csv files
    with open(output_csv, "a") as f:
        for file, header in zip(csv_files, header_iter):
            df = read_csv(file)
            df.to_csv(f, header=header)
            os.remove(file)


def _read_columns(file: str | bytes | os.PathLike[Any], **kwargs: Any) -> pd.MultiIndex:
    """Extract the dataframe columns from the passed .csv files.

    Only the two header rows are read; ``"nan"`` entries in the second column level
    are converted into :data:`None`.
    """
    kwargs["nrows"] = 0
    df = pd.read_csv(file, header=[0, 1], index_col=0, **kwargs)
    return pd.MultiIndex.from_tuples(
        [(i, (j if j != "nan" else None)) for i, j in df.columns],
        names=df.columns.names,
    )


#: Invalid keyword arguments for :func:`read_csv`.
_INVALID_KWARGS = frozenset({
    "filepath_or_buffer",
    "index_col",
    "header",
    "names",
    "usecols",
})
def read_csv(
    file: str | bytes | os.PathLike[Any],
    *,
    columns: None | Any = None,
    **kwargs: Any,
) -> pd.DataFrame:
    r"""Read the passed .csv file as produced by :func:`run_fast_sigma`.

    Examples
    --------
    .. code-block:: python

        >>> file: str = ...

        >>> columns1 = ["molarvol", "gidealgas", "Activity Coefficient"]
        >>> read_csv(file, columns=columns1)
        property molarvol  gidealgas Activity Coefficient
        solvent       NaN        NaN              octanol     water
        smiles
        CCCO[H]  0.905952  47.502557          -153.788589  0.078152
        CCO[H]   0.980956  12.735228          -161.094955  0.061220
        CO[H]    1.045891   4.954782                  NaN       NaN

        >>> columns2 = [("Solvation Energy", "water")]
        >>> read_csv(file, columns=columns2)
        property Solvation Energy
        solvent             water
        smiles
        CCCO[H]         -3.779867
        CCO[H]          -3.883986
        CO[H]           -3.274420

    Parameters
    ----------
    file : :term:`path-like object`
        The name of the to-be opened .csv file.
    columns : key or sequence of keys, optional
        The to-be read columns.
        Note that any passed value must be a valid dataframe (multiindex) key.
    \**kwargs : :data:`~typing.Any`
        Further keyword arguments for :func:`pd.read_csv <pandas.read_csv>`.

    Returns
    -------
    :class:`pd.DataFrame <pandas.DataFrame>`
        The content of the .csv file.

    Raises
    ------
    :exc:`TypeError`
        Raised if **kwargs** contains any of the keywords that are set internally
        (``filepath_or_buffer``, ``index_col``, ``header``, ``names`` or ``usecols``).

    See Also
    --------
    :class:`pd.read_csv <pandas.read_csv>`
        Read a comma-separated values (csv) file into DataFrame.

    """
    # Validate `kwargs`
    if not _INVALID_KWARGS.isdisjoint(kwargs.keys()):
        keys = sorted(_INVALID_KWARGS.intersection(kwargs.keys()))
        raise TypeError(f"Invalid or duplicate keys: {keys}")

    # The two header rows are parsed separately and skipped when reading the data
    columns_superset = _read_columns(file, **kwargs)
    if columns is None:
        df = pd.read_csv(file, index_col=0, skiprows=2, **kwargs)
        df.columns = columns_superset
    else:
        # Translate the requested keys into column positions (position 0 is the index)
        columns_series = pd.Series(np.arange(1, 1 + len(columns_superset)), index=columns_superset)
        columns_idx = np.append(0, columns_series.loc[columns])
        columns_idx2 = columns_idx[1:] - 1
        argsort = np.argsort(columns_idx2)

        df = pd.read_csv(file, usecols=columns_idx, index_col=0, skiprows=2, **kwargs)
        df.sort_index(
            axis=1, inplace=True,
            key=lambda i: i.str.strip("Unnamed: ").astype(np.int64)[argsort],
        )
        df.columns = columns_superset[columns_idx2]

    # Missing formulas are represented by empty strings rather than `nan`
    formula = ("Formula", None)
    if formula in df.columns:
        df.loc[df[formula].isnull(), formula] = ""
    return df
def _canonicalize_smiles(smiles: str) -> None | str:
    """Attempt to canonicalize a **smiles** string.

    Returns
    -------
    :class:`str`, optional
        The canonical SMILES string, or :data:`None` if the canonicalization failed;
        in the latter case a :exc:`RuntimeWarning` is issued.

    """
    try:
        return CanonSmiles(smiles)
    except Exception as ex:
        warn = RuntimeWarning(f"Failed to canonicalize {smiles!r}")
        warn.__cause__ = ex
        warnings.warn(warn)
        return None
def sanitize_smiles_df(
    df: pd.DataFrame,
    column_levels: SupportsIndex = 2,
    column_padding: Hashable = None,
) -> pd.DataFrame:
    """Sanitize the passed dataframe, canonicalizing the SMILES in its index, converting the columns into a multiIndex and removing duplicate entries.

    Examples
    --------
    .. code-block:: python

        >>> import pandas as pd

        >>> df: pd.DataFrame = ...
        >>> print(df)
                 a
        smiles
        CCCO[H]  1
        CCO[H]   2
        CO[H]    3

        >>> sanitize_smiles_df(df)
                 a
               NaN
        smiles
        CCCO     1
        CCO      2
        CO       3

    Parameters
    ----------
    df : :class:`pd.DataFrame <pandas.DataFrame>`
        The dataframe in question.
        The dataframes' index should consist of smiles strings.
    column_levels : :class:`int`
        The number of multiindex column levels that should be in the to-be returned dataframe.
    column_padding : :class:`~collections.abc.Hashable`
        The object used as padding for the multiindex levels (where appropriate).

    Returns
    -------
    :class:`pd.DataFrame <pandas.DataFrame>`
        The newly sanitized dataframe.
        Rows whose SMILES string could not be canonicalized and rows whose canonical
        SMILES string duplicates that of an earlier row are removed.

    Raises
    ------
    :exc:`ValueError`
        Raised if **column_levels** is smaller than one or smaller than the number of
        column levels already present in **df**.
    :exc:`TypeError`
        Raised if **column_padding** is not hashable.

    """  # noqa: E501
    # Sanitize the arguments
    column_levels = operator.index(column_levels)
    if column_levels < 1:
        raise ValueError("`column_levels` must be larger than or equal to 1")
    elif isinstance(df.columns, pd.MultiIndex) and df.columns.nlevels > column_levels:
        raise ValueError("`column_levels` must be larger than or equal to number "
                         "of MultiIndex levels in `df`")
    elif not isinstance(column_padding, Hashable):
        raise TypeError("`column_padding` expected a hashable object")

    # Sanitize the index
    index = pd.Index(
        [_canonicalize_smiles(i) for i in df.index],
        dtype=df.index.dtype,
        name=df.index.name,
    )

    # Create or pad a MultiIndex; only add as many levels as are actually missing
    if not isinstance(df.columns, pd.MultiIndex):
        padding = (column_levels - 1) * (column_padding,)
        columns = pd.MultiIndex.from_tuples(
            [(i, *padding) for i in df.columns], names=(df.columns.name, *padding)
        )
    elif df.columns.nlevels < column_levels:
        padding = (column_levels - df.columns.nlevels) * (column_padding,)
        columns = pd.MultiIndex.from_tuples(
            [(*j, *padding) for j in df.columns], names=(*df.columns.names, *padding)
        )
    else:
        columns = df.columns.copy()

    # Duplicates and failures must be identified using the *canonicalized* SMILES:
    # distinct input strings can map to the same canonical string, and failed
    # canonicalizations only show up as missing values in the new index
    mask = ~index.duplicated(keep='first') & index.notna()
    ret = df[mask]
    ret.index = index[mask]
    ret.columns = columns
    return ret