# CancerAtHomeV2/backend/pipeline/fastq_processor.py
"""
FASTQ Processing Pipeline
Quality control and preprocessing of sequencing data
"""
from pathlib import Path
from typing import Dict, List, Optional
import yaml
import logging
from Bio import SeqIO
from Bio.SeqIO.QualityIO import FastqGeneralIterator
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FASTQProcessor:
    """Process FASTQ sequencing files.

    Provides quality filtering, adapter trimming, per-file statistics and
    FASTQ-to-FASTA conversion.  Settings are read from the
    ``pipeline.fastq`` section of a YAML config file:
    ``quality_threshold`` (minimum mean Phred score), ``min_length``
    (minimum read length in bases) and ``output_dir`` (directory for
    processed output files).
    """

    # ASCII offset for Sanger / Illumina 1.8+ Phred quality encoding.
    PHRED_OFFSET = 33

    def __init__(self, config_path: str = "config.yml"):
        """Load configuration and ensure the output directory exists.

        Args:
            config_path: Path to the YAML configuration file.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['pipeline']['fastq']
        self.quality_threshold = self.config['quality_threshold']
        self.min_length = self.config['min_length']
        self.output_dir = Path(self.config['output_dir'])
        self.output_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _phred_scores(quality: str) -> List[int]:
        """Decode an ASCII quality string into a list of Phred scores."""
        offset = FASTQProcessor.PHRED_OFFSET
        return [ord(q) - offset for q in quality]

    def quality_filter(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Dict:
        """
        Filter FASTQ reads by mean quality score and minimum length.

        Args:
            input_file: Input FASTQ file
            output_file: Output filtered FASTQ file (defaults to
                ``<output_dir>/<stem>_filtered.fastq``)
        Returns:
            Statistics dictionary with keys ``total_reads``,
            ``passed_reads``, ``failed_reads``, ``total_bases``,
            ``passed_bases`` and ``pass_rate``.  On an I/O or parse
            error the counts accumulated so far are returned.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_filtered.fastq"
        # 'pass_rate' is pre-seeded so callers always see the same keys,
        # even when an error aborts processing early (the original error
        # path returned a dict without it).
        stats = {
            'total_reads': 0,
            'passed_reads': 0,
            'failed_reads': 0,
            'total_bases': 0,
            'passed_bases': 0,
            'pass_rate': 0.0
        }
        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):
                    stats['total_reads'] += 1
                    stats['total_bases'] += len(sequence)
                    scores = self._phred_scores(quality)
                    # Guard against zero-length reads (division by zero).
                    avg_quality = sum(scores) / len(scores) if scores else 0.0
                    if avg_quality >= self.quality_threshold and len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")
                        stats['passed_reads'] += 1
                        stats['passed_bases'] += len(sequence)
                    else:
                        stats['failed_reads'] += 1
            if stats['total_reads'] > 0:
                stats['pass_rate'] = stats['passed_reads'] / stats['total_reads']
            logger.info(f"Filtered {input_file.name}: {stats['passed_reads']}/{stats['total_reads']} reads passed")
            return stats
        except (OSError, ValueError) as e:
            # OSError: unreadable/unwritable file; ValueError: malformed FASTQ.
            logger.error(f"Error filtering FASTQ: {e}")
            return stats

    def trim_adapters(
        self,
        input_file: Path,
        adapter_sequence: str,
        output_file: Optional[Path] = None
    ) -> Path:
        """
        Trim adapter sequences from reads.

        Each read is truncated at the first occurrence of the adapter;
        reads shorter than ``min_length`` after trimming are dropped.

        Args:
            input_file: Input FASTQ file
            adapter_sequence: Adapter sequence to trim
            output_file: Output trimmed file (defaults to
                ``<output_dir>/<stem>_trimmed.fastq``)
        Returns:
            Path to the trimmed file, or the untouched input file on error.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_trimmed.fastq"
        trimmed_count = 0
        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):
                    adapter_pos = sequence.find(adapter_sequence)
                    if adapter_pos != -1:
                        # Keep only the bases (and matching qualities)
                        # upstream of the adapter.
                        sequence = sequence[:adapter_pos]
                        quality = quality[:adapter_pos]
                        trimmed_count += 1
                    if len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")
            logger.info(f"Trimmed adapters from {trimmed_count} reads")
            return output_file
        except (OSError, ValueError) as e:
            logger.error(f"Error trimming adapters: {e}")
            return input_file

    def calculate_statistics(self, fastq_file: Path) -> Dict:
        """
        Calculate summary statistics for a FASTQ file.

        Returns:
            Dictionary with read/base counts, min/max/avg read length,
            mean Phred quality and GC content (percent).  For an empty
            or unreadable file every value is 0 (the original returned
            ``min_length: inf``, which breaks JSON serialization).
        """
        stats = {
            'total_reads': 0,
            'total_bases': 0,
            'min_length': 0,
            'max_length': 0,
            'avg_length': 0,
            'avg_quality': 0,
            'gc_content': 0
        }
        lengths: List[int] = []
        # Running sums instead of a per-base score list: keeps memory
        # flat regardless of file size.
        quality_total = 0
        quality_count = 0
        gc_count = 0
        try:
            with open(fastq_file, 'r') as f:
                for _title, sequence, quality in FastqGeneralIterator(f):
                    seq_len = len(sequence)
                    stats['total_reads'] += 1
                    stats['total_bases'] += seq_len
                    lengths.append(seq_len)
                    scores = self._phred_scores(quality)
                    quality_total += sum(scores)
                    quality_count += len(scores)
                    gc_count += sequence.count('G') + sequence.count('C')
        except (OSError, ValueError) as e:
            # Fall through and report whatever was accumulated so far.
            logger.error(f"Error calculating statistics: {e}")
        if lengths:
            stats['min_length'] = min(lengths)
            stats['max_length'] = max(lengths)
            stats['avg_length'] = sum(lengths) / len(lengths)
            stats['avg_quality'] = quality_total / quality_count if quality_count else 0
            stats['gc_content'] = (gc_count / stats['total_bases']) * 100
        return stats

    def convert_to_fasta(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Path:
        """Convert FASTQ to FASTA format.

        Args:
            input_file: Input FASTQ file.
            output_file: Destination FASTA file (defaults to
                ``<output_dir>/<stem>.fasta``).
        Returns:
            Path to the FASTA file, or the input file on error.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}.fasta"
        try:
            count = SeqIO.convert(str(input_file), "fastq", str(output_file), "fasta")
            logger.info(f"Converted {count} sequences to FASTA")
            return output_file
        except (OSError, ValueError) as e:
            logger.error(f"Error converting to FASTA: {e}")
            return input_file
