Source code for fretraj.cloud

#!/usr/bin/env python3

import numpy as np
import pandas as pd
import os
import json
import mdtraj as md
import numba as nb
import copy
import pickle


from fretraj import export
from fretraj import fret
from fretraj import grid
from fretraj import _LabelLib_found

if _LabelLib_found:
    import LabelLib as ll

DISTANCE_SAMPLES = 100000

package_directory = os.path.dirname(os.path.abspath(__file__))

with open(os.path.join(package_directory, "periodic_table.json")) as f:
    _periodic_table = json.load(f)

VDW_RADIUS = dict((key, _periodic_table[key]["vdw_radius"]) for key in _periodic_table.keys())

# set None if the parameter is required
_label_dict = {
    "Position": {
        "pd_key": {
            "attach_id": ((int, float), None),
            "linker_length": ((int, float), None),
            "linker_width": ((int, float), None),
            "dye_radius1": ((int, float), None),
            "dye_radius2": ((int, float), None),
            "dye_radius3": ((int, float), None),
            "cv_thickness": ((int, float), 0),
            "grid_spacing": ((int, float), 1.0),
            "mol_selection": (str, "all"),
            "simulation_type": (str, "AV3"),
            "cv_fraction": ((int, float), 0),
            "state": (int, 1),
            "frame_mdtraj": (int, 0),
            "use_LabelLib": (bool, False),
            "contour_level_AV": ((int, float), 0),
            "contour_level_CV": ((int, float), 0.7),
            "b_factor": (int, 100),
            "gaussian_resolution": (int, 2),
            "grid_buffer": ((int, float), 2.0),
            "transparent_AV": (bool, True),
        }
    },
    "Distance": {"pd_key": {"R0": ((int, float), None), "n_dist": (int, 10 ** 6)}},
}

_label_dict_vals = {
    field: {"pd_key": {key: val[1] for key, val in _label_dict[field]["pd_key"].items()}}
    for field in _label_dict.keys()
}
_default_params = {
    field: {key: val[1] for key, val in _label_dict[field]["pd_key"].items() if val[1] is not None}
    for field in _label_dict.keys()
}


