Source code for malva.spacemake

"""
This code is adapted from spacemake.
"""

import anndata
import numpy as np
import pandas as pd
from scipy.sparse import csc_matrix, csr_matrix, dok_matrix, vstack, coo_matrix


[docs] def nonsingular(vmin, vmax, expander=0.001, tiny=1e-15, increasing=True): """ Modify the endpoints of a range as needed to avoid singularities. This code was adapted from matplotlib.transforms "Copyright (c) 2012- Matplotlib Development Team; All Rights Reserved" Args vmin, vmax: float, the initial endpoints. expander: float, default: 0.001, fractional amount by which *vmin* and *vmax* are expanded if the original interval is too small, based on *tiny*. tiny : float, default: 1e-15, hreshold for the ratio of the interval to the maximum absolute value of its endpoints. If the interval is smaller than this, it will be expanded. This value should be around 1e-15 or larger; otherwise the interval will be approaching the double precision resolution limit. increasing: bool, default: True, if True, swap *vmin*, *vmax* if *vmin* > *vmax*. Returns: vmin, vmax: float, endpoints, expanded and/or swapped if necessary. If either input is inf or NaN, or if both inputs are 0 or very close to zero, it returns -*expander*, *expander*. """ if (not np.isfinite(vmin)) or (not np.isfinite(vmax)): return -expander, expander swapped = False if vmax < vmin: vmin, vmax = vmax, vmin swapped = True vmin, vmax = map(float, [vmin, vmax]) maxabsvalue = max(abs(vmin), abs(vmax)) if maxabsvalue < (1e6 / tiny) * np.finfo(float).tiny: vmin = -expander vmax = expander elif vmax - vmin <= maxabsvalue * tiny: if vmax == 0 and vmin == 0: vmin = -expander vmax = expander else: vmin -= expander * abs(vmin) vmax += expander * abs(vmax) if swapped and not increasing: vmin, vmax = vmax, vmin return vmin, vmax
[docs] def create_mesh(width, height, diameter, distance, push_x=0, push_y=0): """ Create a mesh grid of points within a specified area, based on the given parameters. Args: width: float, the width of the area in which to create the mesh. height: float, the height of the area in which to create the mesh. diameter: float, the diameter of each mesh point (used to define spacing). distance: float, the distance between the centers of the mesh points. push_x: float, default: 0, horizontal offset for the starting point of the mesh. push_y: float, default: 0, vertical offset for the starting point of the mesh. Returns: xy: numpy.ndarray, matrix of shape (n_points, 2) with x, y coordinates of the mesh points. """ distance_y = np.sqrt(3) * distance x_coord = np.arange(push_x, width + diameter, distance) y_coord = np.arange(push_y, height + diameter, distance_y) X, Y = np.meshgrid(x_coord, y_coord) xy = np.vstack((X.flatten(), Y.flatten())).T return xy
[docs] def binning_hexagon(x, y, gridsize, extent=None, last_row=False): """Bins x,y points into a mesh of gridsize (across x axis, or xy axes). Points are assigned to the closest hexagon, without explicitly calculating pairwise distances. Does not require to create a mesh beforehand, this function handles the mesh and nearest neighbor. This code was adapted from matplotlib "Copyright (c) 2012- Matplotlib Development Team; All Rights Reserved" Args x, y: numpy.ndarray, x, y spatial coordinates of points gridsize: float or tuple, the amount of hexagons in x direction (float); y direction is automatically computed; or specified if gridsize is a tuple. extent: tuple, of (x_min, x_max, y_min, y_max) last_row: bool, whether a last row is created in mesh or not Returns: coordinates: numpy.ndarray, a (x_mesh x y_mesh) x 2 matrix, with the coordinates (centres) of each hexagon in the binned mesh accumulated: list, contains the indices from the x, y arrays that were binned to each hexagon in the mesh. """ if np.iterable(gridsize): nx, ny = gridsize else: nx = gridsize ny = int(nx / np.sqrt(3)) # Count the number of data in each hexagon x = np.asarray(x, float) y = np.asarray(y, float) # Will be log()'d if necessary, and then rescaled. tx = x ty = y if extent is not None: xmin, xmax, ymin, ymax = extent else: xmin, xmax = (tx.min(), tx.max()) if len(x) else (0, 1) ymin, ymax = (ty.min(), ty.max()) if len(y) else (0, 1) # to avoid issues with singular data, expand the min/max pairs xmin, xmax = nonsingular(xmin, xmax, expander=0.1) ymin, ymax = nonsingular(ymin, ymax, expander=0.1) nx1 = nx + 1 if last_row: ny1 = ny + 1 else: ny1 = ny nx2 = nx ny2 = ny n = nx1 * ny1 + nx2 * ny2 # In the x-direction, the hexagons exactly cover the region from # xmin to xmax. Need some padding to avoid roundoff errors. padding = 1.0e-9 * (xmax - xmin) xmin -= padding xmax += padding sx = (xmax - xmin) / nx sy = (ymax - ymin) / ny # Positions in hexagon index coordinates. ix = (tx - xmin) / sx iy = (ty - ymin) / sy ix1 = np.round(ix).astype(int) iy1 = np.round(iy).astype(int) ix2 = np.floor(ix).astype(int) iy2 = np.floor(iy).astype(int) # flat indices, plus one so that out-of-range points go to position 0. i1 = np.where((0 <= ix1) & (ix1 < nx1) & (0 <= iy1) & (iy1 < ny1), ix1 * ny1 + iy1 + 1, 0) i2 = np.where((0 <= ix2) & (ix2 < nx2) & (0 <= iy2) & (iy2 < ny2), ix2 * ny2 + iy2 + 1, 0) d1 = (ix - ix1) ** 2 + 3.0 * (iy - iy1) ** 2 d2 = (ix - ix2 - 0.5) ** 2 + 3.0 * (iy - iy2 - 0.5) ** 2 bdist = d1 < d2 Cs_at_i1 = [[] for _ in range(1 + nx1 * ny1)] Cs_at_i2 = [[] for _ in range(1 + nx2 * ny2)] for i in range(len(x)): if bdist[i]: Cs_at_i1[i1[i]].append(i) else: Cs_at_i2[i2[i]].append(i) accumulated = [acc for Cs_at_i in [Cs_at_i1, Cs_at_i2] for acc in Cs_at_i[1:]] coordinates = np.zeros((n, 2), float) coordinates[: nx1 * ny1, 0] = np.repeat(np.arange(nx1), ny1) coordinates[: nx1 * ny1, 1] = np.tile(np.arange(ny1), nx1) coordinates[nx1 * ny1 :, 0] = np.repeat(np.arange(nx2) + 0.5, ny2) coordinates[nx1 * ny1 :, 1] = np.tile(np.arange(ny2), nx2) + 0.5 coordinates[:, 0] *= sx coordinates[:, 1] *= sy coordinates[:, 0] += xmin coordinates[:, 1] += ymin return coordinates, accumulated
[docs] def aggregate_adata_by_indices(adata, idx_to_aggregate, idx_aggregated, coordinates_aggregated=None): """ Aggregate the data (along the `obs` dimensions) of an AnnData object by specified indices. The size of the resulting `obs` has to be less or equal than the size of the input. Args: adata: AnnData object, to be aggregated. idx_to_aggregate: array-like, the indices of observations to aggregate. idx_aggregated: array-like, the indices indicating how to aggregate the data. coordinates_aggregated: Union[numpy.ndarray, None], (optional) coordinates of the aggregated spots. Returns: aggregated_adata: AnnData object, the aggregated data, with spatial coordinates updated. The variable names are preserved from the input `adata`, and an additional observation field "n_joined" is included, representing the number of spots that were aggregated into each bin. """ adata = adata[idx_to_aggregate] n_bins = idx_aggregated.max() + 1 row_indices = idx_aggregated col_indices = np.arange(adata.X.shape[0]) data = np.ones_like(col_indices) binning_matrix = coo_matrix((data, (row_indices, col_indices)), shape=(n_bins, adata.X.shape[0])).tocsr() aggregated_data = binning_matrix.dot(adata.X) has_spots = np.unique(idx_aggregated) filtered_data = aggregated_data[has_spots] aggregated_adata = anndata.AnnData(X=filtered_data, obsm={'spatial': coordinates_aggregated}) aggregated_adata.var_names = adata.var_names aggregated_adata.obs["n_joined"] = binning_matrix[has_spots].sum(axis=1) from scipy.sparse import dok_matrix # Create mapping of which original indices were aggregated into each new index change_ix = np.where(idx_aggregated[:-1] != idx_aggregated[1:])[0] + 1 ix_array = np.asarray( np.split(np.arange(idx_aggregated.shape[0]), change_ix, axis=0), dtype="object" ) # Create the mapping dictionary joined_dict = {i: idx_to_aggregate[x.astype(int)] for i, x in enumerate(ix_array)} # Store the mapping as a sparse matrix indices_joined_spatial_units = dok_matrix( (len(joined_dict), len(adata.obs_names)), dtype=np.int8 ) for obs_name_aggregate, obs_name_to_aggregate in joined_dict.items(): indices_joined_spatial_units[obs_name_aggregate, obs_name_to_aggregate] = 1 indices_joined_spatial_units = indices_joined_spatial_units.tocsr() aggregated_adata.uns["spatial_units_obs_names"] = np.array(adata.obs_names) aggregated_adata.uns["indices_joined_spatial_units"] = indices_joined_spatial_units return aggregated_adata
[docs] def create_meshed_adata( adata, px_by_um, spot_diameter_um=55, spot_distance_um=100, bead_diameter_um=10, mesh_type="circle", start_at_minimum=False, optimized_binning=True, ): """ Create a meshed AnnData object by binning the spatial data into a grid. Args: adata: AnnData object, containing spatial data to be meshed. px_by_um: float, the pixel-to-micron ratio. spot_diameter_um: float, default: 55, the diameter of the spots in microns. spot_distance_um: float, default: 100, the distance between the centers of the spots in microns. bead_diameter_um: float, default: 10, the diameter of the beads in microns. mesh_type: str, default: "circle", the type of mesh to create ("circle" or "hexagon"). start_at_minimum: bool, default: False, if True, the mesh grid starts at the minimum coordinate. optimized_binning: bool, default: True, if True, use optimized binning method for creating the mesh. Returns: meshed_adata: AnnData object, the meshed data with aggregated observations according to the specified mesh. The spatial coordinates are updated to the mesh grid, and the original data is aggregated. """ import numpy as np from sklearn.metrics.pairwise import euclidean_distances if not mesh_type in ["circle", "hexagon"]: raise ValueError(f"unrecognised mesh type {mesh_type}") if mesh_type == "hexagon": spot_diameter_um = np.sqrt(3) * spot_diameter_um spot_distance_um = spot_diameter_um coords = adata.obsm["spatial"] if start_at_minimum: top_left_corner = np.min(coords, axis=0) else: top_left_corner = [0, 0] bottom_right_corner = np.max(coords, axis=0) width_px = abs(top_left_corner[0] - bottom_right_corner[0]) height_px = abs(top_left_corner[1] - bottom_right_corner[1]) # capture area width spot_radius_um = spot_diameter_um / 2 um_by_px = 1.0 / px_by_um width_um = um_by_px * width_px spot_diameter_px = spot_diameter_um / um_by_px spot_radius_px = spot_radius_um / um_by_px spot_distance_px = spot_distance_um / um_by_px height_um = height_px * um_by_px # create meshgrid with one radius push xy = create_mesh( width_um, height_um, spot_diameter_um, spot_distance_um, push_x=spot_radius_um, push_y=spot_radius_um, ) # create pushed meshgrid, pushed by xy_pushed = create_mesh( width_um, height_um, spot_diameter_um, spot_distance_um, push_x=spot_radius_um + spot_distance_um / 2, push_y=spot_radius_um + np.sqrt(3) * spot_distance_um / 2, ) mesh = np.vstack((xy, xy_pushed)) mesh_px = mesh / um_by_px # add the top_left_corner mesh_px = mesh_px + top_left_corner # example: the diameter of one visium spot is 55um. if we take any bead, which center # falls into that, assuming that a bead is 10um, we would in fact take all beads # within 65um diameter. for this reason, the max distance should be 45um/2 between # visium spot center and other beads max_distance_px = (spot_diameter_um - bead_diameter_um) / um_by_px / 2 def _create_optimized_hex_mesh_properties(mesh_px): _y_values = np.unique(mesh_px[:, 1]) _x_values = np.unique(mesh_px[:, 0]) grid_x = len(mesh_px[mesh_px[:, 1] == _y_values[1]]) grid_y = len(mesh_px[mesh_px[:, 0] == _x_values[1]]) # Calculating the extent if len(mesh_px[mesh_px[:, 0] == _x_values[0]]) == grid_y: _offset = np.diff(_y_values[0:2]) _last_row = False else: _offset = 0 _last_row = True _extent = ( mesh_px[:, 0].min(), mesh_px[:, 0].max(), mesh_px[:, 1].min(), mesh_px[:, 1].max() + _offset, ) return grid_x, grid_y, _extent, _last_row if mesh_type == "circle": if optimized_binning: grid_x, grid_y, _extent, _last_row = _create_optimized_hex_mesh_properties(mesh_px) mesh_px, accum = binning_hexagon( coords[:, 0], coords[:, 1], gridsize=(grid_x, grid_y), extent=_extent, last_row=_last_row, ) new_ilocs = [[i] * len(accum[i]) for i in range(len(accum))] new_ilocs = np.array([n for new_iloc in new_ilocs for n in new_iloc]) original_ilocs = np.concatenate(accum).astype(int) distance_filter = ( np.linalg.norm( np.array(coords[original_ilocs]) - np.array(mesh_px[new_ilocs]), axis=1, ) < max_distance_px ) new_ilocs = new_ilocs[distance_filter] original_ilocs = original_ilocs[distance_filter] else: distance_M = euclidean_distances(mesh_px, coords) # circle mesh type: we create spots in a hexagonal mesh # new_ilocs contains the indices of the columns of the sparse matrix to be created # original_ilocs contains the column location of the original adata.X (csr_matrix) new_ilocs, original_ilocs = np.nonzero(distance_M < max_distance_px) elif mesh_type == "hexagon": if optimized_binning: grid_x, grid_y, _extent, _last_row = _create_optimized_hex_mesh_properties(mesh_px) mesh_px, accum = binning_hexagon( coords[:, 0], coords[:, 1], gridsize=(grid_x, grid_y), extent=_extent, last_row=_last_row, ) new_ilocs = np.zeros(len(coords), dtype=int) for i in range(len(accum)): new_ilocs[accum[i]] = i else: # we simply create a hex mesh, without holes. For each spot, we find the # hexagon it belongs to. # Not recommended this non-optimized approach; high memory # usage if spot distance is low -> O(n^2) complexity! # Kept for reproducibility with legacy runs of spacemake distance_M = euclidean_distances(mesh_px, coords) new_ilocs = np.argmin(distance_M, axis=0) original_ilocs = np.arange(new_ilocs.shape[0]) # we need to sort the new ilocs so that they are in increasing order sorted_ix = np.argsort(new_ilocs) new_ilocs = new_ilocs[sorted_ix] original_ilocs = original_ilocs[sorted_ix] joined_coordinates = mesh_px[np.unique(new_ilocs)] meshed_adata = aggregate_adata_by_indices( adata, idx_to_aggregate=original_ilocs, idx_aggregated=new_ilocs, coordinates_aggregated=joined_coordinates, ) return meshed_adata