class FASTQQualityControl:
    """Quality control analysis for FASTQ files."""

    # QC thresholds: files below / outside these are flagged 'WARN'.
    MIN_AVG_QUALITY = 20
    MIN_AVG_LENGTH = 50
    GC_CONTENT_MIN = 30
    GC_CONTENT_MAX = 70

    def __init__(self):
        # Reuse the pipeline processor (and its YAML config) for statistics.
        self.processor = FASTQProcessor()

    def run_qc(self, fastq_file: Path) -> Dict:
        """
        Run comprehensive QC on a FASTQ file.

        Returns:
            QC report dictionary with keys ``file``, ``statistics``,
            ``quality_check`` ('PASS', 'WARN' or 'FAIL') and ``warnings``.
        """
        report = {
            'file': str(fastq_file),
            'statistics': {},
            'quality_check': 'PASS',
            'warnings': []
        }
        stats = self.processor.calculate_statistics(fastq_file)
        report['statistics'] = stats
        # An empty or unreadable file is a hard failure.  (The summary in
        # generate_qc_report counts 'FAIL', but nothing ever set it before.)
        if stats['total_reads'] == 0:
            report['quality_check'] = 'FAIL'
            report['warnings'].append('No reads found')
            return report
        if stats['avg_quality'] < self.MIN_AVG_QUALITY:
            report['warnings'].append('Low average quality score')
            report['quality_check'] = 'WARN'
        if stats['avg_length'] < self.MIN_AVG_LENGTH:
            report['warnings'].append('Short average read length')
            report['quality_check'] = 'WARN'
        if not (self.GC_CONTENT_MIN <= stats['gc_content'] <= self.GC_CONTENT_MAX):
            report['warnings'].append(f'Unusual GC content: {stats["gc_content"]:.1f}%')
            # Bug fix: this warning previously did not downgrade the check,
            # unlike the two checks above.
            report['quality_check'] = 'WARN'
        return report

    def generate_qc_report(self, fastq_files: List[Path]) -> Dict:
        """Generate a QC report for multiple FASTQ files.

        Args:
            fastq_files: FASTQ files to analyse.
        Returns:
            Dict with a ``summary`` (file counts per QC outcome) and
            per-file ``file_reports`` keyed by filename.
        """
        reports = {f.name: self.run_qc(f) for f in fastq_files}
        outcomes = [r['quality_check'] for r in reports.values()]
        summary = {
            'total_files': len(fastq_files),
            'passed': outcomes.count('PASS'),
            'warnings': outcomes.count('WARN'),
            'failed': outcomes.count('FAIL')
        }
        return {
            'summary': summary,
            'file_reports': reports
        }