Spaces:
Sleeping
Sleeping
| import sys, json, os, glob, re, requests | |
| from PIL import Image | |
| from io import BytesIO | |
| from datetime import datetime | |
| from contextlib import redirect_stdout, redirect_stderr | |
# Keep a handle on the real stdout for the final JSON payload; until it is
# restored, anything written to sys.stdout goes to stderr instead.
original_stdout, sys.stdout = sys.stdout, sys.stderr

# Environment set up before PaddleOCR is imported: request ERROR-level
# logging from PaddleOCR, select Qt's offscreen platform and X display :99.
os.environ.update({
    'PADDLEOCR_LOG_LEVEL': 'ERROR',
    'QT_QPA_PLATFORM': 'offscreen',
    'DISPLAY': ':99',
})
| # Import PaddleOCR after setting environment variables | |
| from paddleocr import PaddleOCR | |
def dprint(msg, obj=None):
    """Write a ``DEBUG:``-prefixed diagnostic line to stderr.

    ``obj`` is appended after ``": "`` unless it is ``None``. Any exception
    raised while formatting or printing is swallowed, so a debug call never
    interrupts the caller.
    """
    try:
        text = f"DEBUG: {msg}"
        if obj is not None:
            text += f": {obj}"
        print(text, file=sys.stderr)
    except Exception:
        pass
def clean_cache():
    """Remove leftover temp files and the ``output`` directory.

    All paths are relative to the current working directory. Each removal
    is reported through ``dprint``; paths that do not exist are skipped.
    """
    leftovers = (
        'temp_image.jpg',
        'temp_image_ocr_res_img.jpg',
        'temp_image_preprocessed_img.jpg',
        'temp_image_res.json',
    )
    for leftover in leftovers:
        if not os.path.exists(leftover):
            continue
        os.remove(leftover)
        dprint("Removed cache file", leftover)

    if os.path.exists("output"):
        import shutil
        shutil.rmtree("output")
        dprint("Removed output directory")
def download_image(url, output_path='temp_image.jpg'):
    """Fetch the image at ``url`` and store it as an RGB JPEG.

    Stale cache files are cleared first via ``clean_cache``. An RGBA image
    is flattened onto a white background using its last band as the paste
    mask; any other non-RGB mode goes through ``convert('RGB')``.

    Args:
        url: Address of the image to download (30 second timeout).
        output_path: Where the JPEG (quality 95) is written.

    Returns:
        ``output_path``.

    Raises:
        Whatever ``requests`` raises for transport failures or HTTP error
        statuses (``raise_for_status``), and whatever Pillow raises when
        the payload cannot be opened or saved.
    """
    dprint("Starting download", url)
    clean_cache()

    response = requests.get(
        url,
        headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        },
        timeout=30,
    )
    dprint("HTTP status", response.status_code)
    response.raise_for_status()

    picture = Image.open(BytesIO(response.content))
    if picture.mode == 'RGBA':
        # JPEG has no alpha channel: composite onto white instead of
        # letting transparent pixels be dropped by a plain conversion.
        canvas = Image.new('RGB', picture.size, (255, 255, 255))
        canvas.paste(picture, mask=picture.split()[-1])
        picture = canvas
    elif picture.mode != 'RGB':
        picture = picture.convert('RGB')

    picture.save(output_path, 'JPEG', quality=95)
    dprint("Saved image", output_path)
    return output_path
def cap_words(s):
    """Capitalize every whitespace-separated word of ``s``.

    Returns ``None`` for falsy input (``None`` or ``""``); otherwise the
    words joined by single spaces, each passed through ``str.capitalize``.
    """
    if not s:
        return None
    words = [word.capitalize() for word in s.split()]
    return ' '.join(words)
def normalize_full_name(s):
    """Reduce a raw name string to "first second last" order.

    Whitespace runs are collapsed to single spaces first. Two layouts are
    handled:

    * ``"LAST, FIRST SECOND ..."`` - the text before the first comma is the
      last name; the first two tokens after it are kept as given names.
    * No comma - with three or more tokens the first two and the final one
      are kept; two tokens are returned as they are; zero or one token
      returns the collapsed string unchanged.

    Returns ``None`` for falsy input.
    """
    if not s:
        return None

    raw = ' '.join(s.split())

    if ',' not in raw:
        words = raw.split()
        if len(words) <= 1:
            return raw
        if len(words) == 2:
            return f"{words[0]} {words[1]}"
        return f"{words[0]} {words[1]} {words[-1]}"

    # Comma form: everything before the first comma is the surname; the
    # remaining comma-separated pieces are stripped and re-joined with ","
    # before being tokenised on spaces.
    pieces = [piece.strip() for piece in raw.split(',')]
    surname = pieces[0]
    given_names = ','.join(pieces[1:]).strip().split()
    return ' '.join(given_names[:2] + [surname]).strip()
