Source code for src.get.strand_motifs_trajectory_ensemble

from os.path import exists

from .alphabet import alphabet as get_alphabet
from ..obj.motif_trajectory_ensemble import load_motif_trajectory_ensemble, MotifTrajectoryEnsemble
from ..read.strand_motifs_trajectory_ensemble import strand_motifs_trajectory_ensemble as read_strand_motifs_trajectory_ensemble
from ..utils.manage_strand_reactor_files import _create_typical_strand_trajectory_section_complexes_filepath 
from ..utils.save import create_trajectory_ensemble_path

[docs] def strand_motifs_trajectory_ensemble( motiflength : int, strand_trajectory_id : str, param_file_no : int = 0, skiprows : int = 2, execution_time_path : str = None, **kwargs ) -> MotifTrajectoryEnsemble: alphabet = get_alphabet(strand_trajectory_id) #check if already in archive archive_path = create_trajectory_ensemble_path( strand_trajectory_id=strand_trajectory_id, param_file_no=param_file_no, motiflength=motiflength ) + 'sd/' if exists(archive_path): return load_motif_trajectory_ensemble(archive_path) print('Reading MotifsTrajectoryEnsemble from strand reactor data...') current_filepath = lambda srn, sn : _create_typical_strand_trajectory_section_complexes_filepath( strand_trajectory_id, param_file_no=param_file_no, simulations_run_no=srn, simulations_no = sn, ) simulations_run_no = 0 simulations_no = 0 filepath_lists = [] while exists(current_filepath(simulations_run_no, simulations_no)): filepaths = [] while exists(current_filepath(simulations_run_no, simulations_no)): filepaths += [current_filepath(simulations_run_no, simulations_no),] simulations_no += 1 if execution_time_path is not None: with open(execution_time_path + 'execution_time.txt','a') as f: f.write(f'\nsimulations_run_no {simulations_run_no}: '+str(simulations_no)) filepath_lists = filepath_lists + [filepaths,] simulations_run_no += 1 simulations_no = 0 return read_strand_motifs_trajectory_ensemble(filepath_lists, alphabet, motiflength, skiprows =skiprows)