Source code for src.read.strand_motifs_productions_trajectory

import numpy as np
from json import loads
from typing import Union

from ..obj.units import make_unit
from ..utils.manage_strand_reactor_files import read_txt

from .strand_motifs_trajectory import _extract_upper_and_lower_strands_as_continuous_strings_from_structure_of_complex
from .strand_motifs_trajectory import _transform_motif_string_to_index_tuple

from ..obj.times_vector import TimesVector
from ..obj.motif_production_vector import MotifProductionVector
from ..obj.motif_production_vector import _create_empty_motif_production_dict
from ..obj.motif_production_trajectory import MotifProductionTrajectory

from ..domains.motif_production_space import make_motif_production_dct, _production_channel_id
from ..domains.motif_space import _return_motif_categories, _motif_categories

[docs] def strand_motifs_productions_trajectory(filepaths : list, alphabet : list, motiflength : int, maximum_ligation_window_length : int ) -> MotifProductionTrajectory: time_unit = make_unit('t_0') motif_production_vectors = [] total_physical_times = [] for filepath in filepaths: step, current_physical_times, ligstr = _step_simulation_and_ligation_strands_array(filepath) total_physical_times = total_physical_times + list(current_physical_times) for time_index in range(len(current_physical_times)): motif_production_vectors = motif_production_vectors + [_extract_motif_production_vector(ligstr[time_index], motiflength, alphabet, maximum_ligation_window_length)] total_physical_times = TimesVector(np.asarray(total_physical_times), time_unit) return MotifProductionTrajectory(motif_production_vectors,total_physical_times)
def _calculate_ligation_window_length_and_spot_and_motifs(ending_strand : str, leaving_strand : str, template_strand_1st_part : str, template_strand_2nd_part :str, motiflength : int, maximum_ligation_window_length : int ) -> Union[int,int,str,str,str,str,str]: """ Returns: -------- ligation_window_length : int, ligation_spot : int, product_motif : str, ending_motif : str, leaving_motif : str template_motif_1st_part : str, template_motif_2nd_part : str """ left_ligation_window_length = min( len(ending_strand), len(template_strand_2nd_part) ) + 1 right_ligation_window_length = min( len(leaving_strand), len(template_strand_1st_part) ) + 1 ligation_window_length = left_ligation_window_length + right_ligation_window_length if ligation_window_length > maximum_ligation_window_length: if (left_ligation_window_length > (maximum_ligation_window_length - maximum_ligation_window_length//2)): if right_ligation_window_length > maximum_ligation_window_length//2: left_ligation_window_length = (maximum_ligation_window_length - maximum_ligation_window_length//2) right_ligation_window_length = maximum_ligation_window_length//2 else: left_ligation_window_length = (maximum_ligation_window_length - right_ligation_window_length) else: right_ligation_window_length = maximum_ligation_window_length - left_ligation_window_length ligation_window_length = left_ligation_window_length + right_ligation_window_length ligation_spot = left_ligation_window_length-1 ending_motif = ending_strand[-(motiflength-1):] leaving_motif = leaving_strand[:(motiflength-1)] product_motif = ending_motif[-left_ligation_window_length:] + leaving_motif[:right_ligation_window_length] template_motif_1st_part = template_strand_1st_part[-right_ligation_window_length:] template_motif_2nd_part = template_strand_2nd_part[:left_ligation_window_length] return ligation_window_length, ligation_spot, product_motif, ending_motif, leaving_motif, template_motif_1st_part, template_motif_2nd_part def _extract_motif_production_vector(ligstr, motiflength : int, alphabet : list, maximum_ligation_window_length : int ) -> MotifProductionVector: motif_production_dct = _create_empty_motif_production_dict(motiflength, alphabet, maximum_ligation_window_length) for reaction_index in range(len(ligstr)): (central_produced_motif, ending_motif, leaving_motif, template_motif, production_channel_id ) = _motif_production_reactants( *ligstr[reaction_index], motiflength, maximum_ligation_window_length ) production_index = _transform_motif_string_to_index_tuple(central_produced_motif+template_motif, alphabet) try: motif_production_dct[production_channel_id][production_index] += 1 except KeyError: print(motiflength) print(motif_production_dct.keys()) print(production_index) print(central_produced_motif) print(template_motif) print(production_channel_id) raise KeyError motif_production_vector = MotifProductionVector(motiflength,alphabet, '1', maximum_ligation_window_length) return motif_production_vector(motif_production_dct) def _load_complex(cc): cc = '['+str(cc)+']' return cc.replace('”','"').replace('−','-').replace("None","0") def _read_key(keys : str) -> Union[tuple,str]: ''' returns a tuple of the first ligation statistics stated in keys according to the formula below and the remaining string key: ll-lr|LS-LR|LT|nl-nr|sl-sr|kl-kr|lt|es-ls:n, value: # ll = length of left segment lr = lenght of right segment LL = length of left strand (note: segment != strand) LR = lnegth of right strand (note: segment != strand) LT = length of template strand (note: segment != strand) nl = number of mismatches of left segment nr = number of mismatches of right segment sl = number of mismatches left of ligation site sr = number of mismatches right of ligation site kl = dehyb rate of left segment kr = dehyb rate of right segment lt = ligation triplex es = ending strand ls = leaving strand n = occupation number? PARAMETERS: ----------- keys : string string in the format ll-lr|LS-LR|LT|nl-nr|sl-sr|kl-kr|lt|es-ls:n, and eventually further ligations afterwards RETURN: ------- rtrn : tuple in the format (ll,lr,LS,LR,LT,nl,nr,sl,sr,kl,kr,lt,es,ls,n) keys : string remaining string, i.e. the original string after lstripping the rtrn tuple ''' separators = ['-','|','-','|','|','-','|','-','|','-','|[',']|','-',':',','] types = [int,int,int,int,int, int,int,int,int,np.float64,np.float64, _load_complex,str,str,int] rtrn = () for ii in range(len(separators)): current, _, keys = keys.partition(separators[ii]) rtrn += (types[ii](current),) return rtrn, keys def _ligation_statistics(fname : str = None, skiprows : int = 2) -> Union[np.ndarray,np.ndarray,list]: ''' reads the "ligation_statistics.txt" output file and returns the output as lists PARAMETERS: ----------- fname : string (optional) path to the ligation_statistics file default : './data/ligation_statistics.txt' skiprows : int, optional Skip the first `skiprows` lines, including comments when reading the file; default : 2 RETURN: ------- step : np.array the simulation steps total_physical_time : array corresponding physical times ligation_statistics : list of tuples ligation_statistics at corresponding simulation step in the format (ll,lr,LS,LR,LT,nl,nr,sl,sr,kl,kr,lt,es,ls,n) ''' replace=[['step = ',''],[' simulation_time = ',';'],['\n{',';{'],[' ',''],[';',' ']] x = read_txt(fname, replace=replace) x = np.array(x.split()).reshape(-1,3).transpose() step = np.array(x[0], dtype = float) total_physical_time = np.array(x[1], dtype = float) complexes=x[2] ligation_statistics = [[]]*len(step) for ii in range(len(step)): keys = complexes[ii].lstrip('{').rstrip('}') ligation_statistics[ii] = [] while keys != '': lg, keys = _read_key(keys) ligation_statistics[ii].append(lg) return step, total_physical_time, ligation_statistics def _step_simulation_and_ligation_strands_array(fname : str =None, skiprows : int = 2) -> Union[list,list,list]: """ Reads the ligation file and returns its content. PARAMETERS: ----------- fname : string The filename (and path) of the ligation_statistics file default: None skiprows : int Integer of how many lines are skipped in the ligation_statistics file because they indicate the columns default : 2 RETURNS: -------- step : integer list Computation steps at which ligations have been measured total_physical_time : list The physical time at those steps ligation_strands : list List of tuples of the format (ending_strand, leaving_strand, leaving_template, ending_template) Like the output of _splitted_template_strand """ step, total_physical_time, ligation_statistics = _ligation_statistics(fname=fname, skiprows=2) rtrn = [[]]*len(step) for ii in range(len(step)): rtrn[ii] = [] if len(ligation_statistics[ii])!=0: for ls in ligation_statistics[ii]: binary_complex = ls[-4] ending_segment = ls[-3] leaving_segment = ls[-2] rtrn[ii].append(tuple( _strip_zeros(sequence) for sequence in _splitted_template_strand(binary_complex, ending_segment, leaving_segment) )) return step, total_physical_time, rtrn def _strip_zeros(sequence : str): return sequence.lstrip('0').rstrip('0') def _ligation_spot(binary_complex, ending_segment, leaving_segment): ''' returns the index of the ligation spot in terms of segments ''' try: binary_complex = loads(binary_complex) except: print("binary_complex cannot be interpreted by json.loads. I assume, it already is a list.") # check indices of upper strand index_ending_segment = np.where(np.array(binary_complex)==('5'+ending_segment+'3')) index_leaving_segment = np.where(np.array(binary_complex)==('5'+leaving_segment+'3')) ligation_spot = [] if len(index_ending_segment[0])>0 and len(index_leaving_segment[0])>0: # check that the indices fulfill i2-i1=2 # and that in between there is a ligation spot for ii in index_ending_segment[0][index_ending_segment[1]==0]+1: if ii in index_leaving_segment[0][index_leaving_segment[1]==0]-1: if binary_complex[ii] == ["|","-"]: ligation_spot.append([ii,0]) # check indices of lower strand index_ending_segment = np.where(np.array(binary_complex)==('3'+ending_segment[::-1]+'5')) index_leaving_segment = np.where(np.array(binary_complex)==('3'+leaving_segment[::-1]+'5')) if len(index_ending_segment[0])>0 and len(index_leaving_segment[0])>0: # check that the indices fulfill i2-i1=2 # (note that in the lower strand we go from right to left) # and that in between there is a ligation spot for ii in index_ending_segment[0][index_ending_segment[1]==1]-1: if ii in index_leaving_segment[0][index_leaving_segment[1]==1]+1: if binary_complex[ii] ==["-","|"]: ligation_spot.append([ii,1]) if len(ligation_spot) > 1: print("Warning: more than one possible ligation spot.") elif len(ligation_spot) == 0: print("Warning: Did not find any one possible ligation spot.") print(binary_complex,"\n", ending_segment,"\n", leaving_segment) return np.array(ligation_spot).transpose() def _splitted_template_strand(binary_complex : str, ending_segment : str, leaving_segment : str) -> Union[str,str,str,str]: ''' gives the leaving strand, the ending strand and the two parts of the template in clockwise direction, i.e. ending_strand | leaving_strand etalpmet_gnidne - etalpmet_gnivael resp. if ligation_happens_in_lower_strand leaving_template - ending_template dnarts_gniveal | dnarts_gnidne PARAMETERS: ----------- binary_complex : string ending_segment : string leaving_segment : string RETURNS: -------- ending_strand : string left strand from 5' to 3' leaving_strand : string right strand from 5' to 3' leaving_template : string left template strand from 5' to 3' ending_template : string right template strand from 5' to 3' ''' ligation_spot = _ligation_spot(binary_complex, ending_segment, leaving_segment) if ligation_spot[1][0] != 1 and ligation_spot[1][0]!=0: raise NotImplementedError("The ligation spot seems to be neither in the lower nor the upper strand") if len(ligation_spot[0])>1: print("I'll consider the first ligation spot") ligation_happens_in_lower_strand = bool(ligation_spot[1][0]) ligation_spot = ligation_spot[0][0] try: binary_complex = loads(binary_complex) except: pass try: binary_complex = np.array(binary_complex) except: pass left_complexpart = binary_complex[:ligation_spot] right_complexpart = binary_complex[ligation_spot+1:] if ligation_happens_in_lower_strand: leaving_template , leaving_strand = _extract_upper_and_lower_strands_as_continuous_strings_from_structure_of_complex(left_complexpart) ending_template, ending_strand = _extract_upper_and_lower_strands_as_continuous_strings_from_structure_of_complex(right_complexpart) else: ending_strand, ending_template = _extract_upper_and_lower_strands_as_continuous_strings_from_structure_of_complex(left_complexpart) leaving_strand, leaving_template = _extract_upper_and_lower_strands_as_continuous_strings_from_structure_of_complex(right_complexpart) ending_strand = _strip_zeros(ending_strand).split('0')[-1] leaving_strand = _strip_zeros(leaving_strand).split('0')[0] leaving_template = _strip_zeros(leaving_template).split('0')[-1] ending_template = _strip_zeros(ending_template).split('0')[0] return ending_strand, leaving_strand, leaving_template, ending_template def _determine_production_channel_id( ending_motif : str, leaving_motif : str, template_motif_1st_part : str, template_motif_2nd_part : str, ligation_window_length : int, ligation_spot : int, motiflength:int, )->str: motif_categories = [motif_category.format(ligation_window_length-2) for motif_category in _motif_categories()] if len(ending_motif)<ligation_spot+1: product_begins = True else: product_begins = False if len(leaving_motif)<ligation_window_length-(ligation_spot+1): product_ends = True else: product_ends = False if len(template_motif_2nd_part)<ligation_spot+1: template_ends = True else: template_ends = False if len(template_motif_1st_part)<ligation_window_length-(ligation_spot+1): template_begins = True else: template_begins = False if product_begins or not product_ends: product_category = motif_categories[-2-product_begins-product_ends] else: product_category = motif_categories[-1] if template_begins or not template_ends: template_category = motif_categories[-2-template_begins-template_ends] else: template_category = motif_categories[-1] return _production_channel_id(product_category, template_category, ligation_window_length, ligation_spot) def _motif_production_reactants(ending_sequence : str, leaving_sequence : str, leaving_template_sequence : str, ending_template_sequence : str, motiflength : int, maximum_ligation_window_length : int ) -> Union[str,str,str,str,str]: """ Returns: -------- product_motif : str, ending_motif : str, leaving_motif : str, template_motif : str, production_channel_id : str """ ending_motif = ending_sequence[-(motiflength-1):] leaving_motif = leaving_sequence[:(motiflength-1)] (ligation_window_length, ligation_spot, product_motif, ending_motif, leaving_motif, template_motif_1st_part, template_motif_2nd_part ) = _calculate_ligation_window_length_and_spot_and_motifs( ending_sequence, leaving_sequence, leaving_template_sequence, ending_template_sequence, motiflength, maximum_ligation_window_length ) production_channel_id = _determine_production_channel_id( ending_motif, leaving_motif, template_motif_1st_part, template_motif_2nd_part, ligation_window_length, ligation_spot, motiflength ) template_motif = template_motif_1st_part+template_motif_2nd_part return product_motif, ending_motif, leaving_motif, template_motif, production_channel_id