""" FASTQ Processing Pipeline Quality control and preprocessing of sequencing data """ from pathlib import Path from typing import Dict, List, Optional import yaml import logging from Bio import SeqIO from Bio.SeqIO.QualityIO import FastqGeneralIterator logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class FASTQProcessor: """Process FASTQ sequencing files""" def __init__(self, config_path: str = "config.yml"): with open(config_path, 'r') as f: self.config = yaml.safe_load(f)['pipeline']['fastq'] self.quality_threshold = self.config['quality_threshold'] self.min_length = self.config['min_length'] self.output_dir = Path(self.config['output_dir']) self.output_dir.mkdir(parents=True, exist_ok=True) def quality_filter( self, input_file: Path, output_file: Optional[Path] = None ) -> Dict: """ Filter FASTQ reads by quality score Args: input_file: Input FASTQ file output_file: Output filtered FASTQ file Returns: Statistics dictionary """ if output_file is None: output_file = self.output_dir / f"{input_file.stem}_filtered.fastq" stats = { 'total_reads': 0, 'passed_reads': 0, 'failed_reads': 0, 'total_bases': 0, 'passed_bases': 0 } try: with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f: for title, sequence, quality in FastqGeneralIterator(in_f): stats['total_reads'] += 1 stats['total_bases'] += len(sequence) # Calculate average quality score quality_scores = [ord(q) - 33 for q in quality] avg_quality = sum(quality_scores) / len(quality_scores) # Check filters if avg_quality >= self.quality_threshold and len(sequence) >= self.min_length: out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n") stats['passed_reads'] += 1 stats['passed_bases'] += len(sequence) else: stats['failed_reads'] += 1 stats['pass_rate'] = stats['passed_reads'] / stats['total_reads'] if stats['total_reads'] > 0 else 0 logger.info(f"Filtered {input_file.name}: {stats['passed_reads']}/{stats['total_reads']} reads passed") return stats except Exception as e: logger.error(f"Error filtering FASTQ: {e}") return stats def trim_adapters( self, input_file: Path, adapter_sequence: str, output_file: Optional[Path] = None ) -> Path: """ Trim adapter sequences from reads Args: input_file: Input FASTQ file adapter_sequence: Adapter sequence to trim output_file: Output trimmed file """ if output_file is None: output_file = self.output_dir / f"{input_file.stem}_trimmed.fastq" trimmed_count = 0 try: with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f: for title, sequence, quality in FastqGeneralIterator(in_f): # Find adapter adapter_pos = sequence.find(adapter_sequence) if adapter_pos != -1: # Trim at adapter position sequence = sequence[:adapter_pos] quality = quality[:adapter_pos] trimmed_count += 1 if len(sequence) >= self.min_length: out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n") logger.info(f"Trimmed adapters from {trimmed_count} reads") return output_file except Exception as e: logger.error(f"Error trimming adapters: {e}") return input_file def calculate_statistics(self, fastq_file: Path) -> Dict: """ Calculate statistics for FASTQ file Returns: Dictionary with read count, length distribution, quality scores """ stats = { 'total_reads': 0, 'total_bases': 0, 'min_length': float('inf'), 'max_length': 0, 'avg_length': 0, 'avg_quality': 0, 'gc_content': 0 } lengths = [] qualities = [] gc_count = 0 try: with open(fastq_file, 'r') as f: for title, sequence, quality in FastqGeneralIterator(f): stats['total_reads'] += 1 seq_len = len(sequence) stats['total_bases'] += seq_len lengths.append(seq_len) stats['min_length'] = min(stats['min_length'], seq_len) stats['max_length'] = max(stats['max_length'], seq_len) # Quality scores quality_scores = [ord(q) - 33 for q in quality] qualities.extend(quality_scores) # GC content gc_count += sequence.count('G') + sequence.count('C') if stats['total_reads'] > 0: stats['avg_length'] = sum(lengths) / len(lengths) stats['avg_quality'] = sum(qualities) / len(qualities) stats['gc_content'] = (gc_count / stats['total_bases']) * 100 return stats except Exception as e: logger.error(f"Error calculating statistics: {e}") return stats def convert_to_fasta( self, input_file: Path, output_file: Optional[Path] = None ) -> Path: """Convert FASTQ to FASTA format""" if output_file is None: output_file = self.output_dir / f"{input_file.stem}.fasta" try: count = SeqIO.convert(str(input_file), "fastq", str(output_file), "fasta") logger.info(f"Converted {count} sequences to FASTA") return output_file except Exception as e: logger.error(f"Error converting to FASTA: {e}") return input_file class FASTQQualityControl: """Quality control analysis for FASTQ files""" def __init__(self): self.processor = FASTQProcessor() def run_qc(self, fastq_file: Path) -> Dict: """ Run comprehensive QC on FASTQ file Returns: QC report dictionary """ report = { 'file': str(fastq_file), 'statistics': {}, 'quality_check': 'PASS', 'warnings': [] } # Calculate statistics stats = self.processor.calculate_statistics(fastq_file) report['statistics'] = stats # Check for issues if stats['avg_quality'] < 20: report['warnings'].append('Low average quality score') report['quality_check'] = 'WARN' if stats['avg_length'] < 50: report['warnings'].append('Short average read length') report['quality_check'] = 'WARN' if stats['gc_content'] < 30 or stats['gc_content'] > 70: report['warnings'].append(f'Unusual GC content: {stats["gc_content"]:.1f}%') return report def generate_qc_report(self, fastq_files: List[Path]) -> Dict: """Generate QC report for multiple FASTQ files""" reports = {} for fastq_file in fastq_files: report = self.run_qc(fastq_file) reports[fastq_file.name] = report # Summary statistics summary = { 'total_files': len(fastq_files), 'passed': sum(1 for r in reports.values() if r['quality_check'] == 'PASS'), 'warnings': sum(1 for r in reports.values() if r['quality_check'] == 'WARN'), 'failed': sum(1 for r in reports.values() if r['quality_check'] == 'FAIL') } return { 'summary': summary, 'file_reports': reports }