|
|
"""
|
|
|
BLAST Integration
|
|
|
Sequence alignment and homology searching
|
|
|
"""
|
|
|
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, List, Optional
|
|
|
import subprocess
|
|
|
import yaml
|
|
|
import logging
|
|
|
from Bio import SeqIO
|
|
|
from Bio.Blast import NCBIXML
|
|
|
|
|
|
# Module-level logger; records propagate to the root logger configured below.
logger = logging.getLogger(name=__name__)

# Configure the root logger at INFO on import. basicConfig is a no-op when
# the root logger already has handlers (e.g. the host application set it up).
logging.basicConfig(level="INFO")
|
|
|
|
|
|
|
|
|
class BLASTRunner:
    """Run NCBI BLAST+ searches and parse their XML (``-outfmt 5``) output.

    Settings come from the ``pipeline.blast`` section of a YAML config file.
    When the BLAST+ executable cannot be found, a placeholder XML report with
    no hits is written instead so downstream steps can still run.
    """

    def __init__(self, config_path: str = "config.yml"):
        """Load BLAST settings and make sure the output directory exists.

        Args:
            config_path: YAML file containing a ``pipeline.blast`` mapping.
                ``output_dir`` is required. The optional keys are ``database``
                (default ``'nt'``), ``protein_database`` (default ``'nr'``),
                ``evalue`` (default ``0.001``) and ``num_threads``
                (default ``4``).

        Raises:
            OSError: If the config file cannot be opened.
            KeyError: If ``pipeline``, ``blast`` or ``output_dir`` is missing.
        """
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)['pipeline']['blast']

        self.database = self.config.get('database', 'nt')
        # BLASTP used to hard-code 'nr'; it is now configurable with the same default.
        self.protein_database = self.config.get('protein_database', 'nr')
        self.evalue = self.config.get('evalue', 0.001)
        self.num_threads = self.config.get('num_threads', 4)
        self.output_dir = Path(self.config['output_dir'])
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def _build_command(
        self,
        program: str,
        query_file: Path,
        output_file: Path,
        database: str,
        max_targets: int
    ) -> List[str]:
        """Return the argv list for a BLAST+ search that writes XML (outfmt 5)."""
        return [
            program,
            '-query', str(query_file),
            '-db', database,
            '-out', str(output_file),
            '-evalue', str(self.evalue),
            '-num_threads', str(self.num_threads),
            '-max_target_seqs', str(max_targets),
            '-outfmt', '5',
        ]

    def _run_blast(
        self,
        program: str,
        query_file: Path,
        output_file: Path,
        database: str,
        max_targets: int
    ) -> Optional[Path]:
        """Execute one BLAST+ program; shared by run_blastn and run_blastp.

        Returns:
            ``output_file`` on success, or a simulated report at the same path
            if the executable is missing. Returns None if BLAST exits non-zero.
        """
        label = program.upper()
        cmd = self._build_command(program, query_file, output_file, database, max_targets)

        try:
            logger.info(f"Running {label} on {query_file.name}")
            subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"{label} failed: {e.stderr}")
            return None
        except FileNotFoundError:
            # subprocess raises FileNotFoundError when the executable is not on PATH.
            logger.warning(f"{label} not found - creating simulated results")
            return self._simulate_blast_results(
                query_file, output_file, program=program, database=database
            )

        logger.info(f"{label} completed: {output_file}")
        return output_file

    def run_blastn(
        self,
        query_file: Path,
        output_file: Optional[Path] = None,
        max_targets: int = 10
    ) -> Optional[Path]:
        """
        Run BLASTN for nucleotide sequences against the configured ``database``.

        Args:
            query_file: Input FASTA file with query sequences (str or Path).
            output_file: Output XML file. Defaults to
                ``<output_dir>/<query stem>_blastn.xml``.
            max_targets: Value passed to ``-max_target_seqs``.

        Returns:
            Path to the output file (real or simulated), or None if BLASTN failed.
        """
        query_file = Path(query_file)
        if output_file is None:
            output_file = self.output_dir / f"{query_file.stem}_blastn.xml"
        return self._run_blast('blastn', query_file, output_file, self.database, max_targets)

    def run_blastp(
        self,
        query_file: Path,
        output_file: Optional[Path] = None,
        max_targets: int = 10
    ) -> Optional[Path]:
        """
        Run BLASTP for protein sequences against ``protein_database`` (default 'nr').

        Args:
            query_file: Input FASTA file with protein sequences (str or Path).
            output_file: Output XML file. Defaults to
                ``<output_dir>/<query stem>_blastp.xml``.
            max_targets: Value passed to ``-max_target_seqs``.

        Returns:
            Path to the output file (real or simulated), or None if BLASTP failed.
        """
        query_file = Path(query_file)
        if output_file is None:
            output_file = self.output_dir / f"{query_file.stem}_blastp.xml"
        return self._run_blast(
            'blastp', query_file, output_file, self.protein_database, max_targets
        )

    def _simulate_blast_results(
        self,
        query_file: Path,
        output_file: Path,
        program: str = 'blastn',
        database: str = 'nt'
    ) -> Path:
        """Write a placeholder BLAST XML report with no hits, for demo purposes.

        ``query_file`` is not read. It is accepted so the signature matches the
        real runs. ``program`` and ``database`` are recorded in the report
        header, so a simulated BLASTP run is labelled as such.

        Returns:
            ``output_file``.
        """
        label = program.upper()
        xml_lines = [
            '<?xml version="1.0"?>',
            '<!DOCTYPE BlastOutput PUBLIC "-//NCBI//NCBI BlastOutput/EN" '
            '"http://www.ncbi.nlm.nih.gov/dtd/NCBI_BlastOutput.dtd">',
            '<BlastOutput>',
            f'  <BlastOutput_program>{program}</BlastOutput_program>',
            f'  <BlastOutput_version>{label} 2.14.0+</BlastOutput_version>',
            '  <BlastOutput_reference>Simulated results for demo</BlastOutput_reference>',
            f'  <BlastOutput_db>{database}</BlastOutput_db>',
            '  <BlastOutput_query-ID>Query_1</BlastOutput_query-ID>',
            '  <BlastOutput_query-def>Sample sequence</BlastOutput_query-def>',
            '  <BlastOutput_query-len>100</BlastOutput_query-len>',
            '  <BlastOutput_iterations>',
            '    <Iteration>',
            '      <Iteration_iter-num>1</Iteration_iter-num>',
            '      <Iteration_query-ID>Query_1</Iteration_query-ID>',
            '      <Iteration_query-def>Sample sequence</Iteration_query-def>',
            '      <Iteration_query-len>100</Iteration_query-len>',
            '      <Iteration_hits>',
            '      </Iteration_hits>',
            '    </Iteration>',
            '  </BlastOutput_iterations>',
            '</BlastOutput>',
        ]
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("\n".join(xml_lines) + "\n")
        return output_file

    def parse_results(self, blast_output: Path) -> List[Dict]:
        """
        Parse a BLAST XML report into one dictionary per HSP.

        Each dict has the keys ``query``, ``hit_id``, ``hit_def``, ``length``,
        ``e_value``, ``score``, ``identities``, ``positives``, ``gaps``,
        ``query_start``, ``query_end``, ``hit_start``, ``hit_end`` and
        ``alignment_length``.

        Returns:
            List of hit dictionaries. On any read or parse error, the error is
            logged and an empty list is returned.
        """
        hits = []

        try:
            with open(blast_output, 'r') as f:
                # NCBIXML.parse is lazy, so iterate while the file is still open.
                for blast_record in NCBIXML.parse(f):
                    for alignment in blast_record.alignments:
                        for hsp in alignment.hsps:
                            hits.append({
                                'query': blast_record.query,
                                'hit_id': alignment.hit_id,
                                'hit_def': alignment.hit_def,
                                'length': alignment.length,
                                'e_value': hsp.expect,
                                'score': hsp.score,
                                'identities': hsp.identities,
                                'positives': hsp.positives,
                                'gaps': hsp.gaps,
                                'query_start': hsp.query_start,
                                'query_end': hsp.query_end,
                                'hit_start': hsp.sbjct_start,
                                'hit_end': hsp.sbjct_end,
                                'alignment_length': hsp.align_length,
                            })
        except Exception as e:
            # Deliberate best-effort boundary: callers treat [] as "no results".
            logger.error(f"Error parsing BLAST results: {e}")
            return []

        logger.info(f"Parsed {len(hits)} BLAST hits")
        return hits

    def filter_hits(
        self,
        hits: List[Dict],
        min_identity: float = 0.9,
        max_evalue: float = 0.001
    ) -> List[Dict]:
        """
        Filter BLAST hits by identity and e-value.

        Args:
            hits: Hit dictionaries as produced by ``parse_results``.
            min_identity: Minimum identity as a fraction (0-1) of alignment length.
            max_evalue: Maximum e-value threshold (inclusive).

        Returns:
            New dicts, one per hit that passes both thresholds, each with an
            added ``identity_pct`` key. The input dicts are left unmodified.
            Hits with a zero ``alignment_length`` are skipped.
        """
        filtered = []

        for hit in hits:
            aln_len = hit['alignment_length']
            if not aln_len:
                continue  # identity is undefined for an empty alignment
            identity_pct = hit['identities'] / aln_len
            if identity_pct >= min_identity and hit['e_value'] <= max_evalue:
                filtered.append({**hit, 'identity_pct': identity_pct})

        logger.info(f"Filtered to {len(filtered)} high-quality hits")
        return filtered
