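# UMID card OCR extractor.
# Downloads an ID image from a URL, runs PaddleOCR on it, parses the UMID fields
# (CRN, name, birth date, sex, address), and writes a single JSON object to stdout.
# All debug/progress output is routed to stderr so callers can parse stdout as JSON.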
import sys, json, os, glob, re, requests
from PIL import Image
from io import BytesIO
from datetime import datetime
from contextlib import redirect_stdout, redirect_stderr

# Immediately redirect all output to stderr except for our final JSON
original_stdout = sys.stdout
sys.stdout = sys.stderr

# Suppress all PaddleOCR output
os.environ['PADDLEOCR_LOG_LEVEL'] = 'ERROR'
os.environ['QT_QPA_PLATFORM'] = 'offscreen'
os.environ['DISPLAY'] = ':99'

# Import PaddleOCR after setting environment variables
from paddleocr import PaddleOCR
def dprint(msg, obj=None):
    try:
        print(f"DEBUG: {msg}" + (f": {obj}" if obj is not None else ""), file=sys.stderr)
    except Exception:
        pass
def clean_cache():
    files = ['temp_image.jpg', 'temp_image_ocr_res_img.jpg', 'temp_image_preprocessed_img.jpg', 'temp_image_res.json']
    for f in files:
        if os.path.exists(f):
            os.remove(f)
            dprint("Removed cache file", f)
    if os.path.exists("output"):
        import shutil
        shutil.rmtree("output")
        dprint("Removed output directory")
def download_image(url, output_path='temp_image.jpg'):
    dprint("Starting download", url)
    clean_cache()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    r = requests.get(url, headers=headers, timeout=30)
    dprint("HTTP status", r.status_code)
    r.raise_for_status()
    img = Image.open(BytesIO(r.content))
    if img.mode == 'RGBA':
        bg = Image.new('RGB', img.size, (255, 255, 255))
        bg.paste(img, mask=img.split()[-1])
        img = bg
    elif img.mode != 'RGB':
        img = img.convert('RGB')
    img.save(output_path, 'JPEG', quality=95)
    dprint("Saved image", output_path)
    return output_path
def format_date(s):
    if not s:
        return None
    raw = str(s).strip()
    t = raw.replace(' ', '').replace('\\', '/').replace('.', '/')
    # 1960/01/28 or 1960-01-28
    if re.match(r'^\d{4}[-/]\d{2}[-/]\d{2}$', t):
        return t.replace('/', '-')
    # 01/28/1960
    if re.match(r'^\d{2}/\d{2}/\d{4}$', raw):
        m, d, y = raw.split('/')
        return f"{y}-{int(m):02d}-{int(d):02d}"
    # Month name variants
    try:
        return datetime.strptime(raw, "%B %d, %Y").strftime("%Y-%m-%d")
    except Exception:
        pass
    try:
        return datetime.strptime(raw, "%b %d, %Y").strftime("%Y-%m-%d")
    except Exception:
        pass
    return raw
def cap_words(s):
    return None if not s else ' '.join(w.capitalize() for w in s.split())
def take_within(lines, i, k=5):
    out = []
    for j in range(1, k + 1):
        if i + j < len(lines):
            t = str(lines[i + j]).strip()
            if t:
                out.append(t)
    return out
def is_crn_text(t):
    # UMID CRN like 0028-1215160-9 or 002812151609
    z = str(t).strip()
    return bool(re.match(r'^\d{4}-\d{7}-\d$', z)) or bool(re.match(r'^\d{12,13}$', z))
def normalize_name(last, given):
    last = (last or '').strip()
    tokens = [t for t in (given or '').strip().split(' ') if t]
    kept = tokens[:2]  # First [+Second], ignore middles beyond that
    name = ' '.join(kept + [last]).strip()
    return cap_words(name) if name else None
def glue_address(lines, start_idx):
    parts = []
    stop_labels = ['crn', 'surname', 'given', 'middle', 'sex', 'date', 'birth']
    for k in range(1, 6):  # take up to 5 lines after ADDRESS
        idx = start_idx + k
        if idx >= len(lines):
            break
        t = str(lines[idx]).strip()
        if not t:
            continue
        low = t.lower()
        if any(lbl in low for lbl in stop_labels):
            break
        parts.append(t)
    # collapse extra spaces and commas
    address = ', '.join(parts)
    address = re.sub(r'\s{2,}', ' ', address)
    address = address.replace(' ,', ',')
    return address or None
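# Scan the OCR lines top to bottom, keying on the UMID field labels
# (CRN, SURNAME, GIVEN NAME, SEX, DATE OF BIRTH, ADDRESS) and reading each
# value either inline or from the following lines.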
def extract_umid_info(lines):
    dprint("Lines to extract", lines)
    crn = None
    full_name = None
    birth_date = None
    sex = None
    address = None
    last_name_txt = None
    given_name_txt = None
    L = [str(x or '').strip() for x in lines]
    i = 0
    while i < len(L):
        line = L[i]
        low = line.lower()
        dprint("Line", {"i": i, "text": line})
        # CRN
        crn_candidate = extract_crn_from_text(line)
        if crn is None and crn_candidate:
            crn = crn_candidate
            dprint("Found CRN", crn)
        elif crn is None and i + 1 < len(L):
            crn_candidate = extract_crn_from_text(L[i + 1])
            if crn_candidate:
                crn = crn_candidate
                dprint("Found CRN (next)", crn)
        # Surname / Given Name / Middle Name (ignore middle)
        if 'surname' in low:
            ahead = take_within(L, i, 3)
            for t in ahead:
                tl = t.lower()
                if not any(k in tl for k in ['given', 'middle', 'sex', 'date', 'birth', 'address', 'crn']):
                    last_name_txt = t
                    dprint("Captured last_name", last_name_txt)
                    break
        if 'given' in low:
            if i + 1 < len(L):
                # sometimes the value is on the same line (rare), usually on the next line;
                # strip the label case-insensitively when falling back to the same line
                val = L[i + 1] if L[i + 1] else re.sub(r'given\s*name', '', line, flags=re.I).strip()
                given_name_txt = val if val else None
                dprint("Captured given_name", given_name_txt)
        # Sex and Date of Birth (handles same-line cases like "SEXM DATEOFBIRTH 196O/O1/28")
        if 'sex' in low:
            # sex glued to the label (e.g. "SEXM") or written right after it
            m_sex = re.search(r'sex\s*[:\-]?\s*(male|female|m|f)\b', low)
            if m_sex:
                sex = m_sex.group(1)[0].upper()
            # standalone inline token fallback
            if sex is None and re.search(r'\bF(EMALE)?\b', line, flags=re.I):
                sex = 'F'
            if sex is None and re.search(r'\bM(ALE)?\b', line, flags=re.I):
                sex = 'M'
            # lookahead fallback
            if sex is None:
                for t in take_within(L, i, 2):
                    tt = t.strip().upper()
                    if tt in ('M', 'F', 'MALE', 'FEMALE'):
                        sex = 'M' if tt.startswith('M') else 'F'
                        break
        # Date of Birth inline or in next tokens
        if 'date' in low and 'birth' in low:
            # inline date
            m = re.search(r'(\d[0-9OIl]{3}[/\-][0-1OIl]\d[/\-]\d{2,4})', line)
            if m:
                birth_date = format_date(normalize_digits(m.group(1)))
                dprint("Found birth_date (inline)", birth_date)
            if birth_date is None:
                for t in take_within(L, i, 3):
                    # fix digits then try
                    cand = normalize_digits(t)
                    if re.search(r'\d', cand) and format_date(cand):
                        birth_date = format_date(cand)
                        dprint("Found birth_date (lookahead)", birth_date)
                        break
        # Also catch standalone yyyy/mm/dd line
        if birth_date is None and re.match(r'^\d{4}[-/]\d{2}[-/]\d{2}$', normalize_digits(line)):
            birth_date = format_date(normalize_digits(line))
            dprint("Found standalone birth_date", birth_date)
        # Address block
        if 'address' in low and address is None:
            address = glue_address(L, i)
            dprint("Found address", address)
        i += 1
    # Compose final name
    if full_name is None:
        full_name = normalize_name(last_name_txt, given_name_txt)
        dprint("Composed full_name", {"last": last_name_txt, "given": given_name_txt, "full": full_name})
    result = {
        "id_type": "UMID",
        "crn": crn,
        "id_number": crn,  # frontend expects this
        "full_name": full_name,
        "birth_date": birth_date,
        "sex": sex,
        "address": address
    }
    dprint("Final result", result)
    return result
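# Run PaddleOCR on the downloaded image and return the parsed UMID fields.
# Handles both the legacy list-of-lists result format and the newer OCRResult objects.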
def extract_ocr_lines(image_path):
    os.makedirs("output", exist_ok=True)
    dprint("Initializing PaddleOCR")
    with redirect_stdout(sys.stderr), redirect_stderr(sys.stderr):
        ocr = PaddleOCR(
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
            use_textline_orientation=False,
            lang='en'
        )
    dprint("OCR initialized")
    dprint("Running OCR predict", image_path)
    try:
        results = ocr.predict(image_path)
    except Exception as e:
        dprint("predict() failed, trying ocr()", str(e))
        if hasattr(ocr, 'ocr'):
            results = ocr.ocr(image_path)
        else:
            results = None
    try:
        count = len(results[0]) if results and isinstance(results, list) and len(results) > 0 and isinstance(results[0], list) else (len(results) if results else 0)
    except Exception:
        count = 0
    dprint("OCR done, results_count", count)
    # Process OCR results - handle both the old format (list of lists) and the new format (OCRResult object)
    all_text = []
    try:
        if results and isinstance(results, list) and len(results) > 0:
            first_item = results[0]
            item_type_name = type(first_item).__name__
            is_ocr_result = 'OCRResult' in item_type_name or 'ocr_result' in str(type(first_item)).lower()
            if is_ocr_result:
                dprint("Detected OCRResult object format", f"type: {item_type_name}")
                # Access the OCRResult like a dictionary
                try:
                    if hasattr(first_item, 'keys'):
                        ocr_dict = dict(first_item)
                        # Look for the rec_texts key
                        if 'rec_texts' in ocr_dict:
                            rec_texts = ocr_dict['rec_texts']
                            if isinstance(rec_texts, list):
                                all_text = [str(t) for t in rec_texts if t]
                                dprint("Extracted text lines from rec_texts", len(all_text))
                except Exception as e:
                    dprint("Error accessing OCRResult", str(e))
            else:
                # Old format - list of lists
                lines = results[0] if results and isinstance(results[0], list) else results
                for item in lines:
                    if isinstance(item, (list, tuple)) and len(item) >= 2:
                        meta = item[1]
                        if isinstance(meta, (list, tuple)) and len(meta) >= 1:
                            all_text.append(str(meta[0]))
    except Exception as e:
        dprint("Error processing OCR results", str(e))
        import traceback
        dprint("Traceback", traceback.format_exc())
    dprint("All direct texts", all_text)
    return extract_umid_info(all_text) if all_text else {
        "id_type": "UMID",
        "crn": None,
        "id_number": None,
        "full_name": None,
        "birth_date": None,
        "sex": None,
        "address": None
    }
def normalize_digits(s):
    # Fix common OCR digit confusions: O→0, o→0, I/l→1, S→5, B→8
    return (
        str(s)
        .replace('O', '0').replace('o', '0')
        .replace('I', '1').replace('l', '1')
        .replace('S', '5')
        .replace('B', '8')
    )
def extract_crn_from_text(t):
    # Accept "CRN-0028-1215160-9" or plain digits/hyphens
    m = re.search(r'crn[^0-9]*([0-9OIl\-]{10,})', t, flags=re.IGNORECASE)
    if m:
        val = normalize_digits(m.group(1))
        # Keep hyphens; also accept compact digits
        if re.match(r'^\d{4}-\d{7}-\d$', val) or re.match(r'^\d{12,13}$', val):
            return val
    # Or the whole token is the number
    val = normalize_digits(t.strip())
    if re.match(r'^\d{4}-\d{7}-\d$', val) or re.match(r'^\d{12,13}$', val):
        return val
    return None
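# Entry point: expects the image URL as the first command-line argument.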
if len(sys.argv) < 2:
    sys.stdout = original_stdout
    print(json.dumps({"error": "No image URL provided"}))
    sys.exit(1)

image_url = sys.argv[1]
dprint("Processing image URL", image_url)
try:
    image_path = download_image(image_url)
    dprint("Image downloaded to", image_path)
    ocr_results = extract_ocr_lines(image_path)
    dprint("OCR results ready")
    # Restore stdout and print only the JSON response
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps({"success": True, "ocr_results": ocr_results}))
    sys.stdout.flush()
except Exception as e:
    dprint("Exception", str(e))
    # Restore stdout for the error JSON
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps({"error": str(e)}))
    sys.stdout.flush()
    sys.exit(1)
finally:
    # Clean up temporary files
    try:
        clean_cache()
    except Exception:
        pass