from typing import Any
from functools import singledispatch
from dolomite_base import save_object, validate_saves
from delayedarray import SparseNdarray, chunk_grid
import os
import h5py
import numpy
import delayedarray
from . import _utils as ut
from . import _optimize_storage as optim
has_scipy = False
try:
import scipy.sparse
has_scipy = True
except:
pass
def _choose_index_type(n: int) -> str:
if n < 2**8:
return "u1"
elif n < 2**16:
return "u2"
elif n < 2**32:
return "u4"
else:
return "u8"
###############################################
###############################################
@singledispatch
def _h5_write_sparse_matrix(x: Any, handle, details, compressed_sparse_matrix_buffer_size, compressed_sparse_matrix_chunk_size):
chunks = chunk_grid(x)
chunks_per_col = len(chunks.boundaries[0])
chunks_per_row = len(chunks.boundaries[1])
# If we have to extract fewer chunks per column, we iterate by column to
# create a CSC matrix. Otherwise we make a CSR matrix.
if chunks_per_col < chunks_per_row:
primary = 1
secondary = 0
handle.attrs["layout"] = "CSC"
else:
primary = 0
secondary = 1
handle.attrs["layout"] = "CSR"
compressed_sparse_matrix_chunk_size = min(compressed_sparse_matrix_chunk_size, details.non_zero)
dhandle = handle.create_dataset("data", shape = details.non_zero, dtype = details.type, compression = "gzip", chunks = compressed_sparse_matrix_chunk_size)
if details.placeholder is not None:
dhandle.attrs.create("missing-value-placeholder", data = details.placeholder, dtype = details.type)
masked = delayedarray.is_masked(x)
block_size = max(int(compressed_sparse_matrix_buffer_size) // (x.dtype.itemsize * x.shape[secondary]), 1)
limit = x.shape[primary]
itype = _choose_index_type(x.shape[secondary])
ihandle = handle.create_dataset("indices", shape = details.non_zero, dtype = itype, compression = "gzip", chunks = compressed_sparse_matrix_chunk_size)
indptrs = numpy.zeros(x.shape[primary] + 1, dtype = numpy.uint64)
counter = 0
subset = [None] * 2
subset[secondary] = range(x.shape[secondary])
for start in range(0, limit, block_size):
end = min(limit, start + block_size)
subset[primary] = range(start, end)
block = delayedarray.extract_sparse_array(x, (*subset,))
if block.contents is not None:
# Sparse2darrays are always CSC, so if we need to save it as CSR,
# we transpose it before we extract the contents.
if primary == 0:
block = block.T
original = counter
icollected = []
dcollected = []
for i, b in enumerate(block.contents):
if b is not None:
counter += len(b[0])
icollected.append(b[0])
vals = b[1]
if masked:
vals = ut.replace_mask_with_placeholder(vals, details.placeholder, dhandle.dtype)
dcollected.append(vals)
indptrs[start + i + 1] = counter
# Collecting everything in memory for a single write operation, avoid
# potential issues with writing/reloading partial chunks.
ihandle[original : counter] = numpy.concatenate(icollected)
dhandle[original : counter] = numpy.concatenate(dcollected)
handle.create_dataset("indptr", data=indptrs, dtype="u8", compression="gzip", chunks=True)
if has_scipy:
def _write_compressed_sparse_matrix(x: Any, handle, details, compressed_sparse_matrix_buffer_size, compressed_sparse_matrix_chunk_size, by_column):
if by_column:
primary = 1
secondary = 0
handle.attrs["layout"] = "CSC"
else:
primary = 0
secondary = 1
handle.attrs["layout"] = "CSR"
compressed_sparse_matrix_chunk_size = min(compressed_sparse_matrix_chunk_size, details.non_zero)
itype = _choose_index_type(x.shape[secondary])
handle.create_dataset("indices", data = x.indices, dtype = itype, compression = "gzip", chunks = compressed_sparse_matrix_chunk_size)
handle.create_dataset("indptr", data = x.indptr, dtype = "u8", compression = "gzip", chunks = True)
# Currently, it seems like scipy's sparse matrices are not intended
# to be masked, so we'll just ignore it completely.
handle.create_dataset("data", data = x.data, dtype = details.type, compression = "gzip", chunks = compressed_sparse_matrix_chunk_size)
@_h5_write_sparse_matrix.register
def _h5_write_sparse_matrix_from_csc_matrix(x: scipy.sparse.csc_matrix, handle, details, compressed_sparse_matrix_buffer_size, compressed_sparse_matrix_chunk_size):
_write_compressed_sparse_matrix(
x,
handle,
details,
compressed_sparse_matrix_buffer_size = compressed_sparse_matrix_buffer_size,
compressed_sparse_matrix_chunk_size = compressed_sparse_matrix_chunk_size,
by_column = True
)
@_h5_write_sparse_matrix.register
def _h5_write_sparse_matrix_from_csr_matrix(x: scipy.sparse.csr_matrix, handle, details, compressed_sparse_matrix_buffer_size, compressed_sparse_matrix_chunk_size):
_write_compressed_sparse_matrix(
x,
handle,
details,
compressed_sparse_matrix_buffer_size = compressed_sparse_matrix_buffer_size,
compressed_sparse_matrix_chunk_size = compressed_sparse_matrix_chunk_size,
by_column = False
)
###############################################
###############################################
def _save_compressed_sparse_matrix(x: Any, path: str, compressed_sparse_matrix_chunk_size: int = 10000, compressed_sparse_matrix_buffer_size: int = 1e8, **kwargs):
os.mkdir(path)
if len(x.shape) != 2:
raise ValueError("only 2-dimensional sparse arrays are currently supported")
with h5py.File(os.path.join(path, "matrix.h5"), "w") as handle:
ghandle = handle.create_group("compressed_sparse_matrix")
if numpy.issubdtype(x.dtype, numpy.integer):
tt = "integer"
opts = optim.optimize_integer_storage(x, buffer_size = compressed_sparse_matrix_buffer_size)
elif numpy.issubdtype(x.dtype, numpy.floating):
tt = "number"
opts = optim.optimize_float_storage(x, buffer_size = compressed_sparse_matrix_buffer_size)
elif x.dtype == numpy.bool_:
tt = "boolean"
opts = optim.optimize_boolean_storage(x, buffer_size = compressed_sparse_matrix_buffer_size)
else:
raise NotImplementedError("cannot save sparse matrix of type '" + x.dtype.name + "'")
ghandle.attrs["type"] = tt
ghandle.create_dataset("shape", data = x.shape, dtype = "u8")
_h5_write_sparse_matrix(
x,
handle = ghandle,
details = opts,
compressed_sparse_matrix_buffer_size = compressed_sparse_matrix_buffer_size,
compressed_sparse_matrix_chunk_size = compressed_sparse_matrix_chunk_size
)
with open(os.path.join(path, "OBJECT"), "w") as handle:
handle.write('{ "type": "compressed_sparse_matrix", "compressed_sparse_matrix": { "version": "1.0" } }')
[docs]
@save_object.register
@validate_saves
def save_compresssed_sparse_matrix_from_Sparse2darray(x: SparseNdarray, path: str, compressed_sparse_matrix_chunk_size: int = 10000, compressed_sparse_matrix_buffer_size: int = 1e8, **kwargs):
"""
Method for saving a :py:class:`~delayedarray.SparseNdarray.SparseNdarray`
to disk, see :py:meth:`~dolomite_base.save_object.save_object` for details.
Args:
x: Object to be saved.
path: Path to a directory to save ``x``.
compressed_sparse_matrix_chunk_size
Chunk size for the data and indices. Larger values improve compression
at the potential cost of reducing random access efficiency.
compressed_sparse_matrix_buffer_size:
Size of the buffer in bytes, for blockwise processing and writing
to file. Larger values improve speed at the cost of memory.
kwargs: Further arguments, ignored.
Returns:
``x`` is saved to ``path``.
"""
_save_compressed_sparse_matrix(
x,
path,
compressed_sparse_matrix_chunk_size = compressed_sparse_matrix_chunk_size,
compressed_sparse_matrix_buffer_size = compressed_sparse_matrix_buffer_size,
**kwargs
)
if has_scipy:
[docs]
@save_object.register
@validate_saves
def save_compresssed_sparse_matrix_from_scipy_csc_matrix(x: scipy.sparse.csc_matrix, path: str, compressed_sparse_matrix_chunk_size: int = 10000, compressed_sparse_matrix_buffer_size: int = 1e8, **kwargs):
"""
Method for saving :py:class:`~scipy.sparse.csc_matrix` objects to disk,
see :py:meth:`~dolomite_base.stage_object.stage_object` for details.
Args:
x: Matrix to be saved.
path: Path to a directory to save ``x``.
compressed_sparse_matrix_chunk_size
Chunk size for the data and indices. Larger values improve compression
at the potential cost of reducing random access efficiency.
compressed_sparse_matrix_cache_size:
Size of the buffer in bytes, for blockwise processing and writing
to file. Larger values improve speed at the cost of memory.
kwargs: Further arguments, ignored.
Returns:
``x`` is saved to ``path``.
"""
return _save_compressed_sparse_matrix(
x,
path,
compressed_sparse_matrix_chunk_size = compressed_sparse_matrix_chunk_size,
compressed_sparse_matrix_buffer_size = compressed_sparse_matrix_buffer_size,
**kwargs
)
[docs]
@save_object.register
@validate_saves
def save_compresssed_sparse_matrix_from_scipy_csr_matrix(x: scipy.sparse.csr_matrix, path: str, compressed_sparse_matrix_chunk_size: int = 10000, compressed_sparse_matrix_buffer_size: int = 1e8, **kwargs):
"""
Method for saving :py:class:`~scipy.sparse.csr_matrix` objects to disk,
see :py:meth:`~dolomite_base.stage_object.stage_object` for details.
Args:
x: Matrix to be saved.
path: Path to a directory to save ``x``.
compressed_sparse_matrix_chunk_size
Chunk size for the data and indices. Larger values improve compression
at the potential cost of reducing random access efficiency.
compressed_sparse_matrix_cache_size:
Size of the buffer in bytes, for blockwise processing and writing
to file. Larger values improve speed at the cost of memory.
kwargs: Further arguments, ignored.
Returns:
``x`` is saved to ``path``.
"""
return _save_compressed_sparse_matrix(
x,
path,
compressed_sparse_matrix_chunk_size = compressed_sparse_matrix_chunk_size,
compressed_sparse_matrix_buffer_size = compressed_sparse_matrix_buffer_size,
**kwargs
)
[docs]
@save_object.register
@validate_saves
def save_compresssed_sparse_matrix_from_scipy_coo_matrix(x: scipy.sparse.coo_matrix, path: str, compressed_sparse_matrix_chunk_size: int = 10000, compressed_sparse_matrix_buffer_size: int = 1e8, **kwargs):
"""
Method for saving :py:class:`~scipy.sparse.coo_matrix` objects to disk,
see :py:meth:`~dolomite_base.stage_object.stage_object` for details.
Args:
x: Matrix to be saved.
path: Path to a directory to save ``x``.
compressed_sparse_matrix_chunk_size
Chunk size for the data and indices. Larger values improve compression
at the potential cost of reducing random access efficiency.
compressed_sparse_matrix_cache_size:
Size of the buffer in bytes, for blockwise processing and writing
to file. Larger values improve speed at the cost of memory.
kwargs: Further arguments, ignored.
Returns:
``x`` is saved to ``path``.
"""
return _save_compressed_sparse_matrix(
x,
path,
compressed_sparse_matrix_chunk_size = compressed_sparse_matrix_chunk_size,
compressed_sparse_matrix_buffer_size = compressed_sparse_matrix_buffer_size,
**kwargs
)