Source code for mdvtools.tests.mock_anndata

#!/usr/bin/env python3
"""
Mock AnnData generation module for testing and stress testing.

This module provides utilities to create realistic AnnData objects with various
configurations, data types, and edge cases for comprehensive testing of the
MDV conversion pipeline.
"""

import numpy as np
import pandas as pd
import scanpy as sc
import scipy
import scipy.sparse
from typing import Dict, List, Optional, Union, Any
import warnings
from contextlib import contextmanager


@contextmanager
def suppress_anndata_warnings():
    """Context manager to suppress expected AnnData warnings."""
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", message="Transforming to str index", category=UserWarning
        )
        yield
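
# Illustrative usage of the context manager above (a sketch, not part of the
# original module). Some anndata versions emit a "Transforming to str index"
# UserWarning when metadata arrives with an integer index:
#
#     obs = pd.DataFrame({'label': ['a', 'b', 'c']})   # integer RangeIndex
#     with suppress_anndata_warnings():
#         adata = sc.AnnData(X=np.ones((3, 2)), obs=obs)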

def chunked_log1p_normalization(sparse_matrix, chunk_size=1000):
    """Perform log1p normalization in chunks to avoid dense matrices."""
    if not scipy.sparse.issparse(sparse_matrix):
        # For dense matrices, just apply log1p directly
        return np.log1p(sparse_matrix)

    n_cells, n_genes = sparse_matrix.shape
    if n_cells <= chunk_size:
        # Small enough to transform in one shot; preserve sparsity
        result = sparse_matrix.copy()
        result.data = np.log1p(result.data)
        return result

    # For large matrices, process row chunks and vstack the results
    chunks = []
    for chunk_start in range(0, n_cells, chunk_size):
        chunk_end = min(chunk_start + chunk_size, n_cells)
        chunk = sparse_matrix[chunk_start:chunk_end, :].toarray()
        chunk_log = np.log1p(chunk)
        chunks.append(scipy.sparse.csr_matrix(chunk_log))
    return scipy.sparse.vstack(chunks, format='csr')
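
# Sanity-check sketch for the chunked log1p above (illustrative, not part of
# the original module): with n_cells above chunk_size the chunked path runs,
# and its output should match a dense np.log1p exactly.
#
#     X = scipy.sparse.random(2500, 50, density=0.1, format='csr')
#     out = chunked_log1p_normalization(X, chunk_size=1000)
#     assert scipy.sparse.issparse(out)
#     assert np.allclose(out.toarray(), np.log1p(X.toarray()))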

def chunked_zscore_normalization(sparse_matrix, chunk_size=1000):
    """Perform z-score normalization in chunks to avoid dense matrices."""
    if not scipy.sparse.issparse(sparse_matrix):
        X_dense = np.asarray(sparse_matrix)
        return (X_dense - X_dense.mean(axis=0)) / (X_dense.std(axis=0) + 1e-8)

    n_cells, n_genes = sparse_matrix.shape
    if n_cells <= chunk_size:
        X_dense = sparse_matrix.toarray()
        return (X_dense - X_dense.mean(axis=0)) / (X_dense.std(axis=0) + 1e-8)

    # First pass: accumulate per-gene sums and sums of squares across all rows
    total_sum = np.zeros(n_genes)
    total_sum_sq = np.zeros(n_genes)
    for chunk_start in range(0, n_cells, chunk_size):
        chunk_end = min(chunk_start + chunk_size, n_cells)
        chunk = sparse_matrix[chunk_start:chunk_end, :].toarray()
        total_sum += chunk.sum(axis=0)
        total_sum_sq += (chunk ** 2).sum(axis=0)

    mean_vals = total_sum / n_cells
    var_vals = (total_sum_sq / n_cells) - (mean_vals ** 2)
    std_vals = np.sqrt(var_vals + 1e-8)

    # Second pass: apply the normalization chunk by chunk and vstack
    chunks = []
    for chunk_start in range(0, n_cells, chunk_size):
        chunk_end = min(chunk_start + chunk_size, n_cells)
        chunk = sparse_matrix[chunk_start:chunk_end, :].toarray()
        chunk_normalized = (chunk - mean_vals) / std_vals
        chunks.append(scipy.sparse.csr_matrix(chunk_normalized))
    return scipy.sparse.vstack(chunks, format='csr')
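
# Sanity-check sketch for the chunked z-score above (illustrative): the
# two-pass sum / sum-of-squares statistics should agree with the dense
# formula, up to where the 1e-8 stabilizer is applied (inside the sqrt here,
# added to the std in the dense branch).
#
#     X = scipy.sparse.random(2500, 50, density=0.2, format='csr')
#     out = chunked_zscore_normalization(X, chunk_size=1000)
#     dense = X.toarray()
#     expected = (dense - dense.mean(axis=0)) / (dense.std(axis=0) + 1e-8)
#     assert np.allclose(out.toarray(), expected, atol=1e-3)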

def estimate_memory_usage(n_cells, n_genes, sparse=True):
    """Estimate memory usage for a dataset.

    Args:
        n_cells: Number of cells
        n_genes: Number of genes
        sparse: Whether the matrix is sparse

    Returns:
        Estimated memory usage in MB
    """
    if sparse:
        # Assume ~10% of entries are non-zero, typical for single-cell data
        density = 0.1
        nnz = int(n_cells * n_genes * density)
        # ~16 bytes per stored element (8-byte value plus index overhead),
        # plus 4 bytes per row pointer
        memory_bytes = nnz * 16 + n_cells * 4
    else:
        # Dense matrix: 8 bytes per element (float64)
        memory_bytes = n_cells * n_genes * 8
    return memory_bytes / (1024 * 1024)  # Convert to MB
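
# Worked example for the estimator above (illustrative): 10,000 cells x 5,000
# genes at the assumed 10% density gives nnz = 5,000,000, so roughly
# 5,000,000 * 16 + 10,000 * 4 = 80,040,000 bytes ≈ 76.3 MB sparse, versus
# 10,000 * 5,000 * 8 = 400,000,000 bytes ≈ 381.5 MB dense.
#
#     estimate_memory_usage(10_000, 5_000, sparse=True)    # ≈ 76.3
#     estimate_memory_usage(10_000, 5_000, sparse=False)   # ≈ 381.5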

class MockAnnDataFactory:
    """Factory class for creating mock AnnData objects with various configurations."""

    def __init__(self, random_seed: Optional[int] = None):
        """Initialize the factory with an optional random seed."""
        if random_seed is not None:
            np.random.seed(random_seed)

    def create_minimal(self, n_cells: int = 10, n_genes: int = 5,
                       add_missing: bool = False) -> sc.AnnData:
        """Create a minimal AnnData object for basic testing."""
        return self._create_anndata(
            n_cells=n_cells,
            n_genes=n_genes,
            add_missing=add_missing,
            add_dim_reductions=False,
            add_layers=False,
            add_uns=False
        )

    def create_realistic(self, n_cells: int = 1000, n_genes: int = 2000,
                         add_missing: bool = True) -> sc.AnnData:
        """Create a realistic AnnData object with typical single-cell data features."""
        return self._create_anndata(
            n_cells=n_cells,
            n_genes=n_genes,
            add_missing=add_missing,
            add_dim_reductions=True,
            add_layers=True,
            add_uns=True
        )

    def create_large(self, n_cells: int = 10000, n_genes: int = 5000,
                     add_missing: bool = True, density: float = 0.1) -> sc.AnnData:
        """Create a large AnnData object for stress testing."""
        return self._create_anndata(
            n_cells=n_cells,
            n_genes=n_genes,
            add_missing=add_missing,
            add_dim_reductions=True,
            add_layers=True,
            add_uns=True,
            sparse_matrix=True,
            density=density
        )

    def create_memory_efficient_large(self, n_cells: int = 10000, n_genes: int = 5000,
                                      add_missing: bool = True,
                                      density: float = 0.1) -> sc.AnnData:
        """Create a large AnnData object optimized for memory efficiency.

        This method creates large datasets without dense layers to avoid
        excessive memory consumption during stress testing.
        """
        return self._create_anndata(
            n_cells=n_cells,
            n_genes=n_genes,
            add_missing=add_missing,
            add_dim_reductions=False,  # Skip dense dimensionality reductions
            add_layers=False,          # Skip dense layers
            add_uns=False,             # Skip unstructured data
            sparse_matrix=True,
            density=density,
            minimal_metadata=True      # Use minimal metadata for large datasets
        )

    def create_massive_dataset(self, n_cells: int = 100000, n_genes: int = 10000,
                               add_missing: bool = True, density: float = 0.1,
                               chunk_size: int = 10000,
                               mode: str = 'realistic') -> sc.AnnData:
        """Create a massive dataset (100k+ cells) for extreme stress testing.

        This method uses chunked operations and memory-efficient approaches
        to handle datasets that would otherwise cause memory issues.

        Args:
            n_cells: Number of cells
            n_genes: Number of genes
            add_missing: Whether to add missing values
            density: Density of non-zero elements
            chunk_size: Size of chunks for matrix generation
            mode: Generation mode - 'realistic', 'fast', or 'skeleton'
        """
        print(f"Creating massive dataset: {n_cells:,} cells x {n_genes:,} genes")
        print(f"Estimated memory usage: "
              f"{estimate_memory_usage(n_cells, n_genes, sparse=True):.1f}MB (sparse)")
        print(f"Mode: {mode}, Chunk size: {chunk_size:,}")

        return self._create_anndata(
            n_cells=n_cells,
            n_genes=n_genes,
            add_missing=add_missing,
            add_dim_reductions=False,  # Skip dense dimensionality reductions
            add_layers=False,          # Skip layers for maximum memory efficiency
            add_uns=False,             # Skip unstructured data
            sparse_matrix=True,
            density=density,
            chunk_size=chunk_size,
            mode=mode,
            use_chunked_layers=False,  # Layers are skipped, so no chunked processing
            minimal_metadata=True      # Use minimal metadata for large datasets
        )

    def create_extreme_dataset(self, n_cells: int = 1000000, n_genes: int = 5000,
                               density: float = 0.001, chunk_size: int = 50000,
                               mode: str = 'fast') -> sc.AnnData:
        """Create an extreme dataset (1M+ cells) for ultimate stress testing.

        This method is optimized for generating very large datasets efficiently.
        Use 'fast' or 'skeleton' mode for best performance.

        Args:
            n_cells: Number of cells (default: 1M)
            n_genes: Number of genes (default: 5K)
            density: Density of non-zero elements (default: 0.1%)
            chunk_size: Size of chunks for matrix generation
            mode: Generation mode - 'fast' or 'skeleton' recommended for large datasets
        """
        print(f"Creating extreme dataset: {n_cells:,} cells x {n_genes:,} genes")
        print(f"Mode: {mode}, Density: {density:.4f}")
        print(f"Estimated memory usage: "
              f"{estimate_memory_usage(n_cells, n_genes, sparse=True):.1f}MB (sparse)")

        return self._create_anndata(
            n_cells=n_cells,
            n_genes=n_genes,
            add_missing=False,         # Skip missing values for speed
            add_dim_reductions=False,  # Skip dimensionality reductions
            add_layers=False,          # Skip layers for speed
            add_uns=False,             # Skip unstructured data
            sparse_matrix=True,
            density=density,
            chunk_size=chunk_size,
            mode=mode
        )

    def create_edge_cases(self) -> sc.AnnData:
        """Create an AnnData object with various edge cases and problematic data."""
        return self._create_edge_case_anndata()

    def create_with_specific_features(self,
                                      cell_types: Optional[List[str]] = None,
                                      conditions: Optional[List[str]] = None,
                                      gene_types: Optional[List[str]] = None,
                                      n_cells: int = 100, n_genes: int = 200,
                                      **kwargs) -> sc.AnnData:
        """Create AnnData with specific categorical features."""
        return self._create_anndata(
            n_cells=n_cells,
            n_genes=n_genes,
            cell_types=cell_types,
            conditions=conditions,
            gene_types=gene_types,
            **kwargs
        )
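
    # Illustrative factory usage (a sketch, not part of the original module):
    #
    #     factory = MockAnnDataFactory(random_seed=42)
    #     small = factory.create_minimal()                      # 10 x 5, dense X
    #     typical = factory.create_realistic(n_cells=500)       # layers + reductions
    #     custom = factory.create_with_specific_features(
    #         cell_types=['Tumor', 'Stroma'], conditions=['WT', 'KO']
    #     )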

    def _create_anndata(self, n_cells: int, n_genes: int,
                        add_missing: bool = False,
                        add_dim_reductions: bool = False,
                        add_layers: bool = False,
                        add_uns: bool = False,
                        sparse_matrix: bool = False,
                        density: float = 0.1,
                        chunk_size: int = 10000,
                        mode: str = 'realistic',
                        cell_types: Optional[List[str]] = None,
                        conditions: Optional[List[str]] = None,
                        gene_types: Optional[List[str]] = None,
                        use_chunked_layers: bool = False,
                        minimal_metadata: bool = False) -> sc.AnnData:
        """Internal method to create AnnData with the specified features."""
        # Default categorical values
        if cell_types is None:
            cell_types = ['T-cell', 'B-cell', 'NK-cell', 'Monocyte', 'Dendritic']
        if conditions is None:
            conditions = ['Control', 'Treatment', 'Disease']
        if gene_types is None:
            gene_types = ['protein_coding', 'lncRNA', 'miRNA', 'pseudogene', 'rRNA']

        # Create cell metadata (obs)
        obs_data = self._create_obs_data(
            n_cells, cell_types, conditions, add_missing, minimal_metadata
        )

        # Create gene metadata (var)
        var_data = self._create_var_data(
            n_genes, gene_types, add_missing, minimal_metadata
        )

        # Create expression matrix
        X = self._create_expression_matrix(
            n_cells, n_genes, sparse_matrix, density, chunk_size, mode
        )

        # Create AnnData object
        adata = sc.AnnData(X=X, obs=obs_data, var=var_data)

        # Add optional components
        if add_dim_reductions:
            self._add_dimension_reductions(adata)
        if add_layers:
            self._add_layers(adata, use_chunked_layers)
        if add_uns:
            self._add_unstructured_data(adata)

        return adata

    def _create_obs_data(self, n_cells: int, cell_types: List[str],
                         conditions: List[str], add_missing: bool,
                         minimal_metadata: bool = False) -> pd.DataFrame:
        """Create cell metadata DataFrame."""
        if minimal_metadata:
            # For large datasets, use minimal metadata to save memory
            obs_data = pd.DataFrame({
                'cell_type': pd.Categorical(np.random.choice(cell_types, n_cells)),
                'condition': pd.Categorical(np.random.choice(conditions, n_cells)),
                'quality_score': np.random.normal(0, 1, n_cells)
            })
        else:
            # Uniform probability arrays matching the number of categories
            cell_type_probs = [1.0 / len(cell_types)] * len(cell_types)
            condition_probs = [1.0 / len(conditions)] * len(conditions)

            obs_data = pd.DataFrame({
                'cell_type': pd.Categorical(
                    np.random.choice(cell_types, n_cells, p=cell_type_probs)
                ),
                'condition': pd.Categorical(
                    np.random.choice(conditions, n_cells, p=condition_probs)
                ),
                'quality_score': np.random.normal(0, 1, n_cells),
                'total_counts': np.random.exponential(1000, n_cells),
                'n_genes_by_counts': np.random.poisson(2000, n_cells),
                'pct_counts_mt': np.random.beta(2, 20, n_cells) * 10,
                'is_high_quality': pd.Series(
                    np.random.choice([True, False], n_cells, p=[0.8, 0.2]),
                    dtype='object'
                ),
                'is_doublet': pd.Series(
                    np.random.choice([True, False], n_cells, p=[0.1, 0.9]),
                    dtype='object'
                ),
                'patient_id': pd.Categorical(
                    [f'P{i:03d}' for i in np.random.randint(1, 21, n_cells)]
                ),
                'batch': pd.Categorical(
                    [f'batch_{i}' for i in np.random.randint(1, 6, n_cells)]
                )
            })

        # Add missing values if requested (~10% of cells get one NaN field)
        if add_missing:
            # Only target columns that exist in this metadata variant
            candidate_cols = [c for c in
                              ['cell_type', 'condition', 'is_high_quality', 'is_doublet']
                              if c in obs_data.columns]
            missing_indices = np.random.choice(n_cells, size=n_cells // 10, replace=False)
            for idx in missing_indices:
                col = np.random.choice(candidate_cols)
                obs_data.loc[idx, col] = np.nan

        return obs_data

    def _create_var_data(self, n_genes: int, gene_types: List[str],
                         add_missing: bool,
                         minimal_metadata: bool = False) -> pd.DataFrame:
        """Create gene metadata DataFrame."""
        gene_names = [f'GENE_{i:05d}' for i in range(n_genes)]

        if minimal_metadata:
            # For large datasets, use minimal metadata to save memory
            var_data = pd.DataFrame({
                'gene_type': pd.Categorical(np.random.choice(gene_types, n_genes)),
                'chromosome': pd.Categorical(
                    [f'chr{i}' for i in np.random.randint(1, 23, n_genes)]
                ),
                'name': gene_names
            })
        else:
            # Uniform probability array matching the number of categories
            gene_type_probs = [1.0 / len(gene_types)] * len(gene_types)

            var_data = pd.DataFrame({
                'gene_type': pd.Categorical(
                    np.random.choice(gene_types, n_genes, p=gene_type_probs)
                ),
                'chromosome': pd.Categorical(
                    [f'chr{i}' for i in np.random.randint(1, 23, n_genes)]
                ),
                'mean_expression': np.random.exponential(1, n_genes),
                'highly_variable': np.random.choice([True, False], n_genes, p=[0.2, 0.8]),
                # The synthetic names never match these prefixes, so both flags are False
                'mt': pd.Series(
                    [name.startswith('MT-') for name in gene_names], dtype='object'
                ),
                'ribosomal': pd.Series(
                    [name.startswith(('RPS', 'RPL')) for name in gene_names],
                    dtype='object'
                ),
                'name': gene_names
            })

        # Add missing values if requested (~10% of genes get one NaN field)
        if add_missing:
            # Only target columns that exist in this metadata variant
            candidate_cols = [c for c in
                              ['gene_type', 'highly_variable', 'mt', 'ribosomal']
                              if c in var_data.columns]
            missing_indices = np.random.choice(n_genes, size=n_genes // 10, replace=False)
            for idx in missing_indices:
                col = np.random.choice(candidate_cols)
                var_data.loc[idx, col] = np.nan

        return var_data

    def _create_expression_matrix(self, n_cells: int, n_genes: int,
                                  sparse: bool = False, density: float = 0.1,
                                  chunk_size: int = 10000,
                                  mode: str = 'realistic'
                                  ) -> Union[np.ndarray, scipy.sparse.spmatrix]:
        """Create an expression matrix with realistic single-cell data patterns.

        Args:
            n_cells: Number of cells
            n_genes: Number of genes
            sparse: Whether to create a sparse matrix
            density: Density of non-zero elements (0.0 to 1.0); default 0.1 (10% non-zero)
            chunk_size: Size of chunks for large matrix generation
            mode: Generation mode - 'realistic' (unique indices), 'fast' (may have
                duplicates), or 'skeleton' (structure only, placeholder values)
        """
        if sparse:
            # Clamp density to the valid range
            density = max(0.0, min(1.0, density))

            # For very large matrices, use chunked generation
            if n_cells * n_genes > 100_000_000:  # 100M-element threshold
                return self._create_chunked_sparse_matrix(
                    n_cells, n_genes, density, chunk_size, mode
                )

            # For smaller matrices, use the single-pass optimized approach
            return self._create_single_sparse_matrix(n_cells, n_genes, density, mode)

        # Dense matrix with negative-binomial counts
        return np.random.negative_binomial(5, 0.3, (n_cells, n_genes))

    def _create_single_sparse_matrix(self, n_cells: int, n_genes: int,
                                     density: float,
                                     mode: str) -> scipy.sparse.spmatrix:
        """Create a single sparse matrix using the optimized approach."""
        # Calculate the number of non-zero elements
        nnz = int(n_cells * n_genes * density)

        # For very sparse matrices (< 1% density), use cheap COO construction
        if density < 0.01:
            return self._create_very_sparse_matrix(n_cells, n_genes, nnz, mode)

        # For moderately sparse matrices (< 30% density), deduplicate indices
        if density < 0.3:
            return self._create_moderately_sparse_matrix(n_cells, n_genes, nnz, mode)

        # For high-density matrices, use the simple approach; duplicate
        # coordinates are summed by scipy, which is tolerable here
        return self._create_dense_sparse_matrix(n_cells, n_genes, nnz, mode)

    def _create_chunked_sparse_matrix(self, n_cells: int, n_genes: int,
                                      density: float, chunk_size: int,
                                      mode: str) -> scipy.sparse.spmatrix:
        """Create large sparse matrices using chunked generation."""
        import scipy.sparse as sp

        print(f"Generating chunked sparse matrix: {n_cells:,} cells × {n_genes:,} genes, "
              f"density={density:.3f}")

        # Calculate the total number of non-zero elements
        total_nnz = int(n_cells * n_genes * density)

        if mode == 'skeleton':
            # Skeleton mode: create the structure without realistic values
            print("Creating skeleton matrix (structure only)")
            return self._create_skeleton_matrix(n_cells, n_genes, total_nnz)

        # For very large matrices, use a more memory-efficient approach
        if total_nnz > 5_000_000:  # 5M non-zero-element threshold
            print("Using memory-efficient incremental CSR construction...")
            return self._create_incremental_csr_matrix(
                n_cells, n_genes, density, chunk_size, mode
            )

        # For smaller matrices, accumulate COO triplets chunk by chunk
        all_rows = []
        all_cols = []
        all_values = []

        for chunk_start in range(0, n_cells, chunk_size):
            chunk_end = min(chunk_start + chunk_size, n_cells)
            chunk_cells = chunk_end - chunk_start

            # Scale nnz to this chunk's share of the rows
            chunk_nnz = int(chunk_cells * n_genes * density)

            if mode == 'realistic':
                # Generate unique indices for this chunk
                chunk_rows, chunk_cols, chunk_vals = self._generate_unique_chunk_indices(
                    chunk_cells, n_genes, chunk_nnz, chunk_start
                )
            else:  # fast mode
                # Generate indices quickly (may contain duplicates)
                chunk_rows, chunk_cols, chunk_vals = self._generate_fast_chunk_indices(
                    chunk_cells, n_genes, chunk_nnz, chunk_start
                )

            all_rows.extend(chunk_rows)
            all_cols.extend(chunk_cols)
            all_values.extend(chunk_vals)

            if chunk_start % (chunk_size * 10) == 0:
                print(f"  Processed {chunk_start:,}/{n_cells:,} cells")

        # Assemble the final sparse matrix
        print("Assembling final sparse matrix...")
        X = sp.csr_matrix((all_values, (all_rows, all_cols)), shape=(n_cells, n_genes))
        print(f"Final matrix: {X.shape}, nnz: {X.nnz:,}, "
              f"density: {X.nnz / (n_cells * n_genes):.6f}")
        return X

    def _create_incremental_csr_matrix(self, n_cells: int, n_genes: int,
                                       density: float, chunk_size: int,
                                       mode: str) -> scipy.sparse.spmatrix:
        """Create large sparse matrices via incremental CSR construction to save memory."""
        import scipy.sparse as sp

        # Pre-allocate the CSR arrays up front
        total_nnz = int(n_cells * n_genes * density)
        indptr = np.zeros(n_cells + 1, dtype=np.int64)  # int64 avoids overflow at large nnz
        indices = np.zeros(total_nnz, dtype=np.int32)
        data = np.zeros(total_nnz, dtype=np.float32)

        current_nnz = 0
        for chunk_start in range(0, n_cells, chunk_size):
            chunk_end = min(chunk_start + chunk_size, n_cells)
            chunk_cells = chunk_end - chunk_start

            # Scale nnz to this chunk's share of the rows
            chunk_nnz = int(chunk_cells * n_genes * density)

            if mode == 'realistic':
                chunk_rows, chunk_cols, chunk_vals = self._generate_unique_chunk_indices(
                    chunk_cells, n_genes, chunk_nnz, chunk_start
                )
            else:  # fast mode
                chunk_rows, chunk_cols, chunk_vals = self._generate_fast_chunk_indices(
                    chunk_cells, n_genes, chunk_nnz, chunk_start
                )

            # Sort by row so the chunk's entries are in CSR order
            sorted_indices = np.argsort(chunk_rows)
            chunk_rows = chunk_rows[sorted_indices]
            chunk_cols = chunk_cols[sorted_indices]
            chunk_vals = chunk_vals[sorted_indices]

            # Fill the CSR arrays, truncating if we hit the pre-allocated capacity
            take = min(len(chunk_vals), total_nnz - current_nnz)
            indices[current_nnz:current_nnz + take] = chunk_cols[:take]
            data[current_nnz:current_nnz + take] = chunk_vals[:take]

            # indptr must record the cumulative nnz per *row*, not per chunk;
            # count entries per row within the chunk and take the running sum
            row_counts = np.bincount(chunk_rows[:take] - chunk_start,
                                     minlength=chunk_cells)
            indptr[chunk_start + 1:chunk_end + 1] = current_nnz + np.cumsum(row_counts)
            current_nnz += take

            if chunk_start % (chunk_size * 10) == 0:
                print(f"  Processed {chunk_start:,}/{n_cells:,} cells")

        # Create the CSR matrix directly from the filled arrays
        X = sp.csr_matrix((data[:current_nnz], indices[:current_nnz], indptr),
                          shape=(n_cells, n_genes))
        print(f"Final matrix: {X.shape}, nnz: {X.nnz:,}, "
              f"density: {X.nnz / (n_cells * n_genes):.6f}")
        return X
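
    # Verification sketch for the incremental construction above (illustrative,
    # not part of the original module): a valid CSR matrix needs indptr to be
    # non-decreasing with indptr[0] == 0 and indptr[-1] == nnz, which the
    # per-row bincount/cumsum bookkeeping guarantees.
    #
    #     X = MockAnnDataFactory(random_seed=0)._create_incremental_csr_matrix(
    #         20_000, 1_000, density=0.3, chunk_size=5_000, mode='fast'
    #     )
    #     assert X.indptr[0] == 0 and X.indptr[-1] == X.nnz
    #     assert np.all(np.diff(X.indptr) >= 0)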

    def _generate_unique_chunk_indices(self, chunk_cells: int, n_genes: int,
                                       nnz: int, row_offset: int) -> tuple:
        """Generate unique (row, col) indices for a chunk."""
        # Oversample by 20% to absorb duplicates, then reject them
        target_size = int(nnz * 1.2)
        rows = np.random.randint(0, chunk_cells, target_size) + row_offset
        cols = np.random.randint(0, n_genes, target_size)

        # Deduplicate via a set of (row, col) pairs
        pairs = set(zip(rows, cols))

        # If we still don't have enough unique pairs, generate more
        while len(pairs) < nnz:
            additional_size = min(1000, nnz - len(pairs))
            new_rows = np.random.randint(0, chunk_cells, additional_size) + row_offset
            new_cols = np.random.randint(0, n_genes, additional_size)
            pairs.update(zip(new_rows, new_cols))

        # Take exactly nnz pairs
        unique_pairs = list(pairs)[:nnz]
        rows = np.array([p[0] for p in unique_pairs])
        cols = np.array([p[1] for p in unique_pairs])
        values = self._generate_realistic_expression_values(nnz)

        return rows, cols, values

    def _generate_fast_chunk_indices(self, chunk_cells: int, n_genes: int,
                                     nnz: int, row_offset: int) -> tuple:
        """Generate indices quickly (may contain duplicate coordinates)."""
        rows = np.random.randint(0, chunk_cells, nnz) + row_offset
        cols = np.random.randint(0, n_genes, nnz)
        values = self._generate_realistic_expression_values(nnz)
        return rows, cols, values

    def _create_skeleton_matrix(self, n_cells: int, n_genes: int,
                                nnz: int) -> scipy.sparse.spmatrix:
        """Create a skeleton matrix with structure but no meaningful values."""
        import scipy.sparse as sp

        # Random indices; duplicates are acceptable for a skeleton
        rows = np.random.randint(0, n_cells, nnz)
        cols = np.random.randint(0, n_genes, nnz)

        # Placeholder values (all ones)
        values = np.ones(nnz, dtype=np.int32)

        return sp.csr_matrix((values, (rows, cols)), shape=(n_cells, n_genes))

    def _create_moderately_sparse_matrix(self, n_cells: int, n_genes: int,
                                         nnz: int, mode: str) -> scipy.sparse.spmatrix:
        """Create moderately sparse matrices efficiently with unique indices."""
        import scipy.sparse as sp

        if mode == 'fast':
            # Fast mode - may contain duplicate coordinates
            cell_indices = np.random.randint(0, n_cells, nnz)
            gene_indices = np.random.randint(0, n_genes, nnz)
            values = self._generate_realistic_expression_values(nnz)
        elif mode == 'skeleton':
            # Skeleton mode - structure only
            cell_indices = np.random.randint(0, n_cells, nnz)
            gene_indices = np.random.randint(0, n_genes, nnz)
            values = np.ones(nnz, dtype=np.int32)
        else:  # realistic mode
            # Rejection sampling: oversample by 20%, then deduplicate
            target_size = int(nnz * 1.2)
            cell_indices = np.random.randint(0, n_cells, target_size)
            gene_indices = np.random.randint(0, n_genes, target_size)

            pairs = set(zip(cell_indices, gene_indices))

            # If we still don't have enough unique pairs, generate more
            while len(pairs) < nnz:
                additional_size = min(1000, nnz - len(pairs))
                new_cells = np.random.randint(0, n_cells, additional_size)
                new_genes = np.random.randint(0, n_genes, additional_size)
                pairs.update(zip(new_cells, new_genes))

            # Take exactly nnz pairs
            unique_pairs = list(pairs)[:nnz]
            cell_indices = np.array([p[0] for p in unique_pairs])
            gene_indices = np.array([p[1] for p in unique_pairs])
            values = self._generate_realistic_expression_values(nnz)

        return sp.csr_matrix((values, (cell_indices, gene_indices)),
                             shape=(n_cells, n_genes))

    def _create_dense_sparse_matrix(self, n_cells: int, n_genes: int,
                                    nnz: int, mode: str) -> scipy.sparse.spmatrix:
        """Create high-density sparse matrices with the simple approach.

        Duplicate coordinates are summed by scipy during construction, so the
        realized nnz may fall slightly below the target; at high density this
        is an acceptable trade-off for speed.
        """
        import scipy.sparse as sp

        cell_indices = np.random.randint(0, n_cells, nnz)
        gene_indices = np.random.randint(0, n_genes, nnz)

        if mode == 'skeleton':
            values = np.ones(nnz, dtype=np.int32)
        else:
            values = self._generate_realistic_expression_values(nnz)

        return sp.csr_matrix((values, (cell_indices, gene_indices)),
                             shape=(n_cells, n_genes))

    def _create_very_sparse_matrix(self, n_cells: int, n_genes: int,
                                   nnz: int, mode: str) -> scipy.sparse.spmatrix:
        """Create very sparse matrices efficiently using COO format."""
        import scipy.sparse as sp

        # Coordinate format is the cheapest way to build very sparse matrices
        cell_indices = np.random.choice(n_cells, nnz, replace=True)
        gene_indices = np.random.choice(n_genes, nnz, replace=True)

        if mode == 'skeleton':
            values = np.ones(nnz, dtype=np.int32)
        else:
            values = self._generate_realistic_expression_values(nnz)

        # Build as COO, then convert to CSR for downstream use
        coo_matrix = sp.coo_matrix((values, (cell_indices, gene_indices)),
                                   shape=(n_cells, n_genes))
        return coo_matrix.tocsr()

    def _generate_realistic_expression_values(self, n_values: int) -> np.ndarray:
        """Generate realistic single-cell expression values.

        Single-cell counts typically follow a negative binomial distribution
        with many zeros and a long tail of high expression values.
        """
        # Negative binomial parameters tuned for typical single-cell counts
        r, p = 5, 0.3

        # Generate base counts
        values = np.random.negative_binomial(r, p, n_values)

        # Zero out ~30% of entries (some genes are truly not expressed)
        zero_prob = 0.3
        zero_mask = np.random.random(n_values) < zero_prob
        values[zero_mask] = 0

        # Guard against negatives (cannot occur with negative binomial, but cheap)
        return np.maximum(values, 0)
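
    # Worked expectation for the generator above (illustrative): numpy's
    # negative_binomial(5, 0.3) has mean r*(1-p)/p = 5*0.7/0.3 ≈ 11.67, and
    # independently zeroing ~30% of entries scales that to ≈ 8.17, so the mean
    # of a large sample should land near 8.2:
    #
    #     vals = MockAnnDataFactory(random_seed=1) \
    #         ._generate_realistic_expression_values(100_000)
    #     assert 7.5 < vals.mean() < 8.9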

    def _add_dimension_reductions(self, adata: sc.AnnData):
        """Add dimensionality reductions to the AnnData object."""
        n_cells = adata.n_obs
        n_genes = adata.n_vars

        # PCA for cells
        n_pcs = min(50, n_cells, n_genes)
        adata.obsm['X_pca'] = np.random.normal(0, 1, (n_cells, n_pcs))

        # UMAP for cells
        adata.obsm['X_umap'] = np.random.normal(0, 1, (n_cells, 2))

        # t-SNE for cells
        adata.obsm['X_tsne'] = np.random.normal(0, 1, (n_cells, 2))

        # PCA loadings for genes
        adata.varm['PCs'] = np.random.normal(0, 1, (n_genes, n_pcs))

    def _add_layers(self, adata: sc.AnnData, use_chunked_layers: bool):
        """Add expression layers to the AnnData object."""
        # Ensure adata.X is valid
        assert adata.X is not None, "adata.X cannot be None"
        assert hasattr(adata.X, 'shape'), "adata.X must have a shape attribute"
        assert adata.X.shape == (adata.n_obs, adata.n_vars), (
            f"adata.X shape {adata.X.shape} doesn't match AnnData dimensions "
            f"({adata.n_obs}, {adata.n_vars})"
        )

        # Raw counts - preserve sparsity if the input is sparse
        if scipy.sparse.issparse(adata.X):
            adata.layers['counts'] = adata.X.copy()  # type: ignore
        else:
            adata.layers['counts'] = np.array(adata.X, copy=True)

        # Log-normalized data - use chunked normalization if requested
        if use_chunked_layers:
            log_data = chunked_log1p_normalization(adata.X)
        else:
            if scipy.sparse.issparse(adata.X):
                # log1p maps zero to zero, so transforming .data preserves sparsity
                log_data = scipy.sparse.csc_matrix(adata.X.copy())  # type: ignore
                log_data.data = np.log1p(log_data.data)
            else:
                log_data = np.log1p(np.asarray(adata.X))
        adata.layers['log1p'] = log_data  # type: ignore

        # Scaled (z-scored) data - use chunked normalization for large datasets
        if use_chunked_layers and scipy.sparse.issparse(log_data):
            # Chunked normalization works directly on the sparse matrix
            scaled_data = chunked_zscore_normalization(log_data)
        else:
            # For smaller datasets or when chunking is disabled, densify first
            if scipy.sparse.issparse(log_data):
                log_dense = log_data.toarray()  # type: ignore
            else:
                log_dense = np.asarray(log_data)

            if use_chunked_layers:
                scaled_data = chunked_zscore_normalization(log_dense)
            else:
                scaled_data = (log_dense - log_dense.mean(axis=0)) / (
                    log_dense.std(axis=0) + 1e-8
                )
        adata.layers['scaled'] = scaled_data  # type: ignore

    def _add_unstructured_data(self, adata: sc.AnnData):
        """Add unstructured data to the AnnData object."""
        adata.uns['neighbors'] = {
            'params': {'n_neighbors': 15, 'metric': 'euclidean'},
            'connectivities_key': 'connectivities',
            'distances_key': 'distances'
        }
        adata.uns['leiden'] = {
            'params': {'resolution': 0.5, 'random_state': 0},
            'connectivities_key': 'connectivities'
        }
        adata.uns['rank_genes_groups'] = {
            'params': {'groupby': 'leiden', 'method': 't-test'},
            'names': [['gene1', 'gene2', 'gene3'] for _ in range(5)],
            'scores': [[1.5, 1.2, 1.0] for _ in range(5)],
            'logfoldchanges': [[0.8, 0.6, 0.4] for _ in range(5)]
        }

    def _create_edge_case_anndata(self) -> sc.AnnData:
        """Create AnnData with various edge cases and problematic data."""
        n_cells, n_genes = 50, 100

        # Patterns for edge-case columns; each is tiled out to the full length
        special_chars_pattern = ['test\nnewline', 'test\ttab', 'test"quote', "test'apos"]
        unicode_pattern = ['αβγδε', '🎉🎊🎈', '中文测试']
        mixed_types_pattern = [1, 'string', True, 3.14, None]
        inf_values_pattern = [float('inf'), float('-inf')]
        boolean_pattern = [True, False, np.nan]

        def tile(pattern, length):
            """Repeat a pattern and truncate it to exactly `length` items."""
            return (pattern * ((length // len(pattern)) + 1))[:length]

        obs_data = pd.DataFrame({
            'empty_string': [''] * n_cells,
            'very_long_string': ['A' * 1000] * n_cells,
            'special_chars': tile(special_chars_pattern, n_cells),
            'unicode': tile(unicode_pattern, n_cells),
            'mixed_types': tile(mixed_types_pattern, n_cells),
            'all_nan': [np.nan] * n_cells,
            'inf_values': tile(inf_values_pattern, n_cells),
            'zero_variance': [42] * n_cells,
            'boolean_with_nan': pd.Series(tile(boolean_pattern, n_cells), dtype='object')
        })

        var_data = pd.DataFrame({
            'empty_categories': pd.Categorical([''] * n_genes),
            'single_category': pd.Categorical(['same'] * n_genes),
            'many_categories': pd.Categorical([f'cat_{i}' for i in range(n_genes)]),
            'mixed_dtypes': tile([1, 'string', True, 3.14], n_genes),
            'all_inf': [float('inf')] * n_genes,
            'all_nan': [np.nan] * n_genes
        })

        # Expression matrix seeded with problematic values
        X = np.random.normal(0, 1, (n_cells, n_genes))
        X[0, 0] = float('inf')
        X[0, 1] = float('-inf')
        X[0, 2] = np.nan

        return sc.AnnData(X=X, obs=obs_data, var=var_data)
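
    # Illustrative check of the edge cases above (a sketch, not part of the
    # original module): downstream conversion code should tolerate NaN/inf in
    # X and mixed-type metadata columns without crashing.
    #
    #     adata = MockAnnDataFactory().create_edge_cases()
    #     assert np.isinf(adata.X[0, 0]) and np.isnan(adata.X[0, 2])
    #     assert adata.obs['mixed_types'].map(type).nunique() > 1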
# Convenience functions for backward compatibility

def create_minimal_anndata(n_cells: int = 10, n_genes: int = 5,
                           add_missing: bool = False) -> sc.AnnData:
    """Create a minimal AnnData object for testing (backward compatibility).

    This function maintains exact backward compatibility with the original
    implementation so that existing tests continue to pass. Note that the
    tiling below assumes the defaults: an even n_cells and an odd n_genes.
    """
    # Cell metadata - the original simple alternating pattern
    obs_data = pd.DataFrame({
        'cell_type': pd.Categorical(['T-cell', 'B-cell'] * (n_cells // 2)),
        'condition': pd.Categorical(['Control', 'Treatment'] * (n_cells // 2)),
        'quality_score': np.random.normal(0, 1, n_cells)
    })

    # Gene metadata - the original simple alternating pattern
    var_data = pd.DataFrame({
        'gene_type': pd.Categorical(
            ['protein_coding', 'lncRNA'] * (n_genes // 2) + ['miRNA']
        ),
        'chromosome': pd.Categorical(['chr1', 'chr2'] * (n_genes // 2) + ['chrX']),
        'mean_expression': np.random.exponential(1, n_genes)
    })

    # Add missing values at the original deterministic positions
    if add_missing:
        obs_data.loc[2, 'cell_type'] = np.nan
        obs_data.loc[5, 'condition'] = np.nan
        var_data.loc[3, 'gene_type'] = np.nan

    # Expression matrix - the original negative binomial distribution
    X = np.random.negative_binomial(5, 0.3, (n_cells, n_genes))

    return sc.AnnData(X=X, obs=obs_data, var=var_data)

def create_realistic_anndata(n_cells: int = 1000, n_genes: int = 2000,
                             add_missing: bool = True) -> sc.AnnData:
    """Create a realistic AnnData object with typical single-cell data features."""
    factory = MockAnnDataFactory()
    return factory.create_realistic(n_cells, n_genes, add_missing)

def create_large_anndata(n_cells: int = 10000, n_genes: int = 5000,
                         add_missing: bool = True, density: float = 0.1) -> sc.AnnData:
    """Create a large AnnData object for stress testing."""
    factory = MockAnnDataFactory()
    return factory.create_large(n_cells, n_genes, add_missing, density)

def create_memory_efficient_large_anndata(n_cells: int = 10000, n_genes: int = 5000,
                                          add_missing: bool = True,
                                          density: float = 0.1) -> sc.AnnData:
    """Create a memory-efficient large AnnData object for stress testing.

    This function creates large datasets without dense layers to avoid
    excessive memory consumption during stress testing.
    """
    factory = MockAnnDataFactory()
    return factory.create_memory_efficient_large(n_cells, n_genes, add_missing, density)

def create_massive_anndata(n_cells: int = 100000, n_genes: int = 10000,
                           add_missing: bool = True, density: float = 0.1,
                           chunk_size: int = 10000,
                           mode: str = 'realistic') -> sc.AnnData:
    """Create a massive AnnData object for extreme stress testing.

    This function creates datasets with 100k+ cells using chunked operations
    to manage memory. Suitable for testing at real-world scale.
    """
    factory = MockAnnDataFactory()
    return factory.create_massive_dataset(n_cells, n_genes, add_missing,
                                          density, chunk_size, mode)

def create_extreme_anndata(n_cells: int = 1000000, n_genes: int = 5000,
                           density: float = 0.001, chunk_size: int = 50000,
                           mode: str = 'fast') -> sc.AnnData:
    """Create an extreme AnnData object for ultimate stress testing.

    This function creates datasets with 1M+ cells using optimized chunked
    operations. Use 'fast' or 'skeleton' mode for best performance.
    """
    factory = MockAnnDataFactory()
    return factory.create_extreme_dataset(n_cells, n_genes, density, chunk_size, mode)

def create_fast_large_anndata(n_cells: int = 100000, n_genes: int = 5000,
                              density: float = 0.1,
                              chunk_size: int = 10000) -> sc.AnnData:
    """Create a large AnnData object using the fast generation mode.

    This function prioritizes speed over perfect accuracy (the matrix may
    contain duplicate indices). Suitable for stress tests where speed matters
    more than data quality.
    """
    factory = MockAnnDataFactory()
    return factory.create_massive_dataset(n_cells, n_genes, add_missing=False,
                                          density=density, chunk_size=chunk_size,
                                          mode='fast')

def create_skeleton_anndata(n_cells: int = 100000, n_genes: int = 5000,
                            density: float = 0.1,
                            chunk_size: int = 10000) -> sc.AnnData:
    """Create a large AnnData object with a skeleton matrix (structure only).

    The matrix has the correct structure but placeholder values. This is the
    fastest option for testing pipeline structure without realistic data.
    """
    factory = MockAnnDataFactory()
    return factory.create_massive_dataset(n_cells, n_genes, add_missing=False,
                                          density=density, chunk_size=chunk_size,
                                          mode='skeleton')
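
# Illustrative trade-off between the generation modes above (a sketch, not
# part of the original module): pick the cheapest mode that still exercises
# the code path under test.
#
#     real = create_massive_anndata(n_cells=100_000)       # unique indices, slowest
#     fast = create_fast_large_anndata(n_cells=100_000)    # NB counts, duplicates OK
#     bones = create_skeleton_anndata(n_cells=100_000)     # placeholder values only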

def create_edge_case_anndata() -> sc.AnnData:
    """Create an AnnData object with various edge cases and problematic data."""
    factory = MockAnnDataFactory()
    return factory.create_edge_cases()
# Utility functions for testing

def get_anndata_summary(adata: sc.AnnData) -> Dict[str, Any]:
    """Get a summary of AnnData object properties for testing."""
    # Ensure adata.X is valid
    assert adata.X is not None, "adata.X cannot be None"

    return {
        'n_cells': adata.n_obs,
        'n_genes': adata.n_vars,
        'obs_columns': list(adata.obs.columns),
        'var_columns': list(adata.var.columns),
        'obsm_keys': list(adata.obsm.keys()),
        'varm_keys': list(adata.varm.keys()),
        'layers_keys': list(adata.layers.keys()),
        'uns_keys': list(adata.uns.keys()),
        'sparse': hasattr(adata.X, 'toarray'),
        'has_missing_obs': bool(adata.obs.isnull().values.any()),
        'has_missing_var': bool(adata.var.isnull().values.any()),
        'categorical_obs': [col for col in adata.obs.columns
                            if hasattr(adata.obs[col], 'cat')],
        'categorical_var': [col for col in adata.var.columns
                            if hasattr(adata.var[col], 'cat')]
    }

def validate_anndata(adata: sc.AnnData) -> bool:
    """Validate that an AnnData object has the expected structure."""
    try:
        # Basic structure
        assert hasattr(adata, 'obs') and hasattr(adata, 'var') and hasattr(adata, 'X')
        assert adata.n_obs > 0 and adata.n_vars > 0

        # obs and var must match the stated dimensions
        assert len(adata.obs) == adata.n_obs
        assert len(adata.var) == adata.n_vars

        # X must exist and have the right shape
        assert adata.X is not None, "adata.X cannot be None"
        assert hasattr(adata.X, 'shape'), "adata.X must have a shape attribute"
        assert adata.X.shape == (adata.n_obs, adata.n_vars), (
            f"adata.X shape {adata.X.shape} doesn't match AnnData dimensions "
            f"({adata.n_obs}, {adata.n_vars})"
        )

        return True
    except AssertionError:
        return False
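
if __name__ == "__main__":
    # Minimal smoke test (illustrative addition; uses only this module's API).
    with suppress_anndata_warnings():
        adata = create_minimal_anndata(add_missing=True)
    assert validate_anndata(adata), "minimal AnnData failed validation"
    summary = get_anndata_summary(adata)
    print(f"minimal: {summary['n_cells']} cells x {summary['n_genes']} genes, "
          f"missing obs: {summary['has_missing_obs']}")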