Source code for pydna.assembly

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2013-2023 by Björn Johansson.  All rights reserved.
# This code is part of the Python-dna distribution and governed by its
# license.  Please see the LICENSE.txt file that should have been included
# as part of this package.

"""Assembly of sequences by homologous recombination.

Should also be useful for related techniques such as Gibson assembly and fusion
PCR. Given a list of sequences (Dseqrecords), all sequences are analyzed for
shared homology longer than the set limit.

A graph is constructed where each overlapping region form a node and
sequences separating the overlapping regions form edges.


::


                -- A --
    catgatctacgtatcgtgt     -- B --
                atcgtgtactgtcatattc
                            catattcaaagttct



    --x--> A --y--> B --z-->   (Graph)

    Nodes:

    A : atcgtgt
    B : catattc

    Edges:

    x : catgatctacgt
    y : actgt
    z : aaagttct


The NetworkX package is used to trace linear and circular paths through the
graph.
"""
import os
from Bio.SeqFeature import SeqFeature
from Bio.SeqFeature import ExactPosition
from Bio.SeqFeature import SimpleLocation
from Bio.SeqFeature import CompoundLocation
from pydna.utils import rc

from pydna._pretty import pretty_str as ps
from pydna.contig import Contig
from pydna.common_sub_strings import common_sub_strings, Match

from pydna.dseqrecord import Dseqrecord
import networkx as nx

from copy import deepcopy
from typing import Callable, Dict, List, NamedTuple, TypedDict
import itertools

from pydna.threading_timer_decorator_exit import exit_after