[docs]def labeling_params(param_file, verbose=True): """Parse the parameter file to get the configuration settings Parameters ---------- param_file : str json-formatted parameter filename Returns ------- dict position of labels, dye and grid parameters """ with open(param_file) as f: labels_json = json.load(f) try: check_labels(labels_json, verbose) except KeyError as e: error_type = e.args[0] key = e.args[1] pos = e.args[2] field = e.args[3] print("{}: '{}' in {} {}. Exiting...".format(error_type, key, field, pos)) except TypeError as e: print("Wrong data type: {}".format(e)) else: return labels_json
[docs]def check_labels(labels, verbose=True): """Check integrity of parameter dictionary Parameters ---------- labels : dict position of labels, dye and grid parameters verbose : bool, optional=True be verbose about missing parameter and their fall-back values """ for field in _label_dict.keys(): if field in labels.keys(): for pos in labels[field].keys(): if field == "Position": if "simulation_type" not in labels[field][pos]: labels[field][pos]["simulation_type"] = copy.copy(_default_params[field]["simulation_type"]) else: if labels[field][pos]["simulation_type"] == "AV1": labels[field][pos]["dye_radius2"] = 0 labels[field][pos]["dye_radius3"] = 0 if "state" in labels[field][pos] and "frame_mdtraj" not in labels[field][pos]: labels[field][pos]["frame_mdtraj"] = labels[field][pos]["state"] - 1 if "frame_mdtraj" in labels[field][pos] and "state" not in labels[field][pos]: labels[field][pos]["state"] = labels[field][pos]["frame_mdtraj"] + 1 elif field == "Distance": pass # check if all keys that are needed are defined for key, (t, _) in _label_dict[field]["pd_key"].items(): if key not in labels[field][pos]: if key in _default_params[field].keys(): labels[field][pos][key] = copy.copy(_default_params[field][key]) if verbose: print( "Missing Key: '{}' in {} {}. Falling back to \"{}\"".format( key, field, pos, _default_params[field][key] ) ) else: raise KeyError("Missing Key", key, pos, field) else: if not isinstance(labels[field][pos][key], t): raise TypeError( "'{}' in {} {} must be of one of the following types: {}".format(key, field, pos, t) ) # check if there are any unrecognized keys for key in labels[field][pos].keys(): if key not in _label_dict_vals[field]["pd_key"].keys(): raise KeyError("Unrecognized key", key, pos, field) else: labels[field] = None raise ValueError("Cannot read {} parameters from file: Missing field '{}'.".format(field, field))
[docs]def save_obj(filename, obj): """ Save a serialized object to a binary file Parameters ---------- filename : str filename for pickle object obj : serializable object """ with open(filename, "wb") as f: try: pickle.dump(obj, f) except TypeError: try: obj_tmp = copy.copy(obj) obj_tmp.acv = copy.copy(obj.acv) del obj_tmp.av del obj_tmp.acv.ll_Grid3D print("Note: the LabelLib.Grid objects have been removed as they cannot be pickled.") except AttributeError: print("Error: Cannot pickle the passed object") else: pickle.dump(obj_tmp, f)
[docs]def load_obj(filename): """ Load a serialized object from a binary file Parameters ---------- filename : str filename of pickle obj """ with open(filename, "rb") as f: return pickle.load(f)
[docs]def save_labels(filename, labels): """Write the ACV parameters to a .json file Parameters ---------- filename : str filename for label parameters labels : dict position of labels, dye and grid parameters Examples -------- >>> obj.save_labels('parameters.json') """ with open(filename, "w") as f: json.dump(labels, f, indent=2)
[docs]def printProgressBar(iteration, total, prefix="Progress:", suffix="complete", length=20, fill="█"): """Command line progress bar Parameters ---------- iteration : int current iteration total : int total number of iterations prefix : str, optional='Progress:' string before progress bar suffix : str, optional='complete' string after progress bar length : int, optional=20 length of progress bar fill : str, optional='█' fill character of progress bar Examples -------- >>> printProgressBar(0, n) >>> for i in range(n): doSomething printProgressBar(i + 1, n) """ percent = "{:0.0f}".format(100 * iteration / total) filledLength = int(length * iteration // total) bar = fill * filledLength + "-" * (length - filledLength) print("\r{} |{}| {}% {}".format(prefix, bar, percent, suffix), end="\r") if iteration == total: print()
[docs]def pipeline_frames(structure, donor_site, acceptor_site, labels, frames, fret_pair): """Create a pipeline to compute multi-frame donor and acceptor ACVs and calculate a FRET trajectory Parameters ---------- structure : mdtraj.Trajectory trajectory of atom coordinates loaded from a pdb, xtc or other file donor_site : str reference identifier for the donor labeling position acceptor_site : str reference identifier for the acceptor labeling position labels : dict dye, linker and setup parameters for the accessible volume calculation frames_mdtraj : list list of frames on the trajectory to be used for the ACV calculation fret_pair : str Distance key specifying the donor acceptor pair Note ---- Running a pipeline more memory-efficient than running Volume.from_frames() because the ACVs are stored only until the FRET efficiency is calculated. On the downside, no ACV trajectories(.xtc / .xyz) can be saved. """ _labels = copy.copy(labels) acv = {} fret = [] n_frames = len(frames) printProgressBar(0, n_frames) for i, frame in enumerate(frames): for dye, site in zip(["D", "A"], [donor_site, acceptor_site]): _labels["Position"][site]["frame_mdtraj"] = frame _labels["Position"][site]["state"] = frame + 1 acv[dye] = Volume(structure, site, _labels) fret.append(FRET(acv["D"], acv["A"], fret_pair, labels)) printProgressBar(i + 1, n_frames) return fret
[docs]def save_mp_traj(filename, volume_list, units="A"): """Save a trajectory of dye mean positions as an xyz file Parameters ---------- filename : str filename for mean position trajectory volume_list : array_like list of Volume instances units : {'A', 'nm'}, optional='A' distance units ('A': Angstroms, 'nm': nanometers) """ mps = np.vstack([volume_list[i].acv.mp for i in range(len(volume_list)) if hasattr(volume_list[i].acv, "mp")]) mps = np.hstack((mps, np.ones((mps.shape[0], 1)))) mean_mp = np.mean(mps, 0) xyz_str = export.xyz(mps, mean_mp, None) with open(filename, "w") as fname: fname.write(xyz_str)
[docs]def acv_subsampling(volume_list, verbose=False): """Subsample the ACV to obtain identical number of points over all volumes. This allows to create mdtraj.Trajectory objects of the ACV which can be also save as .xtc or .xyz files Parameters ---------- volume_list : array_like list of Volume instances verbose: bool be verbose about the number of subsamples """ rng = np.random.default_rng() vols = ["AV", "FV", "CV"] top = {} xyz_sub = {} for v in vols: if v == "CV": xyz_list = list( map(lambda volume: volume.acv.cloud_xyzqt[volume.acv.cloud_xyzqt[:, 4] == 2, 0:3], volume_list) ) elif v == "FV": xyz_list = list( map(lambda volume: volume.acv.cloud_xyzqt[volume.acv.cloud_xyzqt[:, 4] < 2, 0:3], volume_list) ) else: xyz_list = list(map(lambda volume: volume.acv.cloud_xyzqt[:, 0:3], volume_list)) n_pts = min([xyz.shape[0] for xyz in xyz_list]) top[v] = _create_volume_topology(n_pts) xyz_sub[v] = np.stack([rng.choice(xyz, n_pts, replace=False) for xyz in xyz_list]) if verbose: print(f"{n_pts} points sampled in {v}") return xyz_sub, top
[docs]def create_acv_traj(volume_list, verbose=False): """Create a mdtraj.Trajectory object from a volume list Parameters ---------- volume_list : array_like list of Volume instances verbose: bool be verbose about the number of subsamples """ acv_traj = {} xyz_sub, top = acv_subsampling(volume_list, verbose=False) acv_traj["AV"] = md.Trajectory(xyz_sub["AV"] / 10, top["AV"]) if any(volume_list[0].acv.tag_1d > 1): acv_traj["FV"] = md.Trajectory(xyz_sub["FV"] / 10, top["FV"]) acv_traj["CV"] = md.Trajectory(xyz_sub["CV"] / 10, top["CV"]) return acv_traj
[docs]def save_acv_traj(filename, volume_list, format="pdb", verbose=False): """Save a trajectory of ACVs as a full multi model PDB or a subsampled .xyz or .xtc (with identical number of points over all volumes) Parameters ---------- filename : str filename for ACV trajectory volume_list : array_like list of Volume instances format : str file format of the ACV (for 'pdb' the full cloud is saved without subsampling; for 'xyz' k points in the volumes are randomly picked, where k is the number of points in the smallest volume) separate_CV : bool Save contact and free volume in separate files (choose this if you want to retain the CV information) """ if format == "pdb": if verbose: print('With format "PDB" no subsampling is performed.') file_str = "" for i, volume in enumerate(volume_list): file_str += f"MODEL {i+1}\n" file_str += export.pdb(volume.acv.cloud_xyzqt, volume.acv.mp) file_str += "ENDMDL\n\n" with open(filename, "w") as fname: fname.write(file_str) elif (format == "xyz") or (format == "xtc"): vols = ["AV"] if any(volume_list[0].acv.tag_1d > 1): vols += ["FV", "CV"] base, suffix = os.path.splitext(filename) xyz_sub, top = acv_subsampling(volume_list, verbose=False) for v in vols: filename = f"{base}_{v}{suffix}" if format == "xtc": with md.formats.XTCTrajectoryFile(filename, "w") as f: f.write(xyz_sub[v] / 10) else: with md.formats.XYZTrajectoryFile(filename, "w") as f: f.write(xyz_sub[v]) first_frame = md.Trajectory(xyz_sub[v][0] / 10, top[v]) first_frame.save_pdb(f"{base}_{v}.pdb")
[docs]def load_acv_traj(filename): """Load an AV, FV and CV from .xyz or .xtc file Parameters ---------- filename : str filename of ACV trajectory Note ---- Multi model PDB files cannot be loaded as a mdtraj.Trajectory because they are on purpose not subsampled and thus contain different number of points per volume """ base, suffix = os.path.splitext(filename) acv_traj = {} for v in ["FV", "FV", "CV"]: acv_traj[v] = md.load(f"{base}_{v}{suffix}", top=f"{base}_{v}.pdb") return acv_traj
[docs]def _create_volume_topology(n_atoms): """Create a topology for an accessible-contact volume Parameters ---------- n_atoms : int number of points in the volume """ atoms = pd.concat( ( pd.Series(range(n_atoms), name="serial"), pd.Series(["D"] * n_atoms, name="name"), pd.Series(["D"] * n_atoms, name="element"), pd.Series([0] * n_atoms, name="resSeq"), pd.Series(["X"] * n_atoms, name="resName"), pd.Series([0] * n_atoms, name="chainID"), ), axis=1, ) top = md.Topology.from_dataframe(atoms) return top
[docs]def save_structure_traj(filename, structure, frames, format="pdb"): """Save selected frames of a trajectory as multi-model PDB or XTC Parameters ---------- filename : str filename for structure trajectory structure : mdtraj.Trajectory trajectory of atom coordinates loaded from a pdb, xtc or other file format : {'pdb', 'xtc'} trajectory file format """ sliced_structure = structure.slice(frames) if format == "xtc": sliced_structure.save_xtc(filename) else: sliced_structure.save_pdb(filename)
[docs]class ACV: """Reweighted accessible contact volume of a covalently linked fluorophore Parameters ---------- grid_acv : LabelLib.Grid3D, optional=None accessible volume with added weight labels for free and contact volume (density label FV: 1.0, density label CV: 2.0); the two volumes are subsequently reweighted cv_thickness : float, optional=0 width of the contact volume in Angstrom (default: 0, i.e. no CV is calculated) **Note:** good first approx.: 2*min(dye_radii) cv_fraction : float, optional=0 fraction of dyes that are within the contact volume (e.g. as determined by fluorescence anisotropy) cloud_xyzq : ndarray, optional=None array of x-,y-,z-coordinates and corresponding weights with a shape [n_gridpts(+), 4] use_LabelLib : bool, optional=True make use of LabelLib library to compute FRET values and distances """ def __init__(self, grid_acv=None, cv_thickness=0, cv_fraction=0, cloud_xyzqt=None, use_LabelLib=True): if grid_acv is not None: self.n_gridpts = np.prod(grid_acv.shape) self.grid_1d, self.tag_1d = self._reweight_cv(grid_acv.grid, cv_thickness, cv_fraction) self.grid_3d = Volume.reshape_grid(self.grid_1d, grid_acv.shape) self.tag_3d = Volume.reshape_grid(self.tag_1d, grid_acv.shape) self.cloud_xyzqt = Volume.grid2pts( self.grid_3d, grid_acv.originXYZ, np.array([grid_acv.discStep] * 3), self.tag_3d ) if use_LabelLib and _LabelLib_found: self.ll_Grid3D = ll.Grid3D(grid_acv.shape, grid_acv.originXYZ, grid_acv.discStep) self.ll_Grid3D.grid = self.grid_1d else: self.ll_Grid3D = None else: self.cloud_xyzqt = cloud_xyzqt self.mp = Volume.mean_pos(self.cloud_xyzqt)
[docs] def _reweight_cv(self, grid, cv_thickness, cv_fraction): """Reweight the accessible volume based on contact and free volume Parameters ---------- cv_thickness : float width of the contact volume in Angstrom (default: 0, i.e. no CV is calculated) cv_fraction : float relative fraction spent inside the contact volume (i.e. stacked to the surface) Returns ------- ndarray one-dimensional array of grid points of length n_gridpts Notes ----- - A good first approximation of the CV thickness is about two times the smallest dye radius - The CV fraction can be calculated from the residual fluorescence anisotropy """ grid_1d = np.array(grid) tag_1d = self._tag_volume(grid_1d) if cv_thickness > 0: weight_cv = self._weight_factor(grid_1d, cv_fraction) grid_1d[grid_1d > 1.0] = weight_cv grid_1d = np.clip(grid_1d, 0, None) return grid_1d, tag_1d
[docs] @staticmethod def _weight_factor(grid_1d, cv_fraction): """Calculate the weight of contact volume grid points. Note ---- The weight factor corresponds to the factor by which the points of the contact volume (CV, dye trapped on surface) are favored over those belonging the free volume (FV, free dye diffusion). This accounts for the (experimentally) determined fraction of dyes populating the CV. Parameters ---------- grid_1d : ndarray one-dimensional array of grid points of length n_gridpts cv_fraction : float fraction of dyes that are within the contact volume Returns ------- float weight of each grid point that belongs to the contact volume """ n_CV = np.count_nonzero(grid_1d > 1.0) if n_CV != 0: n_FV = np.count_nonzero([(grid_1d > 0.0) & (grid_1d <= 1.0)]) if n_FV != 0: weight_cv = n_FV / n_CV * cv_fraction / (1.0 - cv_fraction) else: print("no fraction of free dyes, all stacked") weight_cv = 2 else: weight_cv = 1 return weight_cv
[docs] def _tag_volume(self, grid_1d): """Assign a tag to the grid values depending on their location in the cloud (1: free volume, 2: contact volume) Parameters ---------- grid_1d : ndarray one-dimensional array of grid points length n_gridpts Returns ------- ndarray one-dimensional array of length n_gridpts """ tag_1d = np.full(self.n_gridpts, 1) tag_1d[grid_1d > 1.0] = 2 return tag_1d
[docs]class FRET: """FRET class of distance and transfer efficiency for a pair of donor and acceptor ACVs Parameters ---------- volume1, volume2 : instances of the Volume class fret_pair : str Distance key specifying the donor acceptor pair labels : dict dye, linker and setup parameters for the accessible volume calculation R_DA : ndarray donor acceptor distance distribution and associate weights (optional) verbose : bool Examples -------- >>> ft.Molecule(volume1, volume2) >>> ft.Molecule(volume1, volume2, use_LabelLib=False) """ def __init__(self, volume1, volume2, fret_pair, labels, R_DA=None, verbose=True): try: if volume1.acv is None or volume2.acv is None: raise TypeError except TypeError: if verbose: print("One accessible volume is empty") else: self.fret_pair = fret_pair self.R0 = labels["Distance"][fret_pair]["R0"] self.n_dist = labels["Distance"][fret_pair]["n_dist"] self.use_LabelLib = np.all([volume1.use_LabelLib, volume2.use_LabelLib]) if self.use_LabelLib and _LabelLib_found: if R_DA is None: R_DA = fret.dists_DA_ll(volume1.acv, volume2.acv, n_dist=self.n_dist, return_weights=True) self.mean_R_DA = fret.mean_dist_DA_ll(volume1.acv, volume2.acv, n_dist=self.n_dist) self.sigma_R_DA = fret.std_dist_DA(volume1.acv, volume2.acv, R_DA=R_DA) E_DA = fret.FRET_DA(volume1.acv, volume2.acv, R_DA=R_DA, R0=self.R0) self.mean_E_DA = fret.mean_FRET_DA_ll(volume1.acv, volume2.acv, R0=self.R0, n_dist=self.n_dist) self.sigma_E_DA = fret.std_FRET_DA(volume1.acv, volume2.acv, E_DA=E_DA) else: if R_DA is None: R_DA = fret.dists_DA(volume1.acv, volume2.acv, n_dist=self.n_dist, return_weights=True) self.mean_R_DA = fret.mean_dist_DA(volume1.acv, volume2.acv, R_DA=R_DA) self.sigma_R_DA = fret.std_dist_DA(volume1.acv, volume2.acv, R_DA=R_DA) E_DA = fret.FRET_DA(volume1.acv, volume2.acv, R_DA=R_DA, R0=self.R0) self.mean_E_DA = fret.mean_FRET_DA(volume1.acv, volume2.acv, E_DA=E_DA) self.sigma_E_DA = fret.std_FRET_DA(volume1.acv, volume2.acv, E_DA=E_DA) self.mean_R_DA_E = fret.mean_dist_DA_fromFRET( volume1.acv, volume2.acv, mean_E_DA=self.mean_E_DA, R0=self.R0 ) self.sigma_R_DA_E = fret.std_dist_DA_fromFRET( volume1.acv, volume2.acv, mean_E_DA=self.mean_E_DA, sigma_E_DA=self.sigma_E_DA, R0=self.R0 ) self.R_attach = fret.dist_attach(volume1.attach_xyz, volume2.attach_xyz) self.R_mp = fret.dist_mp(volume1.acv, volume2.acv)
[docs] @classmethod def from_volumes(cls, volume_list1, volume_list2, fret_pair, labels, R_DA=None): """Alternative constructor for the FRET class by reading in a list of donor and acceptor volumes Parameters ---------- volume_list1, volume_list2 : array_like lists of Volume instances fret_pair : str identifier for donor acceptor pair labels : dict dye, linker and setup parameters for the accessible volume calculation R_DA : ndarray donor acceptor distance distribution and associate weights (optional) """ n_vols1 = len(volume_list1) n_vols2 = len(volume_list2) try: if n_vols1 != n_vols2: raise ValueError except ValueError: print("The length of volume_list1 and volume_list2 is not the same") else: printProgressBar(0, n_vols1) fret = [] for i in range(n_vols1): fret_value = FRET(volume_list1[i], volume_list2[i], fret_pair, labels, R_DA) if volume_list1[i].acv is None or volume_list2[i].acv is None: print("Skip list entry {:d}".format(i)) else: fret.append(fret_value) printProgressBar(i + 1, n_vols1) return fret
[docs] def save_fret(self, filename): """Write the FRET calculation to a json file Parameters ---------- filename : str filename for FRET prediction Examples -------- >>> obj.save_FRET('parameters.json') """ self.values.to_json(filename, indent=2)
@property def values(self): """ Pandas Dataframe of FRET parameters Returns ------- pandas.DataFrame """ fret_results = { "R0 (A)": (float(f"{self.R0 :0.1f}"), np.nan), "<R_DA> (A)": (float(f"{self.mean_R_DA :0.1f}"), float(f"{self.sigma_R_DA :0.1f}")), "<E_DA>": (float(f"{self.mean_E_DA :0.2f}"), float(f"{self.sigma_E_DA :0.2f}")), "<R_DA_E> (A)": (float(f"{self.mean_R_DA_E :0.1f}"), float(f"{self.sigma_R_DA_E :0.1f}")), "R_attach (A)": (float(f"{self.R_attach :0.1f}"), np.nan), "R_mp (A)": (float(f"{self.R_mp :0.1f}"), np.nan), } fret_results = pd.DataFrame(fret_results, index=["value", "std"]) return fret_results
[docs]class Trajectory: """Trajectory class of distances and transfer efficiencies from the FRET class Parameters ---------- fret : instance of the FRET class timestep : int time difference between two frames in picoseconds """ def __init__(self, fret, timestep=None, kappasquare=None): n = len(fret) self.mean_E_DA = np.array([fret[i].mean_E_DA for i in range(n) if hasattr(fret[i], "mean_E_DA")]).round(2) self.mean_R_DA = np.array([fret[i].mean_R_DA for i in range(n) if hasattr(fret[i], "mean_R_DA")]).round(1) self.mean_R_DA_E = np.array([fret[i].mean_R_DA_E for i in range(n) if hasattr(fret[i], "mean_R_DA_E")]).round(1) self.R_attach = np.array([fret[i].R_attach for i in range(n) if hasattr(fret[i], "R_attach")]).round(1) self.R_mp = np.array([fret[i].R_mp for i in range(n) if hasattr(fret[i], "R_mp")]).round(1) self.timestep = timestep self.kappasquare = kappasquare @property def dataframe(self): """Pandas Dataframe view of the Trajectory object Returns ------- pandas.DataFrame Dataframe with distances and transfer efficiencies of the FRET trajectory """ df = pd.DataFrame( (self.mean_R_DA, self.mean_E_DA, self.mean_R_DA_E, self.R_attach, self.R_mp), index=["<R_DA> (A)", "<E_DA>", "<R_DA_E> (A)", "R_attach (A)", "R_mp (A)"], ).T if self.timestep: df = pd.concat((df, pd.Series(range(df.shape[0]), name="time (ps)") * self.timestep), axis=1) if self.kappasquare: df = pd.concat((df, pd.Series(np.ones(df.shape[0]), name="kappasquare") * self.kappasquare), axis=1) return df
[docs] def save_traj(self, filename, format="csv", units="A", header=True, R_kappa_only=False): """Save the trajectory as a .csv file Parameters ---------- filename : str filename for .csv trajectory format : {'csv', 'txt'} units : {'A', 'nm'}, optional='A' distance units ('A': Angstroms, 'nm': nanometers) header : bool, default=True include header in the output file R_kappa_only : bool, default=False include only time, R_DA and kappasquare columns in the output file (as *gmx dyecoupl*, Gromacs) """ df = self.dataframe with open(filename, "w") as f: if format == "txt": separator = "\t" else: separator = "," if units == "nm": df["<R_DA> (A)"] = self.dataframe["<R_DA> (A)"] / 10 df["<R_DA_E> (A)"] = self.dataframe["<R_DA_E> (A)"] / 10 df["R_attach (A)"] = self.dataframe["R_attach (A)"] / 10 df["R_mp (A)"] = self.dataframe["R_mp (A)"] / 10 df.rename( {header: header.replace("(A)", "(nm)") for header in df.columns.values}, axis="columns", inplace=True, ) if R_kappa_only: if self.timestep and self.kappasquare: if units == "nm": columns = ["time (ps)", "<R_DA> (nm)", "kappasquare"] else: columns = ["time (ps)", "<R_DA> (A)", "kappasquare"] else: raise KeyError("Time or kappasquare is missing in the DataFrame") else: columns = None f.write(df.to_csv(index=False, sep=separator, header=header, columns=columns))
[docs]class Volume: """Class object holding the accessible contact volume of a specific labeling position Parameters ---------- structure : mdtraj.Trajectory trajectory of atom coordinates loaded from a pdb, xtc or other file site : str reference identifier for the labeling position labels : dict dye, linker and setup parameters for the accessible volume calculation cloud_xyzqt : ndarray array of x-,y-,z-coordinates with corresponding weights and tags with a shape [n_gridpts(+), 5] """ def __init__(self, structure, site, labels, cloud_xyzqt=None, verbose=True): self.structure = structure self.attach_id = labels["Position"][site]["attach_id"] if self.attach_id is not None: self.labeling_site = site self.mol_selection = labels["Position"][site]["mol_selection"] self.linker_length = labels["Position"][site]["linker_length"] self.linker_width = labels["Position"][site]["linker_width"] self.simulation_type = labels["Position"][site]["simulation_type"] self.dye_radii = np.array( [ labels["Position"][site]["dye_radius1"], labels["Position"][site]["dye_radius2"], labels["Position"][site]["dye_radius3"], ] ) self.grid_spacing = labels["Position"][site]["grid_spacing"] self.cv_thickness = labels["Position"][site]["cv_thickness"] self.cv_fraction = labels["Position"][site]["cv_fraction"] self.state = labels["Position"][site]["state"] self.frame_mdtraj = labels["Position"][site]["frame_mdtraj"] self.use_LabelLib = labels["Position"][site]["use_LabelLib"] try: self.n_atoms = self.structure.n_atoms except: print("{} is no valid mdtraj.Trajectory object".format(self.structure)) else: try: if self.frame_mdtraj != self.state - 1: raise ValueError except ValueError: print( "The state {:d} and frame_mdtraj {:d} are not compatible. The frame_mdtraj should be equal to state - 1".format( self.state, self.frame_mdtraj ) ) self.av = None self.acv = None else: try: if self.state > self.structure.n_frames: raise IndexError except IndexError: print( "The state {:d} and mdtraj frame {:d} are out of range, select a state within the range 1 - {:d} or a mdtraj frame within the range 0 - {:d}".format( self.state, self.frame_mdtraj, self.structure.n_frames, self.structure.n_frames - 1 ) ) self.av = None self.acv = None else: try: if self.attach_id < 1 or self.attach_id > self.n_atoms: raise IndexError except IndexError: print( "The attachment position {:d} is out of range, select an index within 1 - {:d}".format( self.attach_id, self.n_atoms ) ) self.av = None self.acv = None else: self.attach_id_mdtraj = labels["Position"][site]["attach_id"] - 1 self.resi_atom = self.structure.top.atom(self.attach_id_mdtraj) self.av = self.calc_av(self.use_LabelLib) try: if not np.any(np.array(self.av.grid) > 0): raise ValueError except ValueError: if verbose: print( "Empty Accessible volume at position {:d}. Is your attachment point buried?".format( self.attach_id ) ) self.acv = None else: self.acv = self.calc_acv(self.use_LabelLib) elif cloud_xyzqt is not None: self.acv = ACV(cloud_xyzqt=cloud_xyzqt) else: print("Attachment point is unknown")
[docs] @classmethod def from_frames(cls, structure, site, labels, frames_mdtraj): """Alternative constructor for the Volume class by reading in one or multiple frames using the same dye and grid parameters Parameters ---------- structure : mdtraj.Trajectory trajectory of atom coordinates loaded from a pdb, xtc or other file site : str reference identifier for the labeling position labels : dict dye, linker and setup parameters for the accessible volume calculation frames_mdtraj : int or list list of frames on the trajectory to be used for the ACV calculation Returns ------- list list of Volume instances """ n_fr = len(frames_mdtraj) printProgressBar(0, n_fr) multiframe_volumes = [] _labels = copy.copy(labels) for i, frame in enumerate(frames_mdtraj): _labels["Position"][site]["frame_mdtraj"] = frame _labels["Position"][site]["state"] = frame + 1 multiframe_volumes.append(cls(structure, site, _labels)) printProgressBar(i + 1, n_fr) return multiframe_volumes
[docs] @classmethod def from_attachID(cls, structure, site, labels, attachID): """Alternative constructor for the Volume class by reading in one or multiple attachment points using the same dye and grid parameters Parameters ---------- structure : mdtraj.Trajectory trajectory of atom coordinates loaded from a pdb, xtc or other file site : str reference identifier for the labeling position labels : dict dye, linker and setup parameters for the accessible volume calculation attachID : int or list list of attachment ids on the structure to be used for the ACV calculation Returns ------- list list of Volume instances Examples -------- >>> struct = md.load_pdb('data/2m23.pdb') >>> labels_global = ft.labeling_params('data/labels_global.json') >>> from_attachID(struct, 0, labelsglobals, [2,929]) """ n_aID = len(attachID) printProgressBar(0, n_aID) multisite_volumes = [] _labels = copy.copy(labels) for i, attach_id in enumerate(attachID): _labels["Position"][site]["attach_id"] = attach_id multisite_volumes.append(cls(structure, site, _labels)) printProgressBar(i + 1, n_aID) return multisite_volumes
[docs] @staticmethod def reshape_grid(grid, shape): """Convert a 1D-grid to a 3D-Grid Parameters ---------- grid : array_like one-dimensional grid array/list of length n_gridpts shape : list Returns ------- ndarray 3-dimensional array of grid points with a shape given by n_xyz Examples -------- >>> grid = numpy.full(27,1) >>> ft.cloud.Volume.reshape_grid(grid, (3,3,3)) array([[[1., 1., 1.], [1., 1., 1.], [1., 1., 1.]], ... [[1., 1., 1.], [1., 1., 1.], [1., 1., 1.]]]) """ grid_3d = np.array(grid, dtype=np.float64).reshape(shape, order="F") return grid_3d
[docs] @staticmethod @nb.jit(nopython=True) def grid2pts(grid_3d, xyz_min, d_xyz, *args): """Convert 3D-grid with density values to xyz coordinates with a weight (q) Parameters ---------- grid_3d : ndarray([nx,ny,nz]) 3-dimensional array of grid points with a shape given by n_xyz xyz_min : ndarray origin coordinates of the grid d_xyz : ndarray grid spacing in x-,y- and z-direction Returns ------- ndarray array of x-,y-,z-coordinates with corresponding weights and tags with a shape [n_gridpts(+), 5] Examples -------- >>> grid_3d = numpy.zeros((3,3,3)) >>> grid_3d[(1,1,1)] = 1 # FV >>> grid_3d[(1,2,1)] = 10 # CV >>> ft.cloud.Volume.grid2pts(grid_3d, (0,0,0), (1,1,1)) array([[ 0.5, 0.5, 0.5, 1. 1], [ 0.5, 1. , 0.5, 10. 1]]) >>> tag_3d = numpy.zeros((3,3,3)) >>> grid_3d[(1,1,1)] = 1 # FV >>> grid_3d[(1,1,1)] = 2 # CV >>> ft.cloud.Volume.grid2pts(grid_3d, (0,0,0), (1,1,1), tag_3d) array([[ 0.5, 0.5, 0.5, 1. 1], [ 0.5, 1. , 0.5, 10. 2]]) """ xmin, ymin, zmin = xyz_min nx, ny, nz = grid_3d.shape dx, dy, dz = d_xyz if len(args) != 0: tag_3d = args[0] else: tag_3d = np.full(grid_3d.shape, 1) cloud_xyzqt = np.empty((nx * ny * nz, 5), dtype=np.float64) gdx = np.arange(0, nx, dtype=np.float64) * dx gdy = np.arange(0, ny, dtype=np.float64) * dy gdz = np.arange(0, nz, dtype=np.float64) * dz n = 0 for ix in range(nx): for iy in range(ny): for iz in range(nz): d = grid_3d[ix, iy, iz] if d > 0: cloud_xyzqt[n, 0] = gdx[ix] + xmin cloud_xyzqt[n, 1] = gdy[iy] + ymin cloud_xyzqt[n, 2] = gdz[iz] + zmin cloud_xyzqt[n, 3] = d cloud_xyzqt[n, 4] = tag_3d[ix, iy, iz] n += 1 return cloud_xyzqt[:n]
@property def mol_xyzr(self): """Get coordinates and vdW radii of all selected atoms in the structure Returns ------- ndarray array of x-,y-,z-coordinates and VdW radii with a shape [n_atoms, 4] Examples -------- >>> avobj.mol_xyzr """ frame_mdtraj = self.frame_mdtraj struct = self.structure try: radii = np.array([VDW_RADIUS[atom.element.symbol] / 100 for atom in struct.top.atoms], ndmin=2).T except KeyError as e: print(f"Atom symbol {e} in topology not recognized") raise else: radii[self.attach_id_mdtraj] = 0.0 try: sele = struct.top.select(self.mol_selection) except ValueError: print( "{} is not a valid expression, please see http://mdtraj.org/latest/atom_selection.html".format( self.mol_selection ) ) print('Falling back to "all"') sele = struct.top.select("all") finally: if self.attach_id_mdtraj not in sele: np.append(sele, self.attach_id_mdtraj) xyz = struct.xyz[frame_mdtraj] * 10 xyzr = np.hstack((xyz[sele], radii[sele])) return xyzr @property def attach_xyz(self): """Get coordinates of the dye attachment point Returns ------- ndarray one-dimensional array of x-,y-,z-coordinates of the attachment point Examples -------- >>> avobj.attach_xyz """ xyz = self.structure.xyz[self.frame_mdtraj][self.attach_id_mdtraj] * 10 return xyz.astype(np.float64)
[docs] def save_acv(self, filename, format="pdb", **kwargs): """Write accessible contact volume to file Parameters ---------- filename : str filename for ACV format : {'pdb', 'xyz', 'openDX'} file format for ACV **kwargs : dict key value pairs for .xyz files are {write_weights : bool, encode_element : bool} Notes ----- - `write_weights` includes the weights of each point as an additional column in the .xyz file - `encode_element` uses the element column (first col.) to encode the FV (=F) or CV (=C) Examples -------- >>> obj.save_acv('cloud.xyz', format='xyz', write_weights=False) """ if format == "xyz": try: write_weights = kwargs["write_weights"] except KeyError: write_weights = True try: encode_element = kwargs["encode_element"] except KeyError: encode_element = False file_str = export.xyz(self.acv.cloud_xyzqt, self.acv.mp, write_weights, encode_element) elif format == "open_dx": d_xyz = np.array([self.grid_spacing] * 3) xyz_min = self.av.originXYZ file_str = export.open_dx(self.acv.grid_3d, xyz_min, d_xyz) else: file_str = export.pdb(self.acv.cloud_xyzqt, self.acv.mp) with open(filename, "w") as fname: fname.write(file_str)
[docs] def calc_av(self, use_LabelLib): """Calculate the dye accessible volume Returns ------- LabelLib.Grid3D object containing a 3-dimensional array of grid points and additional attributes (see Notes) Notes ----- The attributes of the av LabelLib.Grid3D object are: - discStep : float (the grid spacing) - originXYZ : list (x-/y-/z-coordinate of the grid origin) - shape : list (number of grid points in x-/y-/z-direction) - grid : list (flattened list of grid point values) See Also -------- calc_acv : Calculate accessible contact volume References ---------- .. [3] Kalinin, S. et al. "A toolkit and benchmark study for FRET-restrained high-precision structural modeling", *Nat. Methods* **9**, 1218–1225 (2012). .. [4] Sindbert, S. et al. "Accurate distance determination of nucleic acids via Förster resonance energy transfer: implications of dye linker length and rigidity", *J. Am. Chem. Soc.* **133**, 2463–2480 (2011). Examples -------- >>> avobj.calc_av() """ mol_xyzr = self.mol_xyzr attach_xyz = self.attach_xyz if use_LabelLib and _LabelLib_found: if self.simulation_type == "AV1": av = ll.dyeDensityAV1( mol_xyzr.T, attach_xyz, self.linker_length, self.linker_width, self.dye_radii[0], self.grid_spacing ) else: av = ll.dyeDensityAV3( mol_xyzr.T, attach_xyz, self.linker_length, self.linker_width, self.dye_radii, self.grid_spacing ) else: av = grid.Grid3D( self.mol_xyzr, self.attach_xyz, self.linker_length, self.linker_width, self.dye_radii, self.grid_spacing, self.simulation_type, ) return av
[docs] def _dye_acc_surf(self): """Calculate dye accessible surface by padding the vdW radius with the thickness of the contact volume Returns ------- ndarray array with dimensions (5,n_atoms) of marked coordinates and padded vdW radii (n_atoms = number of atoms in mdtraj.Trajectory) """ cv_label = 2.0 das_marker = np.full(self.mol_xyzr.shape[0], cv_label) das_xyzrm = np.vstack([self.mol_xyzr.T, das_marker]) das_xyzrm[3] += self.cv_thickness return das_xyzrm
[docs] def calc_acv(self, use_LabelLib): """Partition the accessible volume into a free and a contact volume Returns ------- acv : fretraj.cloud.ACV Notes ----- The attributes from the LabelLib.Grid3D object are: - discStep : float (the grid spacing) - originXYZ : list (x-/y-/z-coordinate of the grid origin) - shape : list (number of grid points in x-/y-/z-direction) - grid : list (flattened list of grid point values) Additional attributes are: - grid_3d : ndarray([shape]) - n_gridpts : int - tag : ndarray([1,n_gridpts]) See Also -------- calc_av : Calculate accessible volume References ---------- .. [1] Steffen, F. D., Sigel, R. K. O. & Börner, R. "An atomistic view on carbocyanine photophysics in the realm of RNA", *Phys. Chem. Chem. Phys.* **18**, 29045–29055 (2016). .. [2] Dimura, M. et al. Quantitative FRET studies and integrative modeling unravel the structure and dynamics of biomolecular systems", *Curr. Opin. Struct. Biol.* **40**, 163–185 (2016). Examples -------- >>> avobj.calc_acv() """ das_xyzrm = self._dye_acc_surf() if use_LabelLib and _LabelLib_found: grid_acv = ll.addWeights(self.av, das_xyzrm) else: self.av.grid_3d = self.av.addWeights(das_xyzrm.T) self.av.grid = self.av.grid_3d.flatten(order="F") grid_acv = self.av acv = ACV(grid_acv, self.cv_thickness, self.cv_fraction, use_LabelLib=use_LabelLib) return acv
[docs] @staticmethod def mean_pos(cloud_xyzqt): """Calculate mean dye position of the accessible contact volume Parameters ---------- cloud_xyzq : ndarray array of x-,y-,z-coordinates and corresponding weights with a shape [n_gridpts(+), 4] Returns ------- ndarray mean position """ x = np.dot(cloud_xyzqt[:, 0], cloud_xyzqt[:, 3]) y = np.dot(cloud_xyzqt[:, 1], cloud_xyzqt[:, 3]) z = np.dot(cloud_xyzqt[:, 2], cloud_xyzqt[:, 3]) mp = np.array((x, y, z)) / cloud_xyzqt[:, 3].sum() return mp
[docs] @staticmethod def _weighted_quantile_1D(arr_1D, weights, quantile): """Compute the weighted quantile of a 1D-array Parameters ---------- arr_1D : ndarray one-dimensional array weights : ndarray weight of each element of the array quantile : float quantile to compute (median=0.5) Returns ------- quantile : float """ if (quantile > 1) or (quantile < 0): raise ValueError("Quantiles must be in the range [0, 1]") idx_sorted = np.argsort(arr_1D) arr_1D_sorted = arr_1D[idx_sorted] weights_sorted = weights[idx_sorted] weights_cs = np.cumsum(weights_sorted) xp = (weights_cs - (1 - quantile) * weights_sorted) / weights_cs[-1] return np.interp(quantile, xp, arr_1D_sorted)
[docs] def save_mp(self, filename, format="plain", units="A"): """Write mean dye position to a file Parameters ---------- filename : str filename for the mean position format : {'plain', 'json'} file format (plain .txt or .json) units : {'A', 'nm'}, optional='A' distance units ('A': Angstroms, 'nm': nanometers) Examples -------- >>> avobj.save_mp('mp.dat') """ if units == "nm": mp = self.acv.mp / 10 else: mp = self.acv.mp with open(filename, "w") as f: if format == "json": mp_dict = {"x": round(mp[0], 3), "y": round(mp[1], 3), "z": round(mp[2], 3)} json.dump(mp_dict, f) else: f.write("{:0.3f} {:0.3f} {:0.3f}\n".format(*mp))