Source code for freeflux.core.model

'''Define the Model class.'''


[docs] __author__ = 'Chao Wu'
import re from functools import lru_cache, reduce from collections import OrderedDict, deque from collections.abc import Iterable from itertools import chain import numpy as np import pandas as pd from sympy import Integer from multiprocessing import Pool from .metabolite import Metabolite from .reaction import Reaction from .emu import EMU from ..io.inputs import read_model_from_file from ..analysis.simulate import Simulator from ..analysis.inst_simulate import InstSimulator from ..analysis.fit import Fitter from ..analysis.inst_fit import InstFitter from ..optim.optim import Optimizer
[docs] class Model(): ''' Model class is the fundamental class of FreeFlux hosting operations for 13C MFA. A model can be built by adding reactions one by one or reading set of reactions from .tsv or .xlsx file. Parameters ---------- name: str Model name. Attributes ---------- name: str Model name. metabolites_info: dict Metabolite ID => list of Metabolites. reactions_info: OrderedDict Reaction ID => Reaction. metabolites: list Metabolite IDs, in alphabetical order. metabolites_with_atoms: list IDs of metabolite with atom assignment, in alphabetical order. end_substrates: list Initial substrates of the model, in alphabetical order. end_products: list Final products of the model, in alphabetical order. reactions: list Reaction IDs, in order of addition. n_metabolites: int # of metabolites. n_reactions: int # of reactions. _full_net_stoichiometric_matrix: df Complete stoichiometric matrix for net reaction with all metabolites in rows, net reactions in columns. _full_total_stoichiometric_matrix: df Complete stoichiometric matrix for total reaction with all metabolites in rows, total reactions in columns. metabolite_adjacency_matrix: df Metabolite adjacency matrix (MAM). Metabolites with atoms are in index and columns (no duplicates). List of Reactions are in cells if reactions exists between (sub, pro), [] otherwise. net_fluxes_bounds: dict Reaction ID => [lb, ub] by setting. net_fluxes_range: dict Reaction ID => estimeted [lb, ub]. All required net fluxes are included. netfluxids: list Net flux IDs, alias of reactions. concentrations: ser Concentrations, metabolite ID => float. concentrations_bounds: dict Metabolite ID => [lb, ub] by setting. concentrations_range: dict Metabolite ID => [lb, ub]. All required concentrations are included. concids: list Concentration IDs (used only in computation process). total_fluxes: ser Total fluxes, fluxe ID (e.g., 'v1_f' or 'v2') => float. totalfluxids: list Total flux IDs. target_EMUs: list Target EMU IDs. timepoints: list Sorted time points for MDV simulation. substrate_MDVs: dict Substrate EMU => MDV. substrate_MDVs_der_p: dict Substrate EMU => derivatives of substrate MDV w.r.t. variables in shape of (len(MDV), # of vars). # of vars = # of free fluxes for steady state MFA; # of vars = # of free fluxes + # of concentrations for INST MFA. initial_matrix_Xs: dict Size => initial MDVs of EMU in matrix X, i.e., the natural MDVs. The initial MDV matrix has the same shape of matrix X. initial_matrix_Ys: dict Size => initial MDVs of EMU in matrix Y, i.e., either natural MDVs or labeled MDVs. The initial MDV matrix has the same shape of matrix Y. initial_matrix_Xs_der_p, initial_matrix_Ys_der_p: dict Size => 3-D array in shape of (# of vars), X(Y).shape[0], X(Y).shape[1]) which is the initial MDV derivatives of EMUs in matrix X(Y) w.r.t. variables. # of vars = # of free fluxes + # of concentrations for INST MFA. initial_sim_MDVs: dict EMU ID => {t0 => MDV}. MDV of target EMUs at t0. EAMs: dict of df Size => EMU adjacency matrix (EAM). Cells are symbolic expression of flux. matrix_As, matrix_Bs: dict Size => [lambdified matrix A(B), [flux IDs], [EMUs]]. matrix_As_der_p, matrix_Bs_der_p: dict Size => 3-D array in shape of (# of vars, A(B).shape[0], A(B).shape[1]) which is the derivatives of matrix A(B) w.r.t. variables. # of vars = # of free fluxes for steady state MFA; # of vars = # of free fluxes + # of concentrations for INST MFA. matrix_Ms: dict Size => [lambdified matrix M, [metabolite IDs]]. matrix_Ms_der_p: dict Size => 3-D array in shape of (# of vars, M.shape[0], M.shape[1]), which is the derivatives of matrix M w.r.t. variables. # of vars = # of free fluxes for steady state MFA; # of vars = # of free fluxes + # of concentrations for INST MFA. label_atom: str Labeled atom, i.e., the base atom in MDV, currently only supprts "H", "C" and "N". labeling_strategy: dict Metabolite ID => [labeling_pattern(s), percentage(s), purity(s)]. measured_MDVs: dict EMU ID (metabolite ID + '_' + atom NOs) => [means of MDV, SDs of MDV]. measured_MDVs_inv_cov: array Inversed covariance matrix of measured MDVs with variances on the diagnol, other elements are zero. measured_fluxes: dict Irreversible reaction ID => [mean, SD]. measured_fluxes_inv_cov: array Inversed covariance matrix of measured fluxes with variances on the diagnol, other elements are zero. measured_fluxes_der_p: array Derivative of measured fluxes w.r.t. variables in shape of (# of measured fluxes, # of vars), # of vars = # of free fluxes for steady state MFA; # of vars = # of free fluxes + # of concentrations for INST MFA. measured_inst_MDVs: dict EMU ID (metabolite ID + '_' + atom NOs) => {timepoint => [means of MDV, SDs of MDV]}. measured_inst_MDVs_inv_cov: array Inversed covariance matrix of measured concatenated MDVs with variances on the diagnol, other elements are zero. Timepoints are concatenated except t0. null_space: 2-D array Null space of total stoichiometric matrix. transform_matrix: 2-D array Transform matrix letting vnet = transform_matrix*v. ''' def __init__(self, name = 'unnamed'): ''' Parameters ---------- name: str Model name. '''
[docs] self.name = name
[docs] self.reactions_info = OrderedDict()
[docs] self.target_EMUs = []
[docs] self.timepoints = []
[docs] self.substrate_MDVs = {}
[docs] self.substrate_MDVs_der_p = {}
[docs] self.EAMs = {}
[docs] self.matrix_As = {}
[docs] self.matrix_Bs = {}
[docs] self.matrix_Ms = {}
[docs] self.matrix_As_der_p = {}
[docs] self.matrix_Bs_der_p = {}
[docs] self.matrix_Ms_der_p = {}
[docs] self.null_space = None
[docs] self.transform_matrix = None
[docs] self.initial_matrix_Xs = {}
[docs] self.initial_matrix_Ys = {}
[docs] self.initial_matrix_Xs_der_p = {}
[docs] self.initial_matrix_Ys_der_p = {}
[docs] self.initial_sim_MDVs = {}
[docs] self.label_atom = None
[docs] self.labeling_strategy = {}
[docs] self.measured_MDVs = {}
[docs] self.measured_fluxes = {}
[docs] self.measured_inst_MDVs = {}
[docs] self.measured_MDVs_inv_cov = None
[docs] self.measured_inst_MDVs_inv_cov = None
[docs] self.measured_fluxes_inv_cov = None
[docs] self.measured_fluxes_der_p = None
[docs] self.unbalanced_metabolites = set()
[docs] self.net_fluxes_bounds = {}
[docs] self.net_fluxes_range = OrderedDict()
[docs] self.concentrations_bounds = {}
[docs] self.concentrations_range = OrderedDict()
[docs] self.total_fluxes = pd.Series(dtype = float)
[docs] self.concentrations = pd.Series(dtype = float)
[docs] def add_reactions(self, reactions): ''' Parameters ---------- reactions: Reaction or list of Reaction ''' if not isinstance(reactions, list): reactions = [reactions] new_reactions_info = OrderedDict([(rxn.id, rxn) for rxn in reactions]) self.reactions_info.update(new_reactions_info) for rxn in reactions: if rxn.host_models is None: rxn.host_models = set([self]) else: rxn.host_models.add(self)
[docs] def remove_reactions(self, reactions): ''' Parameters ---------- reactions: Reaction or list of Reaction ''' if not isinstance(reactions, list): reactions = [reactions] for rxn in reactions: self.reactions_info.pop(rxn.id) for rxn in reactions: rxn.host_models.remove(self) if not rxn.host_models: rxn.host_models = None break
[docs] def read_from_file(self, file): ''' Parameters ---------- file: file path tsv or excel file with reactions with fields "reaction_ID", "substrate_IDs(atom)", "product_IDs(atom)" and "reversibility". Header line starts with "#", and will be skiped. ''' dataRaw = read_model_from_file(file) data = pd.DataFrame() for col, ser in dataRaw.items(): if ser.dtype == object: data[col] = ser.str.replace(r'\s+', '', regex = True) else: data[col] = ser pattern_re = re.compile(r'([0-9\.]+|)([.\w]+)\(?([a-z0-9\.,]+|)\)?') for rxn, (subsStr, prosStr, rev) in data.iterrows(): v = Reaction(rxn, reversible = bool(int(rev))) for stoy, metab, atoms in pattern_re.findall(subsStr): stoy = float(stoy) if stoy else 1.0 atoms = atoms.split(',') if atoms else None v.add_substrates(Metabolite(metab, atoms), stoy) for stoy, metab, atoms in pattern_re.findall(prosStr): stoy = float(stoy) if stoy else 1.0 atoms = atoms.split(',') if atoms else None v.add_products(Metabolite(metab, atoms), stoy) self.add_reactions(v)
@property
[docs] def metabolites_info(self): ''' Returns ------- metabsInfo: dict metabolite ID => list of Metabolites ''' metabsInfo = {} for _, rxn in self.reactions_info.items(): for subid, sub in rxn.substrates_info['metab'].items(): metabsInfo.setdefault(subid, []).append(sub) for proid, pro in rxn.products_info['metab'].items(): metabsInfo.setdefault(proid, []).append(pro) metabsInfo = {metabid: list(set(metabs)) for metabid, metabs in metabsInfo.items()} return metabsInfo
@property
[docs] def metabolites(self): metabids = [] for _, rxn in self.reactions_info.items(): metabids = metabids + rxn.substrates + rxn.products return sorted(set(metabids))
@property
[docs] def metabolites_with_atoms(self): metabids = [] for _, rxn in self.reactions_info.items(): metabids = metabids + rxn.substrates_with_atoms + rxn.products_with_atoms return sorted(set(metabids))
@property
[docs] def reactions(self): return list(self.reactions_info.keys())
@property
[docs] def n_metabolites(self): return len(self.metabolites)
@property
[docs] def n_reactions(self): return len(self.reactions)
@lru_cache()
[docs] def _full_net_stoichiometric_matrix(self, metabolites, reactions): ''' Re-estimate stoichiometric matrix if metabolites or reactions changes. Parameters ---------- metabolites: tuple metabolite IDs reactions: tuple reaction IDs Returns ------- netS: df Complete stoichiometric matrix for net reaction with all metabolites in rows and net reactions in columns. ''' netS = pd.DataFrame(0.0, index = metabolites, columns = reactions) for rxnid, rxn in self.reactions_info.items(): for sub in rxn.substrates: value = rxn.substrates_info.loc[sub, 'stoy'] if isinstance(value, pd.Series): netS.loc[sub, rxnid] = -value.sum() else: netS.loc[sub, rxnid] = -value for pro in rxn.products: value = rxn.products_info.loc[pro, 'stoy'] if isinstance(value, pd.Series): netS.loc[pro, rxnid] = value.sum() else: netS.loc[pro, rxnid] = value return netS
@lru_cache()
[docs] def _full_total_stoichiometric_matrix(self, metabolites, reactions): ''' Re-estimate stoichiometric matrix if metabolites or reactions changes. Parameters ---------- metabolites: tuple metabolite IDs reactions: tuple reaction IDs Returns ------- totalS: df Complete stoichiometric matrix for total reaction with all metabolites in rows and total reactions in columns. ''' netS = self._full_net_stoichiometric_matrix(metabolites, reactions) totalS = [] totalScols = [] for rxn, col in netS.items(): if self.reactions_info[rxn].reversible: totalS.append(col) totalS.append(-col+0.0) totalScols.append(rxn+'_f') totalScols.append(rxn+'_b') else: totalS.append(col) totalScols.append(rxn) totalS = pd.DataFrame(totalS, index = totalScols).T return totalS
@property
[docs] def end_substrates(self): totalS = self._full_total_stoichiometric_matrix( tuple(self.metabolites), tuple(self.reactions) ) nLessthan0 = totalS[totalS < 0].count(axis = 1) nGreaterthan0 = totalS[totalS > 0].count(axis = 1) return sorted(totalS.index[(nLessthan0 > 0) & (nGreaterthan0 == 0)].tolist())
@property
[docs] def end_products(self): totalS = self._full_total_stoichiometric_matrix( tuple(self.metabolites), tuple(self.reactions) ) nLessthan0 = totalS[totalS < 0].count(axis = 1) nGreaterthan0 = totalS[totalS > 0].count(axis = 1) return sorted(totalS.index[(nGreaterthan0 > 0) & (nLessthan0 == 0)].tolist())
[docs] def get_net_stoichiometric_matrix(self, exclude_metabs = None, include_ends = False): ''' Parameters ---------- exclude_metabs: list or set Excluded metabolites. include_ends: bool Whether to include end metabolites (i.e., initial substrates and final products). Returns netS: df Net stoichiometric matrix with balanced metabolites in rows and net reactions in columns ''' if exclude_metabs is None: exclude_metabs = [] else: exclude_metabs = list(exclude_metabs) netS = self._full_net_stoichiometric_matrix( tuple(self.metabolites), tuple(self.reactions) ) if include_ends: selected_indices = netS.index.difference(exclude_metabs) return netS.loc[selected_indices,:] else: selected_indices = netS.index.difference( exclude_metabs+self.end_substrates+self.end_products ) return netS.loc[selected_indices,:]
[docs] def get_total_stoichiometric_matrix(self, exclude_metabs = None, include_ends = False): ''' Parameters ---------- exclude_metabs: list or set Excluded metabolites. include_ends: bool Whether to include end metabolites (initial substrates and final products). Returns totalS: df Total stoichiometric matrix with balanced metabolites in rows and total reactions in columns ''' if exclude_metabs is None: exclude_metabs = [] else: exclude_metabs = list(exclude_metabs) totalS = self._full_total_stoichiometric_matrix( tuple(self.metabolites), tuple(self.reactions) ) if include_ends: select_indices = totalS.index.difference(exclude_metabs) return totalS.loc[select_indices,:] else: select_indices = totalS.index.difference( exclude_metabs+self.end_substrates+self.end_products ) return totalS.loc[select_indices,:]
@property @lru_cache()
[docs] def netfluxids(self): return self.reactions
@property @lru_cache()
[docs] def totalfluxids(self): totalfluxids = [] for rxnid, rxn in self.reactions_info.items(): if rxn.reversible: totalfluxids.extend([rxnid+'_f', rxnid+'_b']) else: totalfluxids.append(rxnid) return totalfluxids
@property @lru_cache()
[docs] def concids(self): if self.EAMs: metabids = [] for _, EAM in self.EAMs.items(): metabids += [emu.metabolite_id for emu in EAM.columns] return sorted(set(metabids)) else: raise ValueError('network not decomposed')
@property @lru_cache()
[docs] def metabolite_adjacency_matrix(self): ''' Returns ------- MAM: df Metabolite adjacency matrix (MAM). Metabolites with atoms are in index and columns (no duplicates). List of Reactions are in cells if reactions exists between sub (index) and pro (columns), [] otherwise ''' MAM = pd.DataFrame( index = self.metabolites_with_atoms, columns = self.metabolites_with_atoms ) MAM = MAM.applymap(lambda x: []) for _, rxn in self.reactions_info.items(): for sub in rxn.substrates_with_atoms: for pro in rxn.products_with_atoms: if rxn.reversible: MAM.loc[sub, pro].append(rxn) MAM.loc[pro, sub].append(rxn) else: MAM.loc[sub, pro].append(rxn) return MAM
[docs] def __repr__(self): headStr = ( f'{self.__class__.__name__} {self.name if self.name else "unknown"} ' f'({self.n_metabolites} metabolites, {self.n_reactions} reactions)' ) bodyStr = '\n'.join([str(rxn) for rxn in self.reactions_info.values()]) return f'{headStr}\n{bodyStr}'
[docs] def _BFS(self, iniEMU): ''' Parameters ---------- iniEMU: EMU Starting EMU of the decomposition. Metabolite of iniEMU can be any Metabolite instance with the same id. Returns ------- EAMsInfo: dict of list Size => list of [EMU, [precursor EMUs], symbolic expression of flux]. ''' MAM = self.metabolite_adjacency_matrix EAMsInfo = {} searched = [] toSearch = deque() toSearch.appendleft(iniEMU) while toSearch: currentEMU = toSearch.pop() searched.append(currentEMU) formingRxns = list( set(chain(*[cell for cell in MAM[currentEMU.metabolite_id] if cell])) ) for formingRxn in formingRxns: if formingRxn.reversible: if currentEMU.metabolite_id in formingRxn.products_with_atoms: asProMetabs = formingRxn.products_info['metab'][currentEMU.metabolite_id] direction = 'forward' flux = formingRxn.fflux else: asProMetabs = formingRxn.substrates_info['metab'][currentEMU.metabolite_id] direction = 'backward' flux = formingRxn.bflux else: asProMetabs = formingRxn.products_info['metab'][currentEMU.metabolite_id] direction = 'forward' flux = formingRxn.flux if isinstance(asProMetabs, pd.Series): offset = 1/asProMetabs.size asProMetabs = list(asProMetabs) else: offset = 1.0 asProMetabs = [asProMetabs] for asProMetab in asProMetabs: currentEMU = EMU(currentEMU.id, asProMetab, currentEMU.atom_nos) preEMUsInfo = formingRxn._find_precursor_EMUs(currentEMU, direction = direction) for preEMUs, coe in preEMUsInfo: for preEMU in preEMUs: if preEMU not in searched and preEMU not in toSearch: toSearch.appendleft(preEMU) EAMsInfo.setdefault(currentEMU.size, []).append( [currentEMU, preEMUs, offset * coe * flux] ) return EAMsInfo
[docs] def _get_original_EAMs(self, iniEMU): ''' Parameters ---------- iniEMU: EMU Starting EMU of the decomposition. Returns ------- EAMs: dict of df Size => original EMU adjacency matrix (EAM), cells are symbolic expression of flux。 ''' EAMsInfo = self._BFS(iniEMU) EAMs = {} for size, EMUsInfo in EAMsInfo.items(): nonSourceEMUs = set([EMUInfo[0] for EMUInfo in EMUsInfo]) SourceEMUs = sorted( set( [tuple(EMUInfo[1]) if len(EMUInfo[1]) > 1 else EMUInfo[1][0] for EMUInfo in EMUsInfo] ) - nonSourceEMUs ) EAM = pd.DataFrame( Integer(0), index = sorted(nonSourceEMUs) + sorted(SourceEMUs), columns = sorted(nonSourceEMUs) ) for emu, preEMUs, flux in EMUsInfo: col = emu if len(preEMUs) == 1: idx = preEMUs[0] else: idx = tuple(preEMUs) EAM.loc[[idx], col] += flux EAMs[size] = EAM return EAMs
[docs] def _replace_list_item(self, iterable, toReplace, value): ''' Parameters ---------- iterable: iterable Iterable can be nested. toReplace: scalar Value to be replaced. value: scalar Value that replaces toReplace. Returns ------- newLst: tuple New tuple with toReplace recursively replaced. ''' newLst = [] for item in iterable: if not isinstance(item, Iterable): newItem = value if item == toReplace else item else: newItem = self._replace_list_item(item, toReplace, value) newLst.append(newItem) return tuple(newLst)
[docs] def _uniquify_dataFrame_index(self, df): ''' Parameters ---------- df: df DataFrame to be uniquify. Returns ------- uniqueDf: df DataFrame with duplicate rows combined (summated). ''' sortedDf = df.sort_index() sortDfIndex, idx = np.unique(sortedDf.index, return_index = True) uniqueDf = pd.DataFrame( np.add.reduceat(sortedDf.values, idx), index = sortDfIndex, columns = sortedDf.columns ) return uniqueDf
[docs] def _lump_linear_EMUs(self, EAMs, iniEMU): ''' Parameters ---------- EAMs: dict of df Size => original EMU adjacency matrix (EAM), cells are symbolic expression of flux. iniEMU: EMU Starting EMU of the decomposition. Returns ------- lumpedEAMs: dict of df Size => lumped EMU adjacency matrix (EAM), cells are symbolic expression of flux. ''' lumpedEAMs = {} orderedSizes = sorted(EAMs.keys(), reverse = True) for i, size in enumerate(orderedSizes): lumpedEAM = EAMs[size].copy(deep = 'all') for emu in lumpedEAM.columns: preEMUs = lumpedEAM.index[lumpedEAM[emu] != 0] if preEMUs.size == 1: preEMU = preEMUs[0] if (emu != iniEMU and (preEMU not in lumpedEAM.columns or lumpedEAM.loc[emu, preEMU] == 0)): lumpedEAM.drop(emu, axis = 1, inplace = True) lumpedEAM.index = self._replace_list_item(lumpedEAM.index, emu, preEMU) for j in range(i): largerEAM = lumpedEAMs[orderedSizes[j]] largerEAM.index = self._replace_list_item(largerEAM.index, emu, preEMU) lumpedEAM = self._uniquify_dataFrame_index(lumpedEAM) upper = self._uniquify_dataFrame_index( lumpedEAM.loc[lumpedEAM.columns, :] ) lower = self._uniquify_dataFrame_index( lumpedEAM.loc[lumpedEAM.index.difference(lumpedEAM.columns),:] ) lumpedEAM = pd.concat((upper, lower)) lumpedEAMs[size] = lumpedEAM return lumpedEAMs
[docs] def _combine_equivalent_EMUs(self, EAMs): ''' Parameters ---------- EAMs: dict of df Size => original EMU adjacency matrix (EAM), cells are symbolic expression of flux. Returns ------- combinedEAMs: dict df Size => EAM with equivalent EMUs combined, cells are symbolic expression of flux. ''' combinedEAMs = {} for size, EAM in EAMs.items(): combinedEAM = EAM.copy(deep = 'all') combined = [] for emu in combinedEAM.columns: if emu not in combined: equivEMU = emu.equivalent if equivEMU in combinedEAM.columns: combinedEAM.loc[:, emu] = combinedEAM.loc[:, [emu, equivEMU]].sum(axis = 1)/2 combinedEAM.drop(equivEMU, axis = 1, inplace = True) combinedEAM.loc[emu, :] = combinedEAM.loc[[emu, equivEMU], :].sum() combinedEAM.drop(equivEMU, inplace = True) combined.append(equivEMU) combinedEAMs[size] = combinedEAM return combinedEAMs
[docs] def get_emu_adjacency_matrices(self, iniEMU, lump = True): ''' Parameters ---------- iniEMU: EMU Starting EMU of the decomposition. lump: bool Whether to lump linear EMUs. Returns ------- EAMs: dict of df Size => EMU adjacency matrix (EAM) after lumping of linear EMUs and combination of equivalent EMUs. Index and columns are EMUs, cells are symbolic expression of flux. Notes ----- EMUs in sequential reactions can not be lumped in transient MFA. ''' # set CPU affinity in Linux import platform if platform.system() == 'Linux': import os os.sched_setaffinity(os.getpid(), range(os.cpu_count())) oriEAMs = self._get_original_EAMs(iniEMU) if lump: lumpedEAMs = self._lump_linear_EMUs(oriEAMs, iniEMU) combinedEAMs = self._combine_equivalent_EMUs(lumpedEAMs) else: combinedEAMs = self._combine_equivalent_EMUs(oriEAMs) return combinedEAMs
[docs] def _merge_EAMs(self, EAM1, EAM2): ''' Parameters ---------- EAM1: df EMU adjacency matrix (EAM) to merge EAM2: df EMU adjacency matrix (EAM) to merge Returns ------- mergedEAM: df merged EAM ''' nonSourceEMUunion = EAM2.columns.union(EAM1.columns) sourceEMUunion = EAM2.index.difference(EAM2.columns).union( EAM1.index.difference(EAM1.columns) ) mergedEAM = pd.DataFrame( Integer(0), index = nonSourceEMUunion.append(sourceEMUunion), columns = nonSourceEMUunion ) mergedEAM.loc[EAM1.index, EAM1.columns] = EAM1 mergedEAM.loc[EAM2.index, EAM2.columns] = EAM2 return mergedEAM
[docs] def _merge_all_EAMs(self, *EAMsAll): ''' Parameters ---------- EAMsAll: tuple of EAMs EAMs is dict of DataFrame, i.e., Size => EMU adjacency matrix (EAM). Returns ------- mergedEAMs: dict of df Size => merged EAM ''' mergedEAMs = {} maxsize = max([max(EAMs) for EAMs in EAMsAll]) for size in range(1, maxsize+1): EAMCurrentSize = list( filter( lambda x: isinstance(x, pd.DataFrame), [EAMs.get(size, 0) for EAMs in EAMsAll] ) ) if EAMCurrentSize: mergedEAMs[size] = reduce(self._merge_EAMs, EAMCurrentSize) return mergedEAMs
[docs] def _decompose_network(self, metabolites, atom_nos, lump = True, n_jobs = 1): ''' Parameters ---------- metabolites: list of str List of metabolite IDs from which initial EMU will be generated to start the decomposition. atom_nos: list of str Atom NOs of corresponding metabolites, len(atom_nos) should be equal to len(metabolites). lump: bool Whether to lump linear EMUs. n_jobs: int # of jobs to run in parallel. Returns ------- mergedEAMs: dict of df Size => merged EMU adjacency matrix (EAM). Notes ----- EMUs in sequential reactions can not be lumped in transient MFA. ''' emus = [] for metabid, atomNOs in zip(metabolites, atom_nos): emus.append(EMU(metabid+'_'+atomNOs, Metabolite(metabid), atomNOs)) EAMsAll = [] if n_jobs == 1: for emu in emus: EAMs = self.get_emu_adjacency_matrices(emu, lump) EAMsAll.append(EAMs) else: pool = Pool(processes = n_jobs) for emu in emus: EAMs = pool.apply_async( func = self.get_emu_adjacency_matrices, args = (emu, lump) ) EAMsAll.append(EAMs) pool.close() pool.join() EAMsAll = [EAMs.get() for EAMs in EAMsAll] mergedEAMs = self._merge_all_EAMs(*EAMsAll) return mergedEAMs
[docs] def decompose_network(self, ini_emus, lump = True, n_jobs = 1): ''' Parameters ---------- ini_emus: dict Metabolite ID => atom NOs or list of atom NOs. Atom NOs can be int list or str, e.g., {'Ala': [[1,2,3], '23'], 'Ser': '123'} lump: bool Whether to lump linear EMUs. n_jobs: int # of jobs to run in parallel. Returns ------- mergedEAMs: dict of df Size => merged EMU adjacency matrix (EAM) Notes ----- EMUs in sequential reactions can not be lumped in transient MFA. ''' metabids = [] atom_nos = [] for metabid, atomNOs in ini_emus.items(): if isinstance(atomNOs, list): if any(isinstance(item, Iterable) for item in atomNOs): for atomnos in atomNOs: if not isinstance(atomnos, str): atomnos = ''.join(map(str, atomnos)) metabids.append(metabid) atom_nos.append(atomnos) else: atomNOs = ''.join(map(str, atomNOs)) metabids.append(metabid) atom_nos.append(atomnos) else: metabids.append(metabid) atom_nos.append(atomNOs) return self._decompose_network(metabids, atom_nos, lump = lump, n_jobs = n_jobs)
[docs] def optimizer(self): return Optimizer(self)
[docs] def simulator(self, kind): ''' Parameters ---------- kind: {"ss", "inst"} * If "ss", simulation at isotopic steady state is performed. * If "inst", simulation at isotopically nonstationary state is performed. ''' if kind.lower() == 'ss': return Simulator(self) elif kind.lower() == 'inst': return InstSimulator(self) else: raise ValueError('kind should be either "ss" or "inst"')
[docs] def fitter(self, kind): ''' Parameters ---------- kind: {"ss", "inst"} * If "ss", fitting at isotopic steady state is performed. * If "inst", fitting at isotopically nonstationary state is performed. ''' if kind.lower() == 'ss': return Fitter(self) elif kind.lower() == 'inst': return InstFitter(self) else: raise ValueError('kind should be either "ss" or "inst"')