|
|
|
|
|
|
|
|
|
class SequenceAligner:
    """Higher-level alignment helpers built on top of :class:`BLASTRunner`."""

    def __init__(self):
        """Create a BLASTRunner that reads its default ``config.yml``."""
        self.blast_runner = BLASTRunner()

    def align_to_reference(
        self,
        query_sequences: Path,
        reference_db: str = 'nt'
    ) -> Dict:
        """
        Align query sequences to a reference database with BLASTN.

        Args:
            query_sequences: FASTA file with nucleotide query sequences.
            reference_db: Accepted for compatibility but not forwarded (see note).

        Returns:
            On success, ``{'statistics', 'hits', 'output_file'}``. If BLASTN
            failed, ``{'error': 'BLAST search failed'}``. ``statistics`` holds
            ``total_queries``, ``queries_with_hits``, ``total_hits``,
            ``avg_identity`` (fraction 0-1) and ``avg_evalue``.
        """
        # NOTE(review): reference_db is not passed on. run_blastn always searches
        # the runner's configured database. The parameter is kept so existing
        # callers keep working.
        blast_output = self.blast_runner.run_blastn(query_sequences)

        if blast_output is None:
            return {'error': 'BLAST search failed'}

        hits = self.blast_runner.parse_results(blast_output)
        stats = self._compute_statistics(hits, self._count_queries(query_sequences))

        return {
            'statistics': stats,
            'hits': hits,
            'output_file': str(blast_output)
        }

    @staticmethod
    def _count_queries(query_sequences: Path) -> Optional[int]:
        """Count FASTA records in the query file. Returns None if it can't be read."""
        try:
            return sum(1 for _ in SeqIO.parse(str(query_sequences), 'fasta'))
        except (OSError, ValueError) as e:
            logger.warning(f"Could not count query sequences: {e}")
            return None

    @staticmethod
    def _compute_statistics(
        hits: List[Dict],
        total_queries: Optional[int] = None
    ) -> Dict:
        """Summarise parsed BLAST hits.

        Identity is derived as ``identities / alignment_length``, because
        ``parse_results`` does not add ``identity_pct``. Hits with a zero
        alignment length are left out of the identity average. When
        ``total_queries`` is unknown (None), it falls back to the number of
        distinct queries that have hits, which is a lower bound.
        """
        queries_with_hits = len({h['query'] for h in hits})
        if total_queries is None:
            total_queries = queries_with_hits

        stats = {
            'total_queries': total_queries,
            'queries_with_hits': queries_with_hits,
            'total_hits': len(hits),
            'avg_identity': 0,
            'avg_evalue': 0
        }

        if hits:
            identities = [
                h['identities'] / h['alignment_length']
                for h in hits
                if h['alignment_length']
            ]
            if identities:
                stats['avg_identity'] = sum(identities) / len(identities)
            stats['avg_evalue'] = sum(h['e_value'] for h in hits) / len(hits)

        return stats

    def find_homologs(
        self,
        sequence_file: Path,
        min_identity: float = 0.8
    ) -> List[Dict]:
        """
        Find homologous sequences with BLASTN.

        Args:
            sequence_file: Input FASTA file.
            min_identity: Minimum identity fraction (0-1) a hit must reach.

        Returns:
            Hits that pass ``filter_hits`` (its default e-value cut-off applies),
            or an empty list if BLASTN failed.
        """
        blast_output = self.blast_runner.run_blastn(sequence_file)

        if blast_output is None:
            return []

        hits = self.blast_runner.parse_results(blast_output)
        return self.blast_runner.filter_hits(hits, min_identity=min_identity)
|
|
|
|