|
|
"""
|
|
|
FASTQ Processing Pipeline
|
|
|
Quality control and preprocessing of sequencing data
|
|
|
"""
|
|
|
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, List, Optional
|
|
|
import yaml
|
|
|
import logging
|
|
|
from Bio import SeqIO
|
|
|
from Bio.SeqIO.QualityIO import FastqGeneralIterator
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class FASTQProcessor:
    """Process FASTQ sequencing files: quality filtering, adapter trimming,
    per-file statistics, and FASTA conversion.

    Configuration is read from a YAML file under ``pipeline.fastq`` with keys:
    ``quality_threshold`` (minimum mean Phred score), ``min_length`` (minimum
    read length in bases), and ``output_dir`` (created if missing).
    """

    def __init__(self, config_path: str = "config.yml"):
        """Load the fastq section of the pipeline config and prepare output_dir.

        Args:
            config_path: Path to the YAML pipeline configuration.

        Raises:
            FileNotFoundError: If the config file does not exist.
            KeyError: If the config lacks the pipeline.fastq section or keys.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['pipeline']['fastq']

        self.quality_threshold = self.config['quality_threshold']
        self.min_length = self.config['min_length']
        self.output_dir = Path(self.config['output_dir'])
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def quality_filter(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Dict:
        """
        Filter FASTQ reads by mean quality score and minimum length.

        Qualities are decoded as Phred+33 (Sanger/Illumina 1.8+ encoding —
        assumed, not auto-detected).

        Args:
            input_file: Input FASTQ file
            output_file: Output filtered FASTQ file; defaults to
                <output_dir>/<stem>_filtered.fastq

        Returns:
            Statistics dictionary with total/passed/failed read counts,
            base counts, and 'pass_rate'. On error, partial counts are
            returned (without 'pass_rate') after logging.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_filtered.fastq"

        stats = {
            'total_reads': 0,
            'passed_reads': 0,
            'failed_reads': 0,
            'total_bases': 0,
            'passed_bases': 0
        }

        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):
                    stats['total_reads'] += 1
                    stats['total_bases'] += len(sequence)

                    # Phred+33 decoding. Guard against zero-length reads,
                    # which previously raised ZeroDivisionError and silently
                    # aborted the run via the broad except below.
                    if quality:
                        avg_quality = sum(ord(q) - 33 for q in quality) / len(quality)
                    else:
                        avg_quality = 0.0

                    if avg_quality >= self.quality_threshold and len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")
                        stats['passed_reads'] += 1
                        stats['passed_bases'] += len(sequence)
                    else:
                        stats['failed_reads'] += 1

            stats['pass_rate'] = stats['passed_reads'] / stats['total_reads'] if stats['total_reads'] > 0 else 0

            logger.info(f"Filtered {input_file.name}: {stats['passed_reads']}/{stats['total_reads']} reads passed")
            return stats

        except Exception as e:
            # Best-effort contract: log and return whatever was counted.
            logger.error(f"Error filtering FASTQ: {e}")
            return stats

    def trim_adapters(
        self,
        input_file: Path,
        adapter_sequence: str,
        output_file: Optional[Path] = None
    ) -> Path:
        """
        Trim adapter sequences from reads.

        Each read is truncated at the first exact occurrence of the adapter
        (sequence and quality string are cut together). Reads shorter than
        min_length after trimming are dropped entirely.

        Args:
            input_file: Input FASTQ file
            adapter_sequence: Adapter sequence to trim (exact match only)
            output_file: Output trimmed file; defaults to
                <output_dir>/<stem>_trimmed.fastq

        Returns:
            Path to the trimmed file, or the original input_file on error.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_trimmed.fastq"

        trimmed_count = 0

        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):

                    adapter_pos = sequence.find(adapter_sequence)

                    if adapter_pos != -1:
                        # Cut quality in lockstep with sequence so the
                        # record stays internally consistent.
                        sequence = sequence[:adapter_pos]
                        quality = quality[:adapter_pos]
                        trimmed_count += 1

                    if len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")

            logger.info(f"Trimmed adapters from {trimmed_count} reads")
            return output_file

        except Exception as e:
            logger.error(f"Error trimming adapters: {e}")
            return input_file

    def calculate_statistics(self, fastq_file: Path) -> Dict:
        """
        Calculate statistics for a FASTQ file.

        Returns:
            Dictionary with read count, base count, min/max/avg length,
            average Phred+33 quality, and GC content (percent). For an
            empty or unreadable file all values are 0 (min_length is
            reported as 0, not inf, so the result is JSON-safe).
        """
        stats = {
            'total_reads': 0,
            'total_bases': 0,
            'min_length': float('inf'),
            'max_length': 0,
            'avg_length': 0,
            'avg_quality': 0,
            'gc_content': 0
        }

        quality_sum = 0
        quality_count = 0
        gc_count = 0

        try:
            with open(fastq_file, 'r') as f:
                for title, sequence, quality in FastqGeneralIterator(f):
                    stats['total_reads'] += 1
                    seq_len = len(sequence)
                    stats['total_bases'] += seq_len

                    stats['min_length'] = min(stats['min_length'], seq_len)
                    stats['max_length'] = max(stats['max_length'], seq_len)

                    # Accumulate running sums instead of materializing
                    # per-base lists (memory-safe for large files).
                    quality_sum += sum(ord(q) - 33 for q in quality)
                    quality_count += len(quality)

                    # Note: lowercase/soft-masked bases are not counted.
                    gc_count += sequence.count('G') + sequence.count('C')

            if stats['total_reads'] > 0:
                stats['avg_length'] = stats['total_bases'] / stats['total_reads']
                # Guard zero-length-read files: no bases means no averages.
                stats['avg_quality'] = quality_sum / quality_count if quality_count else 0
                stats['gc_content'] = (gc_count / stats['total_bases']) * 100 if stats['total_bases'] else 0
            else:
                # No reads seen: report 0 rather than the inf sentinel.
                stats['min_length'] = 0

            return stats

        except Exception as e:
            logger.error(f"Error calculating statistics: {e}")
            if stats['min_length'] == float('inf'):
                stats['min_length'] = 0
            return stats

    def convert_to_fasta(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Path:
        """Convert FASTQ to FASTA format (quality information is discarded).

        Args:
            input_file: Input FASTQ file
            output_file: Output FASTA file; defaults to <output_dir>/<stem>.fasta

        Returns:
            Path to the FASTA file, or the original input_file on error.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}.fasta"

        try:
            count = SeqIO.convert(str(input_file), "fastq", str(output_file), "fasta")
            logger.info(f"Converted {count} sequences to FASTA")
            return output_file

        except Exception as e:
            logger.error(f"Error converting to FASTA: {e}")
            return input_file