def extract_sss_number(text):
    """Return the first ``NN-NNNNNNN-N`` number found in ``text``, else ``None``.

    The number must sit on word boundaries, so a longer digit run that
    merely contains the pattern does not match.
    """
    found = re.search(r'\b(\d{2}-\d{7}-\d{1})\b', text)
    return found.group(1) if found else None
def extract_sss_info(lines):
    """Build the SSS ID record from a list of OCR text lines.

    Each entry is stringified (falsy entries become ``""``) and stripped.
    Scanning the lines in order:

    * the first ``NN-NNNNNNN-N`` match becomes both ``sss_number`` and
      ``id_number``;
    * a line made only of two or more capital letters is kept as a name
      part unless it contains one of the card's header words;
    * a line of two or more words that contains a run of at least two
      capitals and no run of three or more digits is also kept, subject to
      the same header filter (without ``CORAZON``).

    The kept parts are joined with spaces and passed through
    ``normalize_full_name``.

    Returns:
        dict with keys ``id_type``, ``sss_number``, ``id_number``,
        ``full_name``, ``birth_date``, ``address``, ``sex`` and
        ``nationality``. ``birth_date``, ``address`` and ``sex`` are always
        ``None``; ``nationality`` is always ``"Filipino"``.
    """
    dprint("Lines to extract", lines)

    header_words = ['REPUBLIC', 'PHILIPPINES', 'SOCIAL', 'SECURITY', 'SYSTEM', 'SSS', 'PRESIDENT', 'PROUD', 'FILIPINO']
    # The single-word filter additionally rejects 'CORAZON'.
    single_word_blocklist = header_words + ['CORAZON']

    sss_number = None
    name_parts = []

    cleaned = [str(entry or '').strip() for entry in lines]
    for idx, line in enumerate(cleaned):
        dprint("Line", {"i": idx, "text": line})

        # SSS number pattern (XX-XXXXXXX-X); only the first hit is kept.
        if not sss_number:
            candidate = extract_sss_number(line)
            if candidate:
                sss_number = candidate
                dprint("Found SSS number", sss_number)

        upper_line = line.upper()

        # Single all-caps word that looks like a name.
        is_caps_word = re.search(r'^[A-Z]{2,}$', line) and not re.search(r'[0-9]', line)
        if is_caps_word and not any(word in upper_line for word in single_word_blocklist):
            name_parts.append(line)
            dprint("Added name part", line)

        # Multi-word line containing capitals; collected rather than used
        # directly so the full name is composed from every part at the end.
        is_multi_word = (
            len(line.split()) >= 2
            and re.search(r'[A-Z]{2,}', line)
            and not re.search(r'[0-9]{3,}', line)
        )
        if is_multi_word and not any(word in upper_line for word in header_words):
            name_parts.append(line)
            dprint("Added multi-word name part", line)

    full_name = None
    if name_parts:
        full_name = normalize_full_name(' '.join(name_parts))
        dprint("Composed name from all parts", {"parts": name_parts, "result": full_name})

    result = {
        "id_type": "SSS ID",
        "sss_number": sss_number,
        "id_number": sss_number,
        "full_name": full_name,
        "birth_date": None,
        "address": None,
        "sex": None,
        "nationality": "Filipino"
    }
    dprint("Final result", result)
    return result