[docs] class Assembly(object): """Assembly of a list of linear DNA fragments into linear or circular constructs. The Assembly is meant to replace the Assembly method as it is easier to use. Accepts a list of Dseqrecords (source fragments) to initiate an Assembly object. Several methods are available for analysis of overlapping sequences, graph construction and assembly. Parameters ---------- fragments : list a list of Dseqrecord objects. limit : int, optional The shortest shared homology to be considered algorithm : function, optional The algorithm used to determine the shared sequences. max_nodes : int The maximum number of nodes in the graph. This can be tweaked to manage sequences with a high number of shared sub sequences. Examples -------- >>> from pydna.assembly import Assembly >>> from pydna.dseqrecord import Dseqrecord >>> a = Dseqrecord("acgatgctatactgCCCCCtgtgctgtgctcta") >>> b = Dseqrecord("tgtgctgtgctctaTTTTTtattctggctgtatc") >>> c = Dseqrecord("tattctggctgtatcGGGGGtacgatgctatactg") >>> x = Assembly((a,b,c), limit=14) >>> x Assembly fragments....: 33bp 34bp 35bp limit(bp)....: 14 G.nodes......: 6 algorithm....: common_sub_strings >>> x.assemble_circular() [Contig(o59), Contig(o59)] >>> x.assemble_circular()[0].seq.watson 'acgatgctatactgCCCCCtgtgctgtgctctaTTTTTtattctggctgtatcGGGGGt' """ def __init__( self, frags: List[Dseqrecord], limit: int = 25, algorithm: Callable[[str, str, int], List[Match]] = common_sub_strings, ) -> None: # Fragments is a string subclass with some extra properties # The order of the fragments has significance fragments: List[_FragmentDict] = [ { "upper": str(f.seq).upper(), "mixed": str(f.seq), "name": f.name, "features": f.features, "nodes": [], } for f in frags ] # rcfragments is a dict with fragments as keys and the reverse # complement as value rcfragments: Dict[str, _FragmentDict] = { f["mixed"]: { "upper": str(frc.seq).upper(), "mixed": str(frc.seq), "name": frc.name, "features": frc.features, "nodes": [], } for f, frc in zip(fragments, (f.rc() for f in frags)) } # The nodemap dict holds nodes and their reverse complements nodemap = { "begin": "end", "end": "begin", "begin_rc": "end_rc", "end_rc": "begin_rc", } # all combinations of fragments are compared. # see https://docs.python.org/3.10/library/itertools.html # itertools.combinations('ABCD', 2)--> AB AC AD BC BD CD for first, secnd in itertools.combinations(fragments, 2): if first["upper"] == secnd["upper"]: continue firrc = rcfragments[first["mixed"]] secrc = rcfragments[secnd["mixed"]] # matches is a list of tuples of three integers describing # overlapping sequences: # (start position in first, start position in secnd, length) # This comparison is done using uppercase strings, see _ # Fragment class matches = algorithm(first["upper"], secnd["upper"], limit) for start_in_first, start_in_secnd, length in matches: # node is a string and represent the shared sequence in upper # case. node = first["upper"][start_in_first : start_in_first + length] first["nodes"].append(_NodeTuple(start_in_first, length, node)) secnd["nodes"].append(_NodeTuple(start_in_secnd, length, node)) # The same node exists between the reverse complements of # first and secnd # The new positions are calculated from the length of the # fragment and # the overlapping sequence start_in_firrc = len(first["upper"]) - start_in_first - length start_in_secrc = len(secnd["upper"]) - start_in_secnd - length # noderc is the reverse complement of node noderc = firrc["upper"][start_in_firrc : start_in_firrc + length] firrc["nodes"].append(_NodeTuple(start_in_firrc, length, noderc)) secrc["nodes"].append(_NodeTuple(start_in_secrc, length, noderc)) nodemap[node] = noderc # first is also compared to the rc of secnd matches = algorithm(first["upper"], secrc["upper"], limit) for start_in_first, start_in_secrc, length in matches: node = first["upper"][start_in_first : start_in_first + length] first["nodes"].append(_NodeTuple(start_in_first, length, node)) secrc["nodes"].append(_NodeTuple(start_in_secrc, length, node)) start_in_firrc, start_in_secnd = ( len(first["upper"]) - start_in_first - length, len(secnd["upper"]) - start_in_secrc - length, ) noderc = firrc["upper"][start_in_firrc : start_in_firrc + length] firrc["nodes"].append(_NodeTuple(start_in_firrc, length, noderc)) secnd["nodes"].append(_NodeTuple(start_in_secnd, length, noderc)) nodemap[node] = noderc # A directed graph class that can store multiedges. # Multiedges are multiple edges between two nodes. Each edge can hold # optional data or attributes. # https://networkx.github.io/documentation/stable/reference/classes/ # multidigraph.html order = 0 G = nx.MultiDiGraph() # loop through all fragments their and reverse complements for f in fragments: f["nodes"] = sorted(set(f["nodes"])) for f in rcfragments.values(): f["nodes"] = sorted(set(f["nodes"])) for f in itertools.chain(fragments, rcfragments.values()): # nodes are sorted in place in the order of their position # duplicates are removed (same position and sequence) # along the fragment since nodes are a tuple (position(int), # sequence(str)) before = G.order() G.add_nodes_from( (node, {"order": order + od, "length": length}) for od, (start, length, node) in enumerate( n for n in f["nodes"] if n[2] not in G ) ) order += G.order() - before for (start1, length1, node1), ( start2, length2, node2, ) in itertools.combinations(f["nodes"], 2): feats = [ ft for ft in f["features"] if start1 <= ft.location.start and start2 + G.nodes[node2]["length"] >= ft.location.end ] # for feat in feats: # feat.location += -start1 G.add_edge( node1, node2, # nodes (strings) piece=slice(start1, start2), # slice features=feats, # features seq=f["mixed"], # mixed case string name=f["name"], ) # string self.G = nx.create_empty_copy(G) self.G.add_edges_from( sorted( G.edges(data=True), key=lambda t: len(t[2].get("seq", 1)), reverse=True ) ) self.nodemap = {**nodemap, **{nodemap[i]: i for i in nodemap}} self.limit = limit self.fragments = fragments self.rcfragments = rcfragments self.algorithm = algorithm @exit_after(int(os.getenv("pydna_assembly_limit", 10))) def assemble_linear(self, start=None, end=None, max_nodes=None): G = nx.MultiDiGraph(self.G) G.add_nodes_from(["begin", "begin_rc", "end", "end_rc"], length=0) # add edges from "begin" to nodes in the first # sequence in self.fragments firstfragment = self.fragments[0] for start, length, node in firstfragment["nodes"][::-1]: G.add_edge( "begin", node, piece=slice(0, start), features=[ f for f in firstfragment["features"] if start + length >= f.location.end ], seq=firstfragment["mixed"], name=firstfragment["name"], ) # add edges from "begin_rc" to nodes in the reverse complement of the # first sequence firstfragmentrc = self.rcfragments[firstfragment["mixed"]] for start, length, node in firstfragmentrc["nodes"][::-1]: G.add_edge( "begin_rc", node, piece=slice(0, start), features=[ f for f in firstfragmentrc["features"] if start + length >= f.location.end ], seq=firstfragmentrc["mixed"], name=firstfragmentrc["name"], ) # add edges from nodes in last sequence to "end" lastfragment = self.fragments[-1] for start, length, node in lastfragment["nodes"]: G.add_edge( node, "end", piece=slice(start, len(lastfragment["mixed"])), features=[ f for f in lastfragment["features"] if start <= f.location.start ], seq=lastfragment["mixed"], name=lastfragment["name"], ) # breakpoint() # add edges from nodes in last reverse complement sequence to "end_rc" lastfragmentrc = self.rcfragments[lastfragment["mixed"]] for start, length, node in lastfragmentrc["nodes"]: G.add_edge( node, "end_rc", piece=slice(start, len(lastfragmentrc["mixed"])), features=[ f for f in lastfragmentrc["features"] if start <= f.location.start ], seq=lastfragmentrc["mixed"], name=lastfragmentrc["name"], ) max_nodes = max_nodes or len(self.fragments) linearpaths = list( itertools.chain( nx.all_simple_paths(nx.DiGraph(G), "begin", "end", cutoff=max_nodes), nx.all_simple_paths(nx.DiGraph(G), "begin", "end_rc", cutoff=max_nodes), nx.all_simple_paths(nx.DiGraph(G), "begin_rc", "end", cutoff=max_nodes), nx.all_simple_paths( nx.DiGraph(G), "begin_rc", "end_rc", cutoff=max_nodes ), ) ) lps = {} for lp in linearpaths: edgelol = [] for u, v in zip(lp, lp[1:]): e = [] for d in G[u][v].values(): e.append((u, v, d)) edgelol.append(e) for edges in itertools.product(*edgelol): # TODO explain if [ True for ((u, v, e), (x, y, z)) in zip(edges, edges[1:]) if ((e["seq"], e["piece"].stop) == (z["seq"], z["piece"].start)) ]: continue ct = "".join(e["seq"][e["piece"]] for u, v, e in edges) key = ct.upper() if key in lps: continue # TODO: is this test needed? sg = nx.DiGraph() sg.add_edges_from(edges) sg.add_nodes_from((n, d) for n, d in G.nodes(data=True) if n in lp) edgefeatures = [] offset = 0 for u, v, e in edges: feats = deepcopy(e["features"]) for f in feats: f.location += offset - e["piece"].start edgefeatures.extend(feats) offset += e["piece"].stop - e["piece"].start lps[key] = ct, edgefeatures, sg, {n: self.nodemap[n] for n in lp} return sorted( ( Contig.from_string( lp[0], features=lp[1], graph=lp[2], nodemap=lp[3], linear=True, circular=False, ) for lp in lps.values() ), key=len, reverse=True, ) @exit_after(int(os.getenv("pydna_assembly_limit", 10))) def assemble_circular(self, length_bound=None): cps = {} # circular assembly cpsrc = {} cpaths = sorted(nx.simple_cycles(self.G, length_bound=length_bound), key=len) cpaths_sorted = [] for cpath in cpaths: order, node = min((self.G.nodes[node]["order"], node) for node in cpath) i = cpath.index(node) cpaths_sorted.append((order, cpath[i:] + cpath[:i])) cpaths_sorted.sort() for ( _, cp, ) in ( cpaths_sorted ): # cpaths is a list of nodes representing a circular assembly edgelol = [] # edgelol is a list of lists of all edges along cp cp += cp[0:1] for u, v in zip(cp, cp[1:]): e = [] for d in self.G[u][v].values(): e.append((u, v, d)) edgelol.append(e) for edges in itertools.product(*edgelol): if [ True for ((u, v, e), (x, y, z)) in zip(edges, edges[1:]) if ((e["seq"], e["piece"].stop) == (z["seq"], z["piece"].start)) ]: continue ct = "".join(e["seq"][e["piece"]] for u, v, e in edges) key = ct.upper() if key in cps or key in cpsrc: continue # TODO: is test in cpsrc needed? sg = nx.DiGraph() sg.add_edges_from(edges) sg.add_nodes_from((n, d) for n, d in self.G.nodes(data=True) if n in cp) edgefeatures = [] offset = 0 for u, v, e in edges: feats = deepcopy(e["features"]) for feat in feats: feat.location += offset edgefeatures.extend(feats) offset += e["piece"].stop - e["piece"].start for f in edgefeatures: if f.location.start > len(ct) and f.location.end > len(ct): f.location += -len(ct) elif f.location.end > len(ct): f.location = CompoundLocation( ( SimpleLocation( f.location.start, ExactPosition(len(ct)) ), SimpleLocation( ExactPosition(0), f.location.end - len(ct) ), ) ) cps[key] = cpsrc[rc(key)] = ( ct, edgefeatures, sg, {n: self.nodemap[n] for n in cp[:-1]}, cp, ) return sorted( ( Contig.from_string( cp[0], features=cp[1], graph=cp[2], nodemap=cp[3], linear=False, circular=True, ) for cp in cps.values() ), key=len, reverse=True, ) def __repr__(self) -> ps: # https://pyformat.info return ps( "Assembly\n" "fragments..: {sequences}\n" "limit(bp)..: {limit}\n" "G.nodes....: {nodes}\n" "algorithm..: {al}".format( sequences=" ".join( "{}bp".format(len(x["mixed"])) for x in self.fragments ), limit=self.limit, nodes=self.G.order(), al=self.algorithm.__name__, ) )
example_fragments = ( Dseqrecord("AacgatCAtgctcc", name="a"), Dseqrecord("TtgctccTAAattctgc", name="b"), Dseqrecord("CattctgcGAGGacgatG", name="c"), ) linear_results = ( Dseqrecord("AacgatCAtgctccTAAattctgcGAGGacgatG", name="abc"), Dseqrecord("ggagcaTGatcgtCCTCgcagaatG", name="ac_rc"), Dseqrecord("AacgatG", name="ac"), ) circular_results = ( Dseqrecord("acgatCAtgctccTAAattctgcGAGG", name="abc", circular=True), Dseqrecord("ggagcaTGatcgtCCTCgcagaatTTA", name="abc_rc", circular=True), ) class _NodeTuple(NamedTuple): start: int length: int shared_seq: str # uppercase class _FragmentDict(TypedDict): upper: str mixed: str name: str features: List[SeqFeature] nodes: List[_NodeTuple]