|
|
|
|
|
|
|
|
|
class FASTQQualityControl:
    """Quality control analysis for FASTQ files.

    Wraps a FASTQProcessor (constructed with its default config path) and
    applies fixed thresholds: mean quality >= 20, mean length >= 50, and
    GC content within 30-70%.
    """

    def __init__(self):
        # Uses the processor's default "config.yml"; raises if it is missing.
        self.processor = FASTQProcessor()

    def run_qc(self, fastq_file: Path) -> Dict:
        """
        Run comprehensive QC on a FASTQ file.

        Returns:
            QC report dictionary with keys 'file', 'statistics',
            'quality_check' ('PASS' or 'WARN'), and 'warnings'.
        """
        report = {
            'file': str(fastq_file),
            'statistics': {},
            'quality_check': 'PASS',
            'warnings': []
        }

        stats = self.processor.calculate_statistics(fastq_file)
        report['statistics'] = stats

        if stats['avg_quality'] < 20:
            report['warnings'].append('Low average quality score')
            report['quality_check'] = 'WARN'

        if stats['avg_length'] < 50:
            report['warnings'].append('Short average read length')
            report['quality_check'] = 'WARN'

        if stats['gc_content'] < 30 or stats['gc_content'] > 70:
            report['warnings'].append(f'Unusual GC content: {stats["gc_content"]:.1f}%')
            # Fix: previously this warning was recorded without downgrading
            # the status, so a file with GC warnings still reported PASS
            # and was counted as 'passed' by generate_qc_report.
            report['quality_check'] = 'WARN'

        return report

    def generate_qc_report(self, fastq_files: List[Path]) -> Dict:
        """Generate an aggregate QC report for multiple FASTQ files.

        Returns:
            Dictionary with a 'summary' (file counts per status) and
            'file_reports' (per-file run_qc output keyed by filename).
            Note: 'failed' is counted for completeness but run_qc never
            currently emits a 'FAIL' status.
        """
        reports = {}

        for fastq_file in fastq_files:
            report = self.run_qc(fastq_file)
            reports[fastq_file.name] = report

        summary = {
            'total_files': len(fastq_files),
            'passed': sum(1 for r in reports.values() if r['quality_check'] == 'PASS'),
            'warnings': sum(1 for r in reports.values() if r['quality_check'] == 'WARN'),
            'failed': sum(1 for r in reports.values() if r['quality_check'] == 'FAIL')
        }

        return {
            'summary': summary,
            'file_reports': reports
        }
|
|
|
|