# TO BE UPDATED WITH CTXCORE
import io
import logging
import numpy as np
import os
import pandas as pd
import pyranges as pr
import subprocess
import sys
import ray
from typing import Optional, Union
from typing import List, Iterable, Tuple, Dict
[docs]def cluster_buster(cbust_path: str,
path_to_motifs: str,
region_sets: Union[Dict[str, pr.PyRanges], Dict[str, List]] = None,
path_to_genome_fasta: str = None,
path_to_regions_fasta: str = None,
n_cpu: Optional[int] = 1,
motifs: Optional[List[str]] = None,
verbose: Optional[bool] = False,
**kwargs):
"""
Add motif annotation
Parameters
---------
cluster_buster_path: str
Path to cluster buster bin.
path_to_motifs: str, optional.
Path to motif collection folder (in .cb format). Only required if using a shuffled background.
region_sets: Dict
A dictionary of PyRanges containing region coordinates for the regions to be analyzed. Only required
if `path_to_regions_fasta` is not provided.
path_to_genome_fasta: str, optional.
Path to genome fasta file. Only required if `path_to_regions_fasta` is not provided. Default: None
path_to_regions_fasta: str, optional.
Path to regions fasta file. Only required if `path_to_genome_fasta` is not provided. Default: None
n_cpu: int, optional
Number of cores to use
motifs: List, optional
Names of the motif files to use (from `path_to_motifs`). Default: None (All)
verbose: bool, optional
Whether to print progress to screen
**kwargs:
Additional parameters to pass to `ray.init()`
References
---------
Frith, Martin C., Michael C. Li, and Zhiping Weng. "Cluster-Buster: Finding dense clusters of motifs
in DNA sequences." Nucleic acids research 31, no. 13 (2003): 3666-3668.
"""
# Create logger
level = logging.INFO
format = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
handlers = [logging.StreamHandler(stream=sys.stdout)]
logging.basicConfig(level = level, format = format, handlers = handlers)
log = logging.getLogger('Cluster-Buster')
# Generate fasta file
if path_to_regions_fasta is None:
path_to_regions_fasta = os.path.join(outdir,'regions.fa')
if not os.path.exists(path_to_regions_fasta):
log.info('Getting sequences')
pr_regions_names_dict = {key: pyranges2names(region_sets[key]) for key in region_sets.keys()}
pr_sequence_list = [pd.DataFrame([region_sets[key], pr.get_fasta(region_sets[key], path_to_genome_fasta).tolist()], index=['Name', 'Sequence'], columns=region_sets[key]) for key in region_sets.keys()]
seq_df = pd.concat(pr_sequence_list, axis=1)
seq_df = seq_df.loc[:,~seq_df.columns.duplicated()]
seq_df.T.to_csv(path_to_regions_fasta, header=False, index=False, sep='\n')
sequence_names = [seq[1:] for seq in seq_df.columns]
else:
sequence_names = get_sequence_names_from_fasta(path_to_regions_fasta)
# Get motifs and sequence name
if motifs is None:
motifs = os.listdir(path_to_motifs)
motifs = grep(motifs, '.cb')
log.info('Scoring sequences')
ray.init(num_cpus=n_cpu, **kwargs)
crm_scores = ray.get([run_cluster_buster_for_motif.remote(cbust_path, path_to_regions_fasta, path_to_motifs+motifs[i], motifs[i], i, len(motifs), verbose) for i in range(len(motifs))])
ray.shutdown()
crm_df = pd.concat(crm_scores, axis=1, sort=False).fillna(0).T
# Remove .cb from motifs names
crm_df.index = [x.replace('.cb','') for x in crm_df.index.tolist()]
log.info('Done!')
return crm_df
# Utils
@ray.remote
def run_cluster_buster_for_motif(cluster_buster_path: str,
fasta_filename: str,
motif_filename: str,
motif_name: str,
i: int,
nr_motifs: int,
verbose: Optional[bool] = False):
"""
Ray method to run cluster buster for one motif
"""
# Create logger
level = logging.INFO
format = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
handlers = [logging.StreamHandler(stream=sys.stdout)]
logging.basicConfig(level = level, format = format, handlers = handlers)
log = logging.getLogger('Cluster-Buster')
if verbose == True:
log.info('Scoring motif ' + str(i) + ' out of ' + str(nr_motifs) + ' motifs')
# Score each region in FASTA file with Cluster-Buster
# for motif and get top CRM score for each region.
clusterbuster_command = [cluster_buster_path,
'-f', '4',
'-c', '0.0',
'-r', '10000',
'-t', '1',
'-l', #Mask repeats
motif_filename,
fasta_filename]
try:
pid = subprocess.Popen(args=clusterbuster_command,
bufsize=1,
executable=None,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=None,
close_fds=False,
shell=False,
cwd=None,
env=None,
universal_newlines=False,
startupinfo=None,
creationflags=0)
stdout_data, stderr_data = pid.communicate()
except OSError as msg:
print("\nExecution error for: '" + ' '.join(clusterbuster_command) + "': " + str(msg),
file=sys.stderr)
sys.exit(1)
if pid.returncode != 0:
print("\nError: Non-zero exit status for: " + ' '.join(clusterbuster_command) + "'",
file=sys.stderr)
sys.exit(1)
crm_scores_df = pd.read_csv(
filepath_or_buffer=io.BytesIO(stdout_data),
sep='\t',
header=0,
names=['motifs', 'crm_score'],
index_col='motifs',
usecols=['motifs','crm_score'],
dtype={'crm_score': np.float32},
engine='c'
)
crm_scores_df.columns=[motif_name]
return crm_scores_df
# Utils functions for Cluster-buster
[docs]def get_sequence_names_from_fasta(fasta_filename: str):
"""
Retrieve sequence names from fasta
"""
sequence_names_list = list()
sequence_names_set = set()
duplicated_sequences = False
with open(fasta_filename, 'r') as fh:
for line in fh:
if line.startswith('>'):
# Get sequence name by getting everything after '>' up till the first whitespace.
sequence_name = line[1:].split(maxsplit=1)[0]
# Check if all sequence names only appear once.
if sequence_name in sequence_names_set:
print(
'Error: Sequence name "{0:s}" is not unique in FASTA file "{1:s}".'.format(
sequence_name,
fasta_filename
),
file=sys.stderr
)
duplicated_sequences = True
sequence_names_list.append(sequence_name)
sequence_names_set.add(sequence_name)
if duplicated_sequences:
sys.exit(1)
return sequence_names_list
[docs]def pyranges2names(regions: pr.PyRanges):
"""
Convert pyranges to sequence name (fasta format)
"""
return ['>'+str(chrom) + ":" + str(start) + '-' + str(end) for chrom, start, end in zip(list(regions.Chromosome), list(regions.Start), list(regions.End))]
[docs]def grep(l: List,
s: str):
"""
Helper for grep
"""
return [i for i in l if s in i]