from typing import Tuple, Optional, Any, Dict, Union
import numpy
from dolomite_base import save_object, validate_saves
import delayedarray
import os
import h5py
from .choose_chunk_dimensions import choose_chunk_dimensions
from . import _optimize_storage as optim
from . import _utils as ut
###################################################
###################################################
def _blockwise_write_to_hdf5(dhandle: h5py.Dataset, chunk_shape: Tuple, x: Any, placeholder: Any, buffer_size: int):
    masked = delayedarray.is_masked(x)
    is_string = numpy.issubdtype(dhandle.dtype, numpy.bytes_)
    if placeholder is not None:
        if is_string:
            placeholder = placeholder.encode("UTF8")
        else:
            placeholder = dhandle.dtype.type(placeholder)

    def _blockwise_dense_writer(pos: Tuple, block):
        if masked:
            block = ut.replace_mask_with_placeholder(block, placeholder, dhandle.dtype)

        # h5py doesn't want to convert from numpy's Unicode type to bytes
        # automatically, and fails; so fine, we'll do it ourselves.
        if is_string:
            block = block.astype(dhandle.dtype, copy=False)

        # Block processing is inherently Fortran-order based (i.e., the first
        # dimension is assumed to change the fastest), and the blocks
        # themselves are also in F-contiguous layout (i.e., column-major). By
        # comparison, HDF5 uses C order. To avoid any rearrangement of data
        # by h5py, we save each block as a transposed array for efficiency.
        coords = [slice(start, end) for start, end in reversed(pos)]
        dhandle[(*coords,)] = block.T

    # The cost factor doesn't really matter here, as we're not choosing between grids.
    grid = delayedarray.chunk_shape_to_grid(chunk_shape, x.shape, cost_factor=10)
    delayedarray.apply_over_blocks(x, _blockwise_dense_writer, grid=grid, buffer_size=buffer_size)
    return
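
# A minimal, self-contained sketch of the transposition trick used above.
# Everything here (function name, file name, shapes) is hypothetical and for
# illustration only; it is not part of the saved format.
def _demo_transposed_block_write(path: str = "demo.h5"):
    # An F-contiguous 2x3 block, as blockwise processing would produce it.
    block = numpy.asfortranarray(numpy.arange(6, dtype=numpy.float64).reshape(2, 3))
    with h5py.File(path, "w") as handle:
        # The dataset is created with the block's dimensions reversed...
        dhandle = handle.create_dataset("data", shape=(3, 2), dtype="f8")
        # ...so a block at pos = [(0, 2), (0, 3)] lands at the reversed
        # coordinates [0:3, 0:2]; block.T is already C-contiguous, so h5py
        # writes it without any internal rearrangement.
        dhandle[0:3, 0:2] = block.T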
###################################################
###################################################
def _save_dense_array(
    x: numpy.ndarray,
    path: str,
    dense_array_chunk_dimensions: Optional[Tuple[int, ...]] = None,
    dense_array_chunk_args: Dict = {},
    dense_array_buffer_size: int = 100_000_000,
    **kwargs
):
    os.mkdir(path)

    # Coming up with a decent chunk size.
    if dense_array_chunk_dimensions is None:
        dense_array_chunk_dimensions = choose_chunk_dimensions(x.shape, x.dtype.itemsize, **dense_array_chunk_args)
    else:
        capped = []
        for i, d in enumerate(x.shape):
            capped.append(min(d, dense_array_chunk_dimensions[i]))
        dense_array_chunk_dimensions = (*capped,)

    # Choosing the smallest data type that we can use.
    tt = None
    blockwise = False
    if numpy.issubdtype(x.dtype, numpy.integer):
        tt = "integer"
        opts = optim.optimize_integer_storage(x, buffer_size=dense_array_buffer_size)
    elif numpy.issubdtype(x.dtype, numpy.floating):
        tt = "number"
        opts = optim.optimize_float_storage(x, buffer_size=dense_array_buffer_size)
    elif x.dtype == numpy.bool_:
        tt = "boolean"
        opts = optim.optimize_boolean_storage(x, buffer_size=dense_array_buffer_size)
    elif numpy.issubdtype(x.dtype, numpy.str_):
        tt = "string"
        opts = optim.optimize_string_storage(x, buffer_size=dense_array_buffer_size)
        blockwise = True
    else:
        raise NotImplementedError("cannot save dense array of type '" + x.dtype.name + "'")

    if opts.placeholder is not None:
        blockwise = True
    if not isinstance(x, numpy.ndarray):
        blockwise = True

    fpath = os.path.join(path, "array.h5")
    with h5py.File(fpath, "w") as handle:
        ghandle = handle.create_group("dense_array")
        ghandle.attrs["type"] = tt

        if not blockwise:
            # Saving it in transposed form if it's in Fortran order (i.e., the
            # first dimension is the fastest-changing). This avoids the need
            # for any data reorganization inside h5py itself.
            if x.flags.f_contiguous:
                x = x.T
                dense_array_chunk_dimensions = (*reversed(dense_array_chunk_dimensions),)
                ghandle.attrs.create("transposed", data=1, dtype="i1")
            else:
                ghandle.attrs.create("transposed", data=0, dtype="i1")
            dhandle = ghandle.create_dataset("data", data=x, chunks=dense_array_chunk_dimensions, dtype=opts.type, compression="gzip")
        else:
            # Block processing of a dataset is always in Fortran order, but
            # HDF5 uses C order; so, we save the blocks in transposed form for
            # efficiency.
            ghandle.attrs.create("transposed", data=1, dtype="i1")
            dhandle = ghandle.create_dataset("data", shape=(*reversed(x.shape),), chunks=(*reversed(dense_array_chunk_dimensions),), dtype=opts.type, compression="gzip")
            _blockwise_write_to_hdf5(dhandle, chunk_shape=dense_array_chunk_dimensions, x=x, placeholder=opts.placeholder, buffer_size=dense_array_buffer_size)

        if opts.placeholder is not None:
            dhandle.attrs.create("missing-value-placeholder", data=opts.placeholder, dtype=opts.type)

    with open(os.path.join(path, "OBJECT"), "w") as handle:
        handle.write('{ "type": "dense_array", "dense_array": { "version": "1.0" } }')
###################################################
###################################################
@save_object.register
@validate_saves
def save_dense_array_from_ndarray(
    x: numpy.ndarray,
    path: str,
    dense_array_chunk_dimensions: Optional[Tuple[int, ...]] = None,
    dense_array_chunk_args: Dict = {},
    dense_array_buffer_size: int = 100_000_000,
    **kwargs
):
"""
Method for saving :py:class:`~numpy.ndarray` objects to disk, see
:py:meth:`~dolomite_base.save_object.save_object` for details.
Args:
x: Object to be saved.
path: Path to a directory to save ``x``.
dense_array_chunk_dimensions:
Chunk dimensions for the HDF5 dataset. Larger values improve
compression at the potential cost of reducing random access
efficiency. If not provided, we choose some chunk sizes with
:py:meth:`~dolomite_matrix.choose_chunk_dimensions.choose_chunk_dimensions`.
dense_array_chunk_args:
Arguments to pass to ``choose_chunk_dimensions`` if
``dense_array_chunk_dimensions`` is not provided.
dense_array_buffer_size:
Size of the buffer in bytes, for blockwise processing and writing
to file. Larger values improve speed at the cost of memory.
kwargs: Further arguments, ignored.
Returns:
``x`` is saved to ``path``.
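
    Example:
        A minimal usage sketch; the array contents and directory name here are
        hypothetical, and ``save_object`` dispatches to this method for NumPy
        arrays::

            import numpy
            from dolomite_base import save_object

            x = numpy.random.rand(100, 50)
            save_object(x, "my_dense_array")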
"""
    _save_dense_array(
        x,
        path=path,
        dense_array_chunk_dimensions=dense_array_chunk_dimensions,
        dense_array_chunk_args=dense_array_chunk_args,
        dense_array_buffer_size=dense_array_buffer_size,
        **kwargs
    )
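
###################################################
###################################################

# Hypothetical round-trip sketch (illustration only): assuming the companion
# "dense_array" reader from this package is installed and registered, the
# saved directory can be loaded back with dolomite_base.read_object.
#
#   from dolomite_base import read_object
#   arr = read_object("my_dense_array")  # directory written by save_object above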