def _collect_ocr_texts(results):
    """Extract the recognized text strings from a PaddleOCR result list.

    Two result layouts are understood:

    * new style - ``results[0]`` is an object whose type name contains
      ``OCRResult``; it is converted with ``dict(...)`` and its
      ``rec_texts`` list is used (falsy entries dropped);
    * old style - a list (optionally nested one level) of
      ``[box, (text, score)]``-shaped items, from which ``item[1][0]`` is
      taken.

    Any error while walking the results is logged through ``dprint`` and
    whatever text was gathered up to that point is returned.
    """
    texts = []
    try:
        if not (results and isinstance(results, list)):
            return texts

        first_item = results[0]
        item_type_name = type(first_item).__name__
        is_ocr_result = (
            'OCRResult' in item_type_name
            or 'ocr_result' in str(type(first_item)).lower()
        )

        if is_ocr_result:
            dprint("Detected OCRResult object format", f"type: {item_type_name}")
            try:
                if hasattr(first_item, 'keys'):
                    rec_texts = dict(first_item).get('rec_texts')
                    if isinstance(rec_texts, list):
                        texts = [str(t) for t in rec_texts if t]
                        dprint("Extracted text lines from rec_texts", len(texts))
            except Exception as e:
                dprint("Error accessing OCRResult", str(e))
        else:
            # Old format - list of lists.
            lines = first_item if isinstance(first_item, list) else results
            for item in lines:
                if isinstance(item, (list, tuple)) and len(item) >= 2:
                    meta = item[1]
                    if isinstance(meta, (list, tuple)) and len(meta) >= 1:
                        texts.append(str(meta[0]))
    except Exception as e:
        dprint("Error processing OCR results", str(e))
        import traceback
        dprint("Traceback", traceback.format_exc())
    return texts


def extract_ocr_lines(image_path):
    """Run PaddleOCR on ``image_path`` and return the extracted SSS ID record.

    ``predict()`` is tried first; if it raises, ``ocr()`` is used when the
    engine object has it. The recognized lines are handed to
    ``extract_sss_info``.

    The record always has the full key set produced by ``extract_sss_info``
    (``id_type``, ``sss_number``, ``id_number``, ``full_name``,
    ``birth_date``, ``address``, ``sex``, ``nationality``). Previously the
    "no text recognized" path returned a shorter dict without ``address``,
    ``sex`` and ``nationality``; it now goes through ``extract_sss_info``
    with an empty list so both paths share one schema.

    Raises:
        Whatever ``PaddleOCR(...)`` raises during construction, or the
        fallback ``ocr()`` call raises after ``predict()`` has failed.
    """
    os.makedirs("output", exist_ok=True)
    dprint("Initializing PaddleOCR")

    with redirect_stdout(sys.stderr), redirect_stderr(sys.stderr):
        ocr = PaddleOCR(
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
            use_textline_orientation=False,
            lang='en'
        )
        dprint("OCR initialized")
        dprint("Running OCR predict", image_path)
        try:
            results = ocr.predict(image_path)
        except Exception as e:
            dprint("predict() failed, trying ocr()", str(e))
            results = ocr.ocr(image_path) if hasattr(ocr, 'ocr') else None

    # Result count is for the debug log only; never let it fail the run.
    try:
        if results and isinstance(results, list) and len(results) > 0 and isinstance(results[0], list):
            count = len(results[0])
        else:
            count = len(results) if results else 0
    except Exception:
        count = 0
    dprint("OCR done, results_count", count)

    all_text = _collect_ocr_texts(results)
    dprint("All direct texts", all_text)

    # An empty list yields the all-None record with the same keys as a
    # successful extraction.
    return extract_sss_info(all_text)
# Script entry. argv[1] is the image URL. The real stdout receives exactly one
# JSON document: {"success": true, "ocr_results": {...}} on success or
# {"error": "..."} on failure (exit status 1). Everything else goes to stderr.
if len(sys.argv) < 2:
    sys.stdout = original_stdout
    print(json.dumps({"error": "No image URL provided"}))
    sys.exit(1)

image_url = sys.argv[1]
dprint("Processing image URL", image_url)

try:
    image_path = download_image(image_url)
    dprint("Image downloaded to", image_path)
    ocr_results = extract_ocr_lines(image_path)
    dprint("OCR results ready")
    # Restore stdout and print only the JSON response
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps({"success": True, "ocr_results": ocr_results}))
    sys.stdout.flush()
except Exception as e:
    dprint("Exception", str(e))
    # Restore stdout for error JSON
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps({"error": str(e)}))
    sys.stdout.flush()
    sys.exit(1)
finally:
    # Best-effort cleanup: a failure here must not replace the JSON already
    # written or the exit status, but it is logged instead of being dropped.
    # Only Exception is caught so KeyboardInterrupt / SystemExit raised
    # during cleanup are not swallowed.
    try:
        clean_cache()
    except Exception as cleanup_error:
        dprint("Cleanup failed", str(cleanup_error))