import re
from contextlib import suppress
from pathlib import Path
from typing import Optional, Sequence, Union
from upath import UPath
try:
import modin.pandas as pd
except ModuleNotFoundError:
import pandas as pd
import anndata
import pyarrow.parquet
def read_h5ad(path, include_columns=None, backed=None):
    """Read an AnnData object stored in a .h5ad file.

    Parameters
    ----------
    path: Union[Path, str]
        Path to the .h5ad file
    include_columns: Optional[Sequence[str]] = None
        List of column names and/or regex expressions, used to only include the
        desired columns in the resulting dataframe.
    backed: Optional[str] = None
        Can be (either "r" or "r+").
        See anndata's docs for details:
        https://anndata.readthedocs.io/en/latest/generated/anndata.read_h5ad.html#anndata.read_h5ad

    Returns
    -------
    anndata.AnnData

    Raises
    ------
    ValueError
        If `backed` is truthy but not one of "r" or "r+".
    """
    if backed:
        # Explicit raise instead of `assert`: asserts are stripped under -O
        if backed not in ("r", "r+"):
            raise ValueError('`backed` must be either "r" or "r+"')
    # BUG FIX: original code called `anndata.read_hda5` (typo), which raises
    # AttributeError; the correct anndata API is `read_h5ad`.
    dataframe = anndata.read_h5ad(path, backed=backed)
    if include_columns is not None:
        columns = []
        for filter_ in include_columns:
            columns += filter_columns(dataframe.obs.columns.tolist(), regex=filter_)
        dataframe.obs = dataframe.obs[columns]
    return dataframe
def read_parquet(path, include_columns=None):
    """Read a dataframe stored in a .parquet file, optionally keeping only the
    columns selected by `include_columns`.

    Parameters
    ----------
    path: Union[Path, UPath, str]
        Path to the .parquet file
    include_columns: Optional[Sequence[str]] = None
        List of column names and/or regex expressions, used to only include the
        desired columns in the resulting dataframe.

    Returns
    -------
    dataframe: pd.DataFrame
    """
    selected = None
    if include_columns is not None:
        # Inspect only the schema (memory-mapped) so column selection happens
        # before the data itself is loaded.
        schema = pyarrow.parquet.read_schema(path, memory_map=True)
        selected = []
        for pattern in include_columns:
            selected.extend(filter_columns(schema.names, regex=pattern))
    return pd.read_parquet(path, columns=selected)
def read_csv(path, include_columns=None):
    """Read a dataframe stored in a .csv file, optionally keeping only the
    columns selected by `include_columns`.

    Parameters
    ----------
    path: Union[Path, UPath, str]
        Path to the .csv file
    include_columns: Optional[Sequence[str]] = None
        List of column names and/or regex expressions, used to only include the
        desired columns in the resulting dataframe.

    Returns
    -------
    dataframe: pd.DataFrame
    """
    dataframe = pd.read_csv(path)
    if include_columns is None:
        return dataframe
    selected = []
    for pattern in include_columns:
        selected.extend(filter_columns(dataframe.columns.tolist(), regex=pattern))
    return dataframe[selected]
