from __future__ import annotations
import bz2
import gzip
from collections import OrderedDict
from os import PathLike, fspath
from pathlib import Path
from types import MappingProxyType
from typing import TYPE_CHECKING
from warnings import warn
import h5py
import numpy as np
import pandas as pd
from scipy import sparse
from .. import AnnData
from ..compat import _deprecate_positional_args
from .utils import is_float
if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator, Mapping
try:
from .zarr import read_zarr
except ImportError as _e:
e = _e
def read_zarr(*_, **__):
raise e
def read_csv(
filename: PathLike | Iterator[str],
delimiter: str | None = ",",
first_column_names: bool | None = None,
dtype: str = "float32",
) -> AnnData:
"""\
Read `.csv` file.
Same as :func:`~anndata.read_text` but with default delimiter `','`.
Parameters
----------
filename
Data file.
delimiter
        Delimiter that separates data within the text file.
        If `None`, will split at an arbitrary number of whitespace characters,
        which is different from enforcing splitting at a single whitespace character `' '`.
first_column_names
Assume the first column stores row names.
dtype
Numpy data type.
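    Example
    -------
    A minimal sketch; the file name and layout below are assumptions for
    illustration, not part of this function's contract:

    .. code:: python

        import anndata

        # assumes "counts.csv" has a header row of variable names and a
        # first column of observation names
        adata = anndata.read_csv("counts.csv", first_column_names=True)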
"""
return read_text(filename, delimiter, first_column_names, dtype)
def read_excel(
    filename: PathLike, sheet: str | int, dtype: str = "float32"
) -> AnnData:
"""\
Read `.xlsx` (Excel) file.
    Assumes that the first column stores the row names and the first row the
    column names.
Parameters
----------
filename
File name to read from.
    sheet
        Name or index of the sheet in the Excel file.
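    Example
    -------
    A minimal sketch; the file name and sheet index are assumptions for
    illustration:

    .. code:: python

        import anndata

        # first sheet by index; the first column is used as row names and
        # the first row as column names
        adata = anndata.read_excel("counts.xlsx", sheet=0)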
"""
# rely on pandas for reading an excel file
from pandas import read_excel
df = read_excel(fspath(filename), sheet)
X = df.values[:, 1:]
row = dict(row_names=df.iloc[:, 0].values.astype(str))
col = dict(col_names=np.array(df.columns[1:], dtype=str))
return AnnData(X, row, col)
def read_umi_tools(filename: PathLike, dtype=None) -> AnnData:
"""\
Read a gzipped condensed count matrix from umi_tools.
Parameters
----------
filename
File name to read from.
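    Example
    -------
    A minimal sketch; the file name is an assumption for illustration:

    .. code:: python

        import anndata

        # expects a (possibly gzipped) table with "gene", "cell", and "count" columns
        adata = anndata.read_umi_tools("counts.tsv.gz")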
"""
    # pandas reads the (optionally gzipped) long-format table; the categorical
    # codes provide integer indices for building the sparse count matrix
table = pd.read_table(filename, dtype={"gene": "category", "cell": "category"})
X = sparse.csr_matrix(
(table["count"], (table["cell"].cat.codes, table["gene"].cat.codes)),
dtype=dtype,
)
obs = pd.DataFrame(index=pd.Index(table["cell"].cat.categories, name="cell"))
var = pd.DataFrame(index=pd.Index(table["gene"].cat.categories, name="gene"))
return AnnData(X=X, obs=obs, var=var)
def read_hdf(filename: PathLike, key: str) -> AnnData:
"""\
Read `.h5` (hdf5) file.
Note: Also looks for fields `row_names` and `col_names`.
Parameters
----------
filename
Filename of data file.
key
Name of dataset in the file.
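    Example
    -------
    A minimal sketch; the file name and dataset key are assumptions for
    illustration:

    .. code:: python

        import anndata

        # reads the dataset stored under "X" and, if present, "row_names"/"col_names"
        adata = anndata.read_hdf("data.h5", key="X")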
"""
with h5py.File(filename, "r") as f:
        # materialize the keys, since f.keys() returns a view rather than a list
        keys = list(f.keys())
        if key == "":
            raise ValueError(
                f"The file {filename} stores the following sheets:\n{keys}\n"
                f"Call read_hdf with one of them."
            )
# read array
X = f[key][()]
# try to find row and column names
rows_cols = [{}, {}]
for iname, name in enumerate(["row_names", "col_names"]):
if name in keys:
rows_cols[iname][name] = f[name][()]
adata = AnnData(X, rows_cols[0], rows_cols[1])
return adata
def _fmt_loom_axis_attrs(
input: Mapping, idx_name: str, dimm_mapping: Mapping[str, Iterable[str]]
) -> tuple[pd.DataFrame, Mapping[str, np.ndarray]]:
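    """\
    Split loom axis attributes into a DataFrame of one-dimensional attributes
    and a mapping of multi-dimensional arrays.

    Attributes listed in `dimm_mapping` are popped from `input`, stacked
    column-wise, and returned in the mapping. Of the remaining attributes,
    multi-column arrays also go into the mapping, one-dimensional attributes
    become DataFrame columns, and `idx_name` (if present) is set as the index.
    """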
axis_df = pd.DataFrame()
axis_mapping = {}
for key, names in dimm_mapping.items():
axis_mapping[key] = np.array([input.pop(name) for name in names]).T
for k, v in input.items():
if v.ndim > 1 and v.shape[1] > 1:
axis_mapping[k] = v
else:
axis_df[k] = v
if idx_name in axis_df:
axis_df.set_index(idx_name, drop=True, inplace=True)
return axis_df, axis_mapping
@_deprecate_positional_args(version="0.9")
def read_loom(
filename: PathLike,
*,
sparse: bool = True,
cleanup: bool = False,
X_name: str = "spliced",
obs_names: str = "CellID",
obsm_names: Mapping[str, Iterable[str]] | None = None,
var_names: str = "Gene",
varm_names: Mapping[str, Iterable[str]] | None = None,
dtype: str = "float32",
obsm_mapping: Mapping[str, Iterable[str]] = MappingProxyType({}),
varm_mapping: Mapping[str, Iterable[str]] = MappingProxyType({}),
**kwargs,
) -> AnnData:
"""\
Read `.loom`-formatted hdf5 file.
This reads the whole file into memory.
    By default the data matrix is read as sparse;
    pass `sparse=False` to read it as a dense array.
Parameters
----------
filename
The filename.
sparse
Whether to read the data matrix as sparse.
cleanup
        Whether to collapse all obs/var fields that only store
        one unique value into `.uns['loom-obs']` / `.uns['loom-var']`.
X_name
Loompy key with which the data matrix :attr:`~anndata.AnnData.X` is initialized.
obs_names
Loompy key where the observation/cell names are stored.
obsm_mapping
        Loompy keys whose values will be combined into observation matrices (`.obsm`).
var_names
Loompy key where the variable/gene names are stored.
varm_mapping
        Loompy keys whose values will be combined into variable matrices (`.varm`).
    **kwargs
        Arguments passed to `loompy.connect`.
Example
-------
.. code:: python
pbmc = anndata.read_loom(
"pbmc.loom",
sparse=True,
X_name="lognorm",
obs_names="cell_names",
var_names="gene_names",
obsm_mapping={
"X_umap": ["umap_1", "umap_2"]
}
)
"""
# Deprecations
if obsm_names is not None:
warn(
"Argument obsm_names has been deprecated in favour of `obsm_mapping`. "
"In 0.9 this will be an error.",
FutureWarning,
)
if obsm_mapping != {}:
raise ValueError(
"Received values for both `obsm_names` and `obsm_mapping`. This is "
"ambiguous, only pass `obsm_mapping`."
)
obsm_mapping = obsm_names
if varm_names is not None:
warn(
"Argument varm_names has been deprecated in favour of `varm_mapping`. "
"In 0.9 this will be an error.",
FutureWarning,
)
if varm_mapping != {}:
raise ValueError(
"Received values for both `varm_names` and `varm_mapping`. This is "
"ambiguous, only pass `varm_mapping`."
)
varm_mapping = varm_names
filename = fspath(filename) # allow passing pathlib.Path objects
from loompy import connect
with connect(filename, "r", **kwargs) as lc:
if X_name not in lc.layers.keys():
X_name = ""
X = lc.layers[X_name].sparse().T.tocsr() if sparse else lc.layers[X_name][()].T
X = X.astype(dtype, copy=False)
layers = OrderedDict()
if X_name != "":
layers["matrix"] = (
lc.layers[""].sparse().T.tocsr() if sparse else lc.layers[""][()].T
)
for key in lc.layers.keys():
if key != "":
layers[key] = (
lc.layers[key].sparse().T.tocsr()
if sparse
else lc.layers[key][()].T
)
# TODO: Figure out the singleton obs elements
obs, obsm = _fmt_loom_axis_attrs(dict(lc.col_attrs), obs_names, obsm_mapping)
var, varm = _fmt_loom_axis_attrs(dict(lc.row_attrs), var_names, varm_mapping)
uns = {}
if cleanup:
uns_obs = {}
for key in obs.columns:
if len(obs[key].unique()) == 1:
uns_obs[key] = obs[key].iloc[0]
del obs[key]
if uns_obs:
uns["loom-obs"] = uns_obs
uns_var = {}
for key in var.columns:
if len(var[key].unique()) == 1:
uns_var[key] = var[key].iloc[0]
del var[key]
if uns_var:
uns["loom-var"] = uns_var
adata = AnnData(
X,
obs=obs,
var=var,
layers=layers,
obsm=obsm if obsm else None,
varm=varm if varm else None,
uns=uns,
)
return adata
def read_mtx(filename: PathLike, dtype: str = "float32") -> AnnData:
"""\
Read `.mtx` file.
Parameters
----------
filename
The filename.
dtype
Numpy data type.
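    Example
    -------
    A minimal sketch; the file name is an assumption for illustration:

    .. code:: python

        import anndata

        # Matrix Market files store a sparse matrix; the result is converted to CSR
        adata = anndata.read_mtx("matrix.mtx")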
"""
from scipy.io import mmread
# could be rewritten accounting for dtype to be more performant
X = mmread(fspath(filename)).astype(dtype)
from scipy.sparse import csr_matrix
X = csr_matrix(X)
return AnnData(X)
def read_text(
filename: PathLike | Iterator[str],
delimiter: str | None = None,
first_column_names: bool | None = None,
dtype: str = "float32",
) -> AnnData:
"""\
Read `.txt`, `.tab`, `.data` (text) file.
Same as :func:`~anndata.read_csv` but with default delimiter `None`.
Parameters
----------
filename
Data file, filename or stream.
delimiter
        Delimiter that separates data within the text file. If `None`, will split at
        an arbitrary number of whitespace characters, which is different from enforcing
        splitting at a single whitespace character `' '`.
first_column_names
Assume the first column stores row names.
dtype
Numpy data type.
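    Example
    -------
    A minimal sketch; the file name and layout are assumptions for illustration:

    .. code:: python

        import anndata

        # whitespace-delimited file with a header row of variable names and a
        # first column of observation names
        adata = anndata.read_text("matrix.txt", first_column_names=True)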
"""
if not isinstance(filename, (PathLike, str, bytes)):
return _read_text(filename, delimiter, first_column_names, dtype)
filename = Path(filename)
if filename.suffix == ".gz":
with gzip.open(str(filename), mode="rt") as f:
return _read_text(f, delimiter, first_column_names, dtype)
elif filename.suffix == ".bz2":
with bz2.open(str(filename), mode="rt") as f:
return _read_text(f, delimiter, first_column_names, dtype)
else:
with filename.open() as f:
return _read_text(f, delimiter, first_column_names, dtype)
def _iter_lines(file_like: Iterable[str]) -> Generator[str, None, None]:
"""Helper for iterating only nonempty lines without line breaks"""
for line in file_like:
line = line.rstrip("\r\n")
if line:
yield line
def _read_text(
f: Iterator[str],
delimiter: str | None,
first_column_names: bool | None,
dtype: str,
) -> AnnData:
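    # Expected input layout (an assumed illustration; the "#" comment lines,
    # the header row, and the row-name column are all optional):
    #
    #     # a comment line
    #     gene1 gene2 gene3
    #     cell1 1.0 0.0 2.0
    #     cell2 0.0 3.0 1.0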
comments = []
data = []
lines = _iter_lines(f)
col_names = []
row_names = []
# read header and column names
for line in lines:
if line.startswith("#"):
comment = line.lstrip("# ")
if comment:
comments.append(comment)
else:
if delimiter is not None and delimiter not in line:
raise ValueError(f"Did not find delimiter {delimiter!r} in first line.")
line_list = line.split(delimiter)
# the first column might be row names, so check the last
if not is_float(line_list[-1]):
col_names = line_list
# logg.msg(" assuming first line in file stores column names", v=4)
else:
if not is_float(line_list[0]) or first_column_names:
first_column_names = True
row_names.append(line_list[0])
data.append(np.array(line_list[1:], dtype=dtype))
else:
data.append(np.array(line_list, dtype=dtype))
break
if not col_names:
# try reading col_names from the last comment line
if len(comments) > 0:
# logg.msg(" assuming last comment line stores variable names", v=4)
col_names = np.array(comments[-1].split())
# just numbers as col_names
else:
# logg.msg(" did not find column names in file", v=4)
col_names = np.arange(len(data[0])).astype(str)
col_names = np.array(col_names, dtype=str)
# read another line to check if first column contains row names or not
if first_column_names is None:
first_column_names = False
for line in lines:
line_list = line.split(delimiter)
if first_column_names or not is_float(line_list[0]):
# logg.msg(" assuming first column in file stores row names", v=4)
first_column_names = True
row_names.append(line_list[0])
data.append(np.array(line_list[1:], dtype=dtype))
else:
data.append(np.array(line_list, dtype=dtype))
break
# if row names are just integers
if len(data) > 1 and data[0].size != data[1].size:
# logg.msg(
# " assuming first row stores column names and first column row names",
# v=4,
# )
first_column_names = True
col_names = np.array(data[0]).astype(int).astype(str)
row_names.append(data[1][0].astype(int).astype(str))
data = [data[1][1:]]
# parse the file
for line in lines:
line_list = line.split(delimiter)
if first_column_names:
row_names.append(line_list[0])
data.append(np.array(line_list[1:], dtype=dtype))
else:
data.append(np.array(line_list, dtype=dtype))
# logg.msg(" read data into list of lists", t=True, v=4)
    # Transform to an array. This takes a long time and a lot of memory, but it
    # is essentially what np.genfromtxt does. We avoid np.genfromtxt because it
    # would require an extra slicing step at the end to separate row_names from
    # the float data, and that slicing costs a lot of memory and CPU time.
if data[0].size != data[-1].size:
raise ValueError(
f"Length of first line ({data[0].size}) is different "
f"from length of last line ({data[-1].size})."
)
data = np.array(data, dtype=dtype)
# logg.msg(" constructed array from list of list", t=True, v=4)
# transform row_names
if not row_names:
row_names = np.arange(len(data)).astype(str)
# logg.msg(" did not find row names in file", v=4)
else:
row_names = np.array(row_names)
for iname, name in enumerate(row_names):
row_names[iname] = name.strip('"')
# adapt col_names if necessary
if col_names.size > data.shape[1]:
col_names = col_names[1:]
for iname, name in enumerate(col_names):
col_names[iname] = name.strip('"')
return AnnData(
data,
obs=dict(obs_names=row_names),
var=dict(var_names=col_names),
)