Source code for mdvtools.conversions

from scanpy import AnnData
import scipy
import pandas as pd
from os.path import join, split
import os
from .mdvproject import MDVProject,create_bed_gz_file
import numpy as np
import json
import gzip
import copy
import yaml
import shutil


def convert_scanpy_to_mdv(
    folder: str,
    scanpy_object: AnnData,
    max_dims: int = 3,
    delete_existing: bool = False,
    label: str = "",
    chunk_data: bool = False,
    add_layer_data=True,
    gene_identifier_column=None,
) -> MDVProject:
    """
    Convert a Scanpy AnnData object to MDV (Multi-Dimensional Viewer) format.

    This function transforms single-cell RNA sequencing data from AnnData format
    into the MDV project structure, handling both cells and genes as separate
    datasources with their associated dimensionality reductions and metadata.

    Args:
        folder (str): Path to the target MDV project folder.
        scanpy_object (AnnData): The AnnData object containing the single-cell data.
        max_dims (int, optional): Maximum number of dimensions to include from
            dimensionality reductions. Defaults to 3.
        delete_existing (bool, optional): Whether to delete existing project data.
            If False, merges with existing data. Defaults to False.
        label (str, optional): Prefix to add to datasource names and metadata columns
            when merging with existing data. Defaults to "".
        chunk_data (bool, optional): For dense matrices, transposing and flattening
            will be performed in chunks. Saves memory but takes longer. Defaults to False.
        add_layer_data (bool, optional): If True (default) the layer data (log values etc.)
            will be added, otherwise just the X matrix will be used.
        gene_identifier_column (str, optional): The gene column that the user will use to
            identify a gene. If not specified (default) then a column 'name' will be added,
            created from the index (which is usually the unique gene name).

    Returns:
        MDVProject: The configured MDV project object with the converted data.

    Notes:
        Data Structure Creation:
            - Creates two main datasources: '{label}cells' and '{label}genes'
            - Preserves all cell metadata from scanpy_object.obs
            - Preserves all gene metadata from scanpy_object.var
            - Transfers dimension reductions from obsm/varm matrices
            - Links cells and genes through expression data
            - Adds gene expression scores as a subgroup

        View Handling:
            - If delete_existing=True:
                * Creates a new default view with empty initial charts
                * Sets the project as editable
            - If delete_existing=False:
                * Preserves existing views
                * Updates views with the new datasources
                * Maintains panel widths and other view settings
                * Adds the new datasources to each view's initialCharts

        Dimension Reduction:
            - Processes dimensionality reductions up to max_dims
            - Supports standard formats (e.g., PCA, UMAP, t-SNE)
            - Column names are in the format: {reduction_name}_{dimension_number}

    Raises:
        ValueError: If the provided AnnData object is invalid or missing required components.
        IOError: If there are issues with file operations in the target folder.
        Exception: For other unexpected errors during conversion.
    """
    # Validate input AnnData
    if scanpy_object.n_obs == 0 or scanpy_object.n_vars == 0:
        raise ValueError("Cannot convert empty AnnData object (0 cells or 0 genes)")

    mdv = MDVProject(folder, delete_existing=delete_existing)

    # If not deleting existing, preserve the current views
    if not delete_existing:
        current_views = mdv.views
    else:
        current_views = {}

    # create datasource 'cells'
    cell_table = scanpy_object.obs
    cell_table["cell_id"] = cell_table.index
    # add any dimension reduction to the dataframe
    cell_table = _add_dims(cell_table, scanpy_object.obsm, max_dims)
    # cell_id is the unique barcode and should be of type unique
    # (will be text16 by default if the number of values is below 65536)
    # hopefully other columns will be of the correct format
    columns = [{"name": "cell_id", "datatype": "unique"}]
    mdv.add_datasource(f"{label}cells", cell_table, columns)

    # create datasource 'genes'
    gene_table = scanpy_object.var
    # need a way of detecting which column is the common gene name -
    # most times it is the index, but sometimes this is just the gene code or an
    # incremental number.
    if gene_identifier_column and gene_identifier_column not in gene_table.columns:
        print(f"gene identifier column {gene_identifier_column} not found, using index")
        gene_identifier_column = None
    if not gene_identifier_column:
        gene_identifier_column = f"{label}name"
        gene_table[gene_identifier_column] = gene_table.index
    gene_table = _add_dims(gene_table, scanpy_object.varm, max_dims)
    # originally the column had to be unique - but now it is just text
    # need to coerce the gene name column to unique
    # columns = [{"name": "name", "datatype": "text16"}]
    mdv.add_datasource(f"{label}genes", gene_table)

    # link the two datasets
    mdv.add_rows_as_columns_link(f"{label}cells", f"{label}genes", gene_identifier_column, "Gene Expr")

    # get the matrix in the correct format
    print("Getting Matrix")
    matrix, sparse = get_matrix(scanpy_object.X)
    # sometimes X is empty - all the data is in the layers
    assert matrix is not None  # asserting here so that the 'invalid_adata' test gets the expected error
    # don't want to faff about with more complex logic for permutation with add_layer_data just now
    if matrix.shape[1] != 0:
        # add the gene expression
        print("Adding gene expression")
        mdv.add_rows_as_columns_subgroup(
            f"{label}cells",
            f"{label}genes",
            "gs",
            matrix,
            name="gene_scores",
            label="Gene Scores",
            # sparse=sparse,  # this should be inferred from the matrix
            chunk_data=chunk_data,
        )

    # now add layers
    if add_layer_data:
        for layer, matrix in scanpy_object.layers.items():
            matrix, sparse = get_matrix(matrix)
            print(f"Adding layer {layer}")
            mdv.add_rows_as_columns_subgroup(
                f"{label}cells",
                f"{label}genes",
                layer,
                matrix,
                chunk_data=chunk_data,
            )

    if delete_existing:
        # If we're deleting existing, create a new default view
        mdv.set_view("default", {"initialCharts": {"cells": [], "genes": []}}, True)
        mdv.set_editable(True)
    else:
        # If we're not deleting existing, update the existing views with the new datasources
        for view_name, view_data in current_views.items():
            new_view_data = copy.deepcopy(view_data)

            # Initialize new charts if they don't exist
            if "initialCharts" not in new_view_data:
                new_view_data["initialCharts"] = {}

            # Add new datasources to initialCharts
            new_view_data["initialCharts"][f"{label}cells"] = []
            new_view_data["initialCharts"][f"{label}genes"] = []

            # Initialize dataSources if they don't exist
            if "dataSources" not in new_view_data:
                new_view_data["dataSources"] = {}

            # Add new datasources with panel widths
            new_view_data["dataSources"][f"{label}cells"] = {"panelWidth": 50}
            new_view_data["dataSources"][f"{label}genes"] = {"panelWidth": 50}

            # save the updated view back to the project
            mdv.set_view(view_name, new_view_data)

    return mdv
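# Illustrative usage sketch (not part of the module; the file and folder names below
# are hypothetical): convert an AnnData object loaded from disk into an MDV project.
#
#   import scanpy as sc
#   adata = sc.read_h5ad("pbmc3k.h5ad")   # hypothetical input file
#   project = convert_scanpy_to_mdv("mdv_pbmc", adata, max_dims=2, delete_existing=True)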
def convert_mudata_to_mdv(folder, mudata_object, max_dims=3, delete_existing=False, chunk_data=False):
    md = mudata_object
    p = MDVProject(folder, delete_existing=delete_existing)
    # are there any general dimension reductions?
    table = _add_dims(md.obs, md.obsm, max_dims)
    # add dimension reductions derived from the modalities
    for name, mod in md.mod.items():
        table = _add_dims(table, mod.obsm, max_dims, name)
    # add cell_id to the (possibly merged) table so the column is present in the datasource
    table["cell_id"] = table.index
    columns = [{"name": "cell_id", "datatype": "unique"}]
    p.add_datasource("cells", table, columns)
    for mod in md.mod.keys():
        mdata = md.mod[mod]
        # add the modality to the project
        p.add_datasource(mod, mdata.var)
        # add the index to the data as a name column.
        # This is usually the unique gene name - but not always
        column = "name"  # no longer unique
        # column = {"name": "name", "datatype": "unique"}
        p.set_column(mod, column, mdata.var.index)
        # mod is used as both the tag and the label.
        # The name column is specified as the identifier that the user will use -
        # it is derived from the index and is usually the gene 'name'.
        # However it may not be appropriate and can be changed later on
        p.add_rows_as_columns_link("cells", mod, "name", mod)
        matrix, sparse = get_matrix(mdata.X)
        # sometimes X is empty - all the data is in the layers
        if matrix.shape[1] != 0:
            p.add_rows_as_columns_subgroup("cells", mod, mod + "_expr", matrix, sparse=sparse,
                                           chunk_data=chunk_data)
        # now add the layers (if there are any)
        for layer in mdata.layers.keys():
            matrix = mdata.layers[layer]
            matrix, sparse = get_matrix(matrix, mdata.obs_names, md.obs_names)
            p.add_rows_as_columns_subgroup("cells", mod, f"{mod}_{layer}", matrix, sparse=sparse,
                                           chunk_data=chunk_data)
    return p
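# Illustrative usage sketch (hypothetical file name): each modality in the MuData
# object becomes its own datasource, linked to the shared "cells" datasource.
#
#   import mudata as mu
#   mdata = mu.read_h5mu("citeseq.h5mu")   # hypothetical input with e.g. 'rna' and 'prot' modalities
#   project = convert_mudata_to_mdv("mdv_citeseq", mdata, max_dims=2, delete_existing=True)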
# The main_names correspond to obs_names in the main anndata object
# and the mod_names to those in the modality.
# Only required if data is being added from a modality, as sometimes
# the modality's obs_names will be in a different order and/or a subset of the main names.
# Hence a sparse matrix corresponding to the main indices needs to be created.
def get_matrix(matrix, main_names=[], mod_names=[]):
    # check where the matrix data actually is
    matrix = matrix.value if hasattr(matrix, "value") else matrix
    # is the matrix a backed sparse matrix? - convert to non-backed, else it cannot be converted
    if hasattr(matrix, "backend"):
        matrix = matrix._to_backed()
    # not a sparse matrix - nothing else to do
    if not scipy.sparse.issparse(matrix):
        return matrix, False
    # convert sparse (and dense) matrices to the csc format required by MDV
    if not isinstance(matrix, scipy.sparse.csc_matrix):
        matrix = scipy.sparse.csc_matrix(matrix)
    # check the indexes are in sync and if so just return the csc matrix
    if list(main_names) == list(mod_names):
        return matrix, True
    # create a lookup of mod indices to main indices
    main_map = {name: i for i, name in enumerate(main_names)}
    lookup = np.array([main_map[name] for name in mod_names])
    # Apply the lookup to the matrix's row indices using a vectorized approach
    indices = lookup[matrix.indices]
    # create a new sparse matrix with rows aligned to the main indices
    matrix = scipy.sparse.csc_matrix(
        (matrix.data, indices, matrix.indptr),
        shape=(len(main_names), matrix.shape[1]),
        dtype=matrix.dtype,
    )
    return matrix, True
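# Illustrative sketch of the index realignment above (made-up names): if the main
# object has cells ["A", "B", "C"] but a modality only has ["C", "A"], the returned
# csc matrix has three rows, with the modality's rows mapped to positions 2 and 0.
#
#   import scipy.sparse as sp
#   mod_x = sp.csc_matrix(np.array([[1.0], [2.0]]))   # rows for "C" then "A"
#   aligned, is_sparse = get_matrix(mod_x, ["A", "B", "C"], ["C", "A"])
#   # aligned.toarray() -> [[2.], [0.], [1.]]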
def convert_vcf_to_df(vcf_filename: str) -> pd.DataFrame:
    f = open(vcf_filename, "r")
    metadata = {}
    while True:
        line = f.readline()
        if line.startswith("##"):
            key, value = line[2:].strip().split("=", 1)
            if key in metadata:
                metadata[key].append(value)
            else:
                metadata[key] = [value]
        if line.startswith("#CHROM"):
            break
    # todo - do something with metadata
    # print(metadata)
    temp_file = "temp.csv"  ## todo with tempfile
    with open(temp_file, "w") as tmp:
        while line:
            tmp.write(line)
            line = f.readline()
    f.close()
    df = pd.read_csv(temp_file, sep="\t")
    os.remove(temp_file)
    return df
def compute_vcf_end(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute the end position of the variant, determined from 'POS', 'REF' and 'ALT'.
    This is added as a column 'END' in the given DataFrame.
    """
    def varlen(s) -> int:
        return max([len(v) for v in str(s).split(",")])

    df["END"] = df["POS"] + df[["REF", "ALT"]].map(varlen).max(axis=1)
    return df
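# Worked example (hypothetical values): for a row with POS=100, REF="A" and ALT="ATG,AC",
# varlen gives 1 for REF and max(len("ATG"), len("AC")) = 3 for ALT,
# so END = 100 + max(1, 3) = 103.
#
#   df = pd.DataFrame({"POS": [100], "REF": ["A"], "ALT": ["ATG,AC"]})
#   compute_vcf_end(df)["END"].tolist()   # -> [103]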
def convert_vcf_to_mdv(folder: str, vcf_filename: str) -> MDVProject:
    """
    Converts a VCF file to an MDV project.

    The VCF file must be tab-delimited, with the header lines starting with "##"
    and column names in the line starting with "#CHROM".
    An 'END' column is derived, which is the end position of the variant
    determined from 'POS', 'REF' and 'ALT'.
    """
    p = MDVProject(folder, delete_existing=True)
    df = convert_vcf_to_df(vcf_filename)
    # for single nucleotide variants, we still need an end position for the genome browser
    # maybe add_genome_browser should be able to understand only one position?
    # other forms of VCF variants have a length, so we could use that...
    df = compute_vcf_end(df)
    # ^^ I should verify that this makes sense from a biological perspective
    columns = [
        {"name": "#CHROM", "datatype": "text"},  # chromosome
        {"name": "POS", "datatype": "integer"},  # start of the variant
        {
            "name": "END",
            "datatype": "integer",
        },  # not standard VCF, but useful for the genome browser (maybe temporary)
        {
            "name": "ID",
            "datatype": "unique",
            "delimiter": ";",
        },  # should be unique, but also semicolon-delimited - could be useful to preserve this
        {"name": "REF", "datatype": "multitext", "delimiter": ","},  # reference base(s)
        {
            "name": "ALT",
            "datatype": "multitext",
            "delimiter": ",",
        },  # comma-delimited list of alternate non-reference alleles
        {
            "name": "QUAL",
            "datatype": "double",
        },  # phred-scaled quality score for the assertion made in ALT
        {
            "name": "FILTER",
            "datatype": "multitext",
            "delimiter": ";",
        },  # PASS or semicolon-delimited list of filters that the variant has failed
        {
            "name": "INFO",
            "datatype": "multitext",
            "delimiter": ",",
        },  # comma-delimited list of additional information, no white space, semicolons or equals signs permitted
        # ^^^ note, the first random vcf file I found has all manner of = and ; in the INFO field,
        # so I'm not inclined to parse this too rigidly
        # {'name': 'FORMAT', 'datatype': 'text'},  # not sure why copilot thought this should be here - not in the VCF spec
    ]
    name = os.path.basename(vcf_filename)
    p.add_datasource(name, df, columns=columns)
    p.add_genome_browser(name, ["#CHROM", "POS", "END"])
    p.set_editable(True)
    return p
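# Illustrative usage sketch (hypothetical file names): the VCF is parsed into a
# dataframe, an END column is derived, and a genome browser is configured.
#
#   project = convert_vcf_to_mdv("mdv_variants", "cohort_calls.vcf")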
def create_regulamentary_project_from_pipeline(
    output, config, results_folder, atac_bw=None, peaks="merge", genome="hg38", openchrom="DNase"
):
    """Creates a regulamentary project from pipeline outputs.

    Args:
        output (str): Path to the directory which will house the MDV project.
        config (str): Path to the YAML configuration file.
        results_folder (str): Base path to the results directory.
        atac_bw (str, optional): Path to ATAC-seq bigWig file. Defaults to None.
        peaks (str, optional): Name of the peaks subdirectory. Defaults to "merge".
        genome (str, optional): Genome assembly version to use. Defaults to "hg38".
        openchrom (str, optional): Name of the open chromatin mark. Defaults to "DNase".

    Returns:
        An MDVProject
    """
    fold = join(results_folder, peaks)
    marks = ["H3K4me1", "H3K4me3", "H3K27ac", "CTCF", "ATAC"]

    # Load configuration YAML
    with open(config, 'r') as file:
        info = yaml.safe_load(file)

    # was this created from bigwigs alone?
    from_bw = True if info.get("bigwigs") else False

    # Get the bed files for each mark
    # beds have been created in the pipeline
    if from_bw and info["bigwigs"].get("create_bed_files"):
        beds = {mark: join(results_folder, "bed_files", f"{mark}.bed") for mark in marks}
    # beds have been specified in the config
    else:
        beds = {mark: info["union_peaks"].get(f"bed_{mark}") for mark in marks}

    # Get the bigWig files for each mark
    if from_bw:
        # bigwigs, including the ATAC bigwig, are specified in the 'bigwigs' entry
        bigwigs = {mark: info["bigwigs"].get(mark) for mark in marks}
    else:
        # otherwise bigwigs are specified in the 'compute_matrix_bigwigs' entry
        bigwigs = {mark: info["compute_matrix_bigwigs"].get(f"bigwig_{mark}") for mark in marks}
        bigwigs["ATAC"] = atac_bw  # Override ATAC with the provided file if given

    # Set the path to the regulatory table
    table = join(fold, "08_REgulamentary", "mlv_REgulamentary.csv")

    # Determine genome version, considering blacklist genome override
    gen = genome
    bl = info.get("remove_blacklist")
    if bl and bl.get("genome"):
        gen = bl.get("genome")

    # Define matrix data source and region order
    matrix = {
        "data": join(fold, "09_metaplot", "matrix.csv"),
        "order": join(fold, "04_sort_regions", "sort_union.bed"),
        "marks": ["H3K4me1", "H3K4me3", "H3K27ac", "CTCF"],
    }

    return create_regulamentary_project(
        output, table, bigwigs, beds, matrix, openchrom=openchrom, genome=gen
    )
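# Illustrative usage sketch (all paths hypothetical): the config YAML and results
# folder are assumed to come from a REgulamentary pipeline run.
#
#   project = create_regulamentary_project_from_pipeline(
#       "mdv_regulamentary",
#       "pipeline/config.yaml",
#       "pipeline/results",
#       atac_bw="pipeline/results/bigwigs/ATAC.bw",
#   )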
def create_regulamentary_project(
    output: str,
    table,
    bigwigs,
    beds,
    matrix=None,
    openchrom="DNase",
    marks=None,
    mark_colors=None,
    genome="hg38",
):
    """
    Creates a regulatory project visualization from input data sources.

    This function constructs a project using signal and peak files for various
    histone marks and chromatin accessibility, adds them as data sources and
    tracks, and configures a genome browser and visualization views.

    Args:
        output (str): Output directory or file for the project.
        table (str): Path to the CSV table containing regulatory element data.
        bigwigs (dict): Dictionary mapping mark names to bigWig file paths or URLs.
        beds (dict): Dictionary mapping mark names to BED file paths.
        matrix (dict or None, optional): Matrix and order file information for heatmaps, or None.
        openchrom (str, optional): Name for the open chromatin mark. Defaults to "DNase".
        marks (list of str, optional): List of marks to process. Defaults to
            `["ATAC", "H3K4me1", "H3K4me3", "H3K27ac", "CTCF"]`.
        mark_colors (list of str, optional): List of colors for the marks. Defaults to a preset palette.
        genome (str, optional): Genome assembly to use. Defaults to "hg38".

    Returns:
        MDVProject: The project object constructed with the given data and views.
    """
    if marks is None:
        marks = ["ATAC", "H3K4me1", "H3K4me3", "H3K27ac", "CTCF"]
    if mark_colors is None:
        mark_colors = ["#eb9234", "#349beb", "#3aeb34", "#c4c41f", "#ab321a"]

    # Get the template directory
    tdir = join(split(os.path.abspath(__file__))[0], "templates")
    p = MDVProject(output, delete_existing=True)

    # Load regulatory elements table
    mdv = pd.read_csv(table, sep="\t")
    columns = [{"name": "start", "datatype": "int32"}, {"name": "end", "datatype": "int32"}]
    p.add_datasource("elements", mdv, columns)

    default_tracks = []
    for mark, color in zip(marks, mark_colors):
        name = mark if mark != 'ATAC' else openchrom
        bw = bigwigs.get(mark)
        if bw:
            url = bw
            if not bw.startswith("http"):
                fname = split(bw)[1]
                shutil.copy(bw, join(p.trackfolder, fname))
                url = f"./tracks/{fname}"
            default_tracks.append(
                {
                    "url": url,
                    "short_label": f"{name} cov",
                    "color": color,
                    "height": 60,
                    "track_id": f"coverage_{mark}",
                }
            )
        bed = beds.get(mark)
        if bed:
            url = bed
            if not bed.startswith("http"):
                fname = split(bed)[1]
                # Process BED files for browser compatibility
                if bed.endswith(".bed"):
                    # Bed file processing: remove the header and keep the first 3 columns
                    df = pd.read_csv(bed, sep="\t", header=None)
                    first_row = df.iloc[0]
                    cell_str = str(first_row[1]).strip()
                    has_header = not cell_str.isdigit()
                    if has_header:
                        df = df.iloc[1:]
                    df = df.iloc[:, :3]
                    t_file = join(p.trackfolder, f"{fname}.temp")
                    o_file = join(p.trackfolder, fname)
                    df.to_csv(t_file, sep="\t", header=False, index=False)
                    create_bed_gz_file(t_file, o_file)
                    os.remove(t_file)
                    url = f"./tracks/{fname}.gz"
                else:
                    to_file = join(p.trackfolder, fname)
                    shutil.copyfile(bed, to_file)
                    # Copy the tabix index if present
                    if bed.endswith(".gz"):
                        shutil.copyfile(f"{bed}.tbi", f"{to_file}.tbi")
                    url = f"./tracks/{fname}"
            default_tracks.append(
                {
                    "url": url,
                    "short_label": f"{name} peaks",
                    "color": color,
                    "height": 15,
                    "featureHeight": 5,
                    "track_id": f"peaks_{mark}",
                }
            )

    extra_params = {
        "default_tracks": default_tracks,
        "default_parameters": {
            "view_margins": {"type": "fixed_length", "value": 4000},
            "color_by": "RE",
            "color_legend": {"display": False, "pos": [5, 5]},
            "feature_label": "RE",
        },
    }
    p.add_genome_browser(
        "elements", ["chromosome", "start", "end"], extra_params=extra_params
    )
    p.add_refseq_track("elements", genome)

    if matrix:
        _create_dt_heatmap(p, matrix)

    # Load and extend visualization views
    with open(join(tdir, "views", "regulamentary.json")) as f:
        view = json.load(f)
    if not matrix:
        view = [x for x in view if x["type"] != "deeptools_heatmap"]
    gb = p.get_genome_browser("elements")
    gb.update(
        {
            "id": "browser",
            "size": [389, 550],
            "position": [1046, 8],
            "title": "elements",
        }
    )
    view.append(gb)
    p.set_view(
        "default",
        {
            "initialCharts": {
                "elements": view,
            }
        },
    )
    return p
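# Illustrative usage sketch (file names are placeholders): bigwigs/beds map each
# mark to a local file or URL; marks without an entry are simply skipped.
#
#   bigwigs = {"ATAC": "tracks/dnase.bw", "CTCF": "tracks/ctcf.bw"}
#   beds = {"CTCF": "peaks/ctcf_peaks.bed"}
#   project = create_regulamentary_project(
#       "mdv_re_project",
#       "mlv_REgulamentary.csv",   # tab-separated table of regulatory elements
#       bigwigs,
#       beds,
#       matrix=None,               # no deeptools heatmap
#       genome="hg38",
#   )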
def _create_dt_heatmap(project, matrix, ds="elements", ifields=None):
    if ifields is None:
        ifields = ["chromosome", "start", "end"]
    # get the order of the regions in the matrix
    data = {x: project.get_column(ds, x) for x in ifields}
    mdv_i = pd.DataFrame(data)
    mdv_i = mdv_i.set_index(ifields)
    order = pd.read_csv(matrix["order"], sep="\t", header=None)
    order = order.set_index([0, 1, 2])
    order = order[order.index.isin(mdv_i.index)]
    mdv_i = mdv_i.reset_index()
    mdv_i["row_position"] = mdv_i.index
    arr = order.index.map(
        mdv_i.set_index(["chromosome", "start", "end"])["row_position"]
    )
    arr = np.array(arr, dtype=np.int32)

    # add the binary data
    output = join(project.dir, "binarydata", "elements")
    os.makedirs(output, exist_ok=True)
    hm = gzip.open(join(output, "heat_map.gz"), "wb")
    hm.write(arr.tobytes())

    # get the matrix and drop the last two columns
    mt = pd.read_csv(matrix["data"], sep="\t")
    mt = mt.iloc[:, :1600]
    # flatten and halve in size (mean of adjacent values)
    arr = mt.values.flatten()
    arr = arr.reshape(-1, 2)
    arr = np.mean(arr, axis=1)
    # normalize and clip to 255 (1 byte per value)
    mx = np.percentile(arr, 99.99)
    arr = (arr / mx) * 255
    arr = np.clip(arr, 0, 255)
    arr = arr.astype(np.uint8)
    hm.write(arr.tobytes())
    hm.close()

    # add the metadata
    md = project.get_datasource_metadata(ds)
    md["deeptools"] = {
        "maps": {
            "default": {
                "data": "heat_map",
                "rows": md["size"],
                "cols": 800,
                "groups": matrix["marks"],
                "max_scale": mx,
            }
        }
    }
    md["binary_data_loader"] = True
    project.set_datasource_metadata(md)
def _add_dims(table, dims, max_dims, stub=""):
    if len(dims.keys()) == 0:
        return table
    # stub is used if there is more than one modality e.g.
    # RNA UMAP, ATAC UMAP
    if stub:
        stub = stub + ":"
    for dname, data in dims.items():
        # one-dimensional entries cannot be used as dimension reductions - skip them
        if len(data.shape) == 1:
            continue
        num_dims = min(max_dims, data.shape[1])
        # name the columns
        cols = [f"{stub}{dname}_{i+1}" for i in range(num_dims)]
        dim_table = pd.DataFrame()
        # can either be an ndarray or a dataframe
        if isinstance(data, np.ndarray):
            dim_table = pd.DataFrame(data[:, 0:num_dims])
            dim_table.index = dims.dim_names
        elif isinstance(data, pd.DataFrame):
            # Should already have the correct index
            dim_table = data.iloc[:, 0:num_dims]
        else:
            print(f"unrecognized dimension reduction format - {type(data)} for dim reduction {dname}")
            continue
        dim_table.columns = cols
        # merge the tables to ensure the dims are in sync with the main table -
        # perhaps only necessary with mudata objects but will do no harm
        table = table.merge(dim_table, left_index=True, right_index=True, how="left")
    return table