def read_dataframe(
    dataframe: Union[Path, UPath, str, pd.DataFrame],
    required_columns: Optional[Sequence[str]] = None,
    include_columns: Optional[Sequence[str]] = None,
) -> pd.DataFrame:
    """Load a dataframe from a .csv, .parquet or .h5ad file, or assert a given
    pd.DataFrame contains the expected required columns.

    Parameters
    ----------
    dataframe: Union[Path, UPath, str, pd.DataFrame]
        Either the path to the dataframe to be loaded, or a pd.DataFrame. Supported
        file types are .csv and .parquet
    required_columns: Optional[Sequence[str]] = None
        List of columns that the dataframe must contain. If these aren't found,
        a ValueError is thrown
    include_columns: Optional[Sequence[str]] = None
        List of column names and/or regex expressions, used to only include the
        desired columns in the resulting dataframe. If `required_columns` is not
        None, those get appended to `include_columns` (without duplication).

    Returns
    -------
    dataframe: pd.DataFrame

    Raises
    ------
    FileNotFoundError
        If a path is given but no file exists there.
    TypeError
        If the file suffix is unsupported, or `dataframe` is neither a path,
        a pd.DataFrame, nor an anndata.AnnData.
    ValueError
        If any of `required_columns` is missing from the result.
    """
    # Required columns are always included; set union removes duplicates, and
    # sorting makes the column selection order deterministic.
    required_set = set(required_columns) if required_columns else set()
    include_set = set(include_columns) if include_columns else set()
    include_columns = sorted(include_set | required_set) or None
    required_columns = sorted(required_set)

    if isinstance(dataframe, str):
        dataframe = UPath(dataframe)

    if isinstance(dataframe, (UPath, Path)):
        # Some fsspec-backed UPath implementations don't support resolve();
        # best-effort normalization, falling back to the path as given.
        with suppress(NotImplementedError, FileNotFoundError):
            dataframe = dataframe.expanduser().resolve(strict=True)
        if not dataframe.is_file():
            raise FileNotFoundError("Manifest file not found at given path")
        if dataframe.suffix == ".csv":
            dataframe = read_csv(dataframe, include_columns)
        elif dataframe.suffix == ".parquet":
            dataframe = read_parquet(dataframe, include_columns)
        elif dataframe.suffix == ".h5ad":
            dataframe = read_h5ad(dataframe, include_columns)
        else:
            raise TypeError("File type of provided manifest is not in [.csv, .parquet, .h5ad]")
    elif isinstance(dataframe, pd.DataFrame):
        if include_columns is not None:
            columns = []
            for filter_ in include_columns:
                columns += filter_columns(dataframe.columns, regex=filter_)
            dataframe = dataframe[columns]
    elif isinstance(dataframe, anndata.AnnData):
        if include_columns is not None:
            columns = []
            for filter_ in include_columns:
                columns += filter_columns(dataframe.obs.columns, regex=filter_)
            dataframe.obs = dataframe.obs[columns]
    else:
        raise TypeError(
            f"`dataframe` must be either a pd.DataFrame or a path to "
            f"a file to load one. You passed {type(dataframe)}"
        )

    if isinstance(dataframe, anndata.AnnData):
        # Flatten the AnnData object into a single dataframe with layout
        # [obs_names | X_0..X_{k-1} | obs columns] (same order as before,
        # but built with a single concat instead of two chained ones).
        X = dataframe.X.toarray()
        X = pd.DataFrame(X, columns=[f"X_{i}" for i in range(X.shape[1])]).reset_index(drop=True)
        index = pd.DataFrame(dataframe.obs_names)
        obs = dataframe.obs.reset_index(drop=True)
        dataframe = pd.concat([index, X, obs], axis=1)

    if required_columns:
        missing_columns = set(required_columns) - set(dataframe.columns)
        if missing_columns:
            raise ValueError(
                f"Some or all of the required columns were not "
                f"found on the given dataframe:\n{missing_columns}"
            )
    return dataframe
def filter_columns(
    columns_to_filter: Sequence[str],
    regex: Optional[str] = None,
    startswith: Optional[str] = None,
    endswith: Optional[str] = None,
    contains: Optional[str] = None,
    excludes: Optional[str] = None,
) -> Sequence[str]:
    """Filter a list of columns, using a combination of different queries, or a `regex` pattern.

    If `regex` is supplied it takes precedence and the remaining arguments are
    ignored. Otherwise, the logical AND of the supplied filters is applied,
    i.e. the columns that respect all of the supplied conditions are returned.

    Parameters
    ----------
    columns_to_filter: Sequence[str]
        List of columns to filter
    regex: Optional[str] = None
        A string containing a regular expression to be matched
    startswith: Optional[str] = None
        A substring the matching columns must start with
    endswith: Optional[str] = None
        A substring the matching columns must end with
    contains: Optional[str] = None
        A substring the matching columns must contain
    excludes: Optional[str] = None
        A substring the matching columns must not contain
    """
    if regex is not None:
        # re.match anchors at the start of the column name only
        return [column for column in columns_to_filter if re.match(regex, column)]

    def _passes(column) -> bool:
        # Guard clauses: a column survives only if every supplied filter holds.
        text = str(column)
        if startswith is not None and not text.startswith(startswith):
            return False
        if endswith is not None and not text.endswith(endswith):
            return False
        if contains is not None and contains not in text:
            return False
        if excludes is not None and excludes in text:
            return False
        return True

    return [column for column in columns_to_filter if _passes(column)]