Spaces:
Sleeping
Sleeping
import sys, json, os, glob, re, requests
from PIL import Image
from io import BytesIO
from datetime import datetime
from contextlib import redirect_stdout, redirect_stderr
# Immediately redirect all output to stderr except for our final JSON.
# The caller consumes stdout as machine-readable JSON, so everything else
# (debug lines, library chatter) must go to stderr until the very end.
original_stdout = sys.stdout
sys.stdout = sys.stderr
# Suppress all PaddleOCR output
os.environ['PADDLEOCR_LOG_LEVEL'] = 'ERROR'
# Headless rendering settings so PaddleOCR/Qt never require a real display.
os.environ['QT_QPA_PLATFORM'] = 'offscreen'
os.environ['DISPLAY'] = ':99'
# Import PaddleOCR after setting environment variables
# (the env vars above only take effect if set before the library loads).
from paddleocr import PaddleOCR
def dprint(msg, obj=None):
    """Emit a debug line to stderr; swallow any error so logging never breaks flow."""
    try:
        suffix = "" if obj is None else f": {obj}"
        sys.stderr.write(f"DEBUG: {msg}{suffix}\n")
    except Exception:
        pass
def clean_cache():
    """Remove temp OCR artifacts and the output directory left by earlier runs."""
    for name in ('temp_image.jpg', 'temp_image_ocr_res_img.jpg',
                 'temp_image_preprocessed_img.jpg', 'temp_image_res.json'):
        if os.path.exists(name):
            os.remove(name)
            dprint("Removed cache file", name)
    if os.path.exists("output"):
        import shutil
        shutil.rmtree("output")
        dprint("Removed output directory")
def download_image(url, output_path='temp_image.jpg'):
    """Download an image from *url*, flatten any alpha onto white, save as JPEG.

    Returns the path of the saved JPEG file. Raises ``requests`` exceptions
    on network/HTTP failures and PIL errors on undecodable payloads.
    """
    dprint("Starting download", url)
    clean_cache()
    # Timeout added: without it a stalled server blocks this script forever.
    r = requests.get(url, timeout=30)
    dprint("HTTP status", r.status_code)
    r.raise_for_status()
    img = Image.open(BytesIO(r.content))
    # Promote palette images first so any palette transparency survives
    # to the alpha-flattening step below.
    if img.mode == 'P':
        img = img.convert('RGBA')
    if img.mode in ('RGBA', 'LA'):
        # JPEG has no alpha channel: composite onto a white background,
        # using the last band (alpha) as the paste mask.
        bg = Image.new('RGB', img.size, (255, 255, 255))
        bg.paste(img, mask=img.split()[-1])
        img = bg
    elif img.mode != 'RGB':
        img = img.convert('RGB')
    img.save(output_path, 'JPEG', quality=95)
    dprint("Saved image", output_path)
    return output_path
def format_date(s):
    """Normalize assorted OCR date strings to ISO ``YYYY-MM-DD``.

    Handles ``YYYY-MM-DD``/``YYYY/MM/DD``/``YYYY.MM.DD``, numeric US style
    with or without zero padding (``7/2/2022``, ``07/22/2022``), and month
    names with full or abbreviated form, with or without a comma
    (``July 22, 2022``, ``july 22,2022``). Returns None for empty input and
    the cleaned-up string unchanged when no known format matches.
    """
    if not s:
        return None
    # Insert the space OCR often drops after the comma ("july 22,2022"),
    # then collapse every whitespace run to a single space so strptime
    # patterns match reliably.
    raw = ' '.join(s.strip().replace(',', ', ').split())
    # Space-free variant with separators unified to '/' for numeric formats.
    compact = raw.replace(' ', '').replace('\\', '/').replace('.', '/')
    if re.match(r'^\d{4}[-/]\d{2}[-/]\d{2}$', compact):
        return compact.replace('/', '-')
    # Numeric US style; single-digit month/day accepted and zero-padded.
    m = re.match(r'^(\d{1,2})/(\d{1,2})/(\d{4})$', compact)
    if m:
        mo, d, y = m.groups()
        return f"{y}-{int(mo):02d}-{int(d):02d}"
    # Month-name variants; strptime matches month names case-insensitively.
    for fmt in ("%B %d, %Y", "%b %d, %Y", "%B %d %Y", "%b %d %Y"):
        try:
            return datetime.strptime(raw, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return raw
def cap_words(name):
    """Capitalize each whitespace-separated word; return None for empty input."""
    if not name:
        return None
    return ' '.join(word.capitalize() for word in name.split())
def normalize_name_from_parts(last, first_block):
    """Compose 'Given [Given2] Last' from separate name fields, capitalized.

    Keeps at most two given names; returns None when both inputs are empty.
    """
    surname = (last or '').strip()
    given_tokens = [tok for tok in (first_block or '').strip().split(' ') if tok]
    pieces = given_tokens[:2] + [surname]
    composed = ' '.join(pieces).strip()
    return cap_words(composed) if composed else None
def take_within(lines, i, k=5):
    """Return up to *k* stripped, non-empty lines that follow index *i*."""
    window = lines[i + 1:i + 1 + k]
    return [text for text in (str(entry).strip() for entry in window) if text]
def extract_number_from_text(text):
    """Strip every character *text* does not consider a digit (per str.isdigit)."""
    return ''.join(filter(str.isdigit, text))
def extract_tesda_info(lines):
    """Heuristically extract TESDA certificate fields from raw OCR text lines.

    Returns a dict with the certificate/ULI/CLN-NQ numbers, the holder's
    name (full and split), qualification info, and issue/expiry dates.
    Fields that cannot be located are None.
    """
    dprint("Lines to extract", lines)
    certificate_number = None
    uli_number = None
    cln_nq_number = None
    full_name = None
    # Collected name pieces for the end-of-scan fallback composition.
    last_name_txt = None
    first_name_txt = None
    issued_date = None
    valid_until = None
    qualification = None
    qualification_level = None
    L = [str(x or '').strip() for x in lines]
    i = 0
    while i < len(L):
        line = L[i]
        low = line.lower()
        dprint("Line", {"i": i, "text": line})
        # Certificate Number - try labeled patterns first, then any bare
        # 14-digit run as a last resort.
        if certificate_number is None:
            cert_patterns = [
                r'certificate\s*no\.?\s*(\d{14})',
                r'certificate\s*number[:\s]*(\d{14})',
                r'cert\s*no\.?\s*(\d{14})',
                r'(\d{14})'
            ]
            for pattern in cert_patterns:
                match = re.search(pattern, low)
                if match:
                    certificate_number = match.group(1)
                    dprint("Found certificate number", certificate_number)
                    break
            # Still nothing: the digits may sit on a following line,
            # possibly broken up by OCR artifacts.
            if not certificate_number:
                for t in take_within(L, i, 3):
                    nums = extract_number_from_text(t)
                    if len(nums) == 14:
                        certificate_number = nums
                        dprint("Found certificate number in next lines", certificate_number)
                        break
        # ULI Number, e.g. ABC-12-345-67890-123
        if uli_number is None and ('uli' in low or 'ops-' in low):
            uli_pattern = r'(?:uli:?)?\s*([a-zA-Z]{3}-\d{2}-\d{3}-\d{5}-\d{3})'
            match = re.search(uli_pattern, low, re.IGNORECASE)
            if match:
                uli_number = match.group(1).upper()
                dprint("Found ULI number", uli_number)
        # CLN-NQ Number: a 7-digit run on a line mentioning CLN/NQ.
        if cln_nq_number is None and ('cln' in low or 'nq' in low):
            cln_pattern = r'(?:cln-nq-?)?(\d{7})'
            match = re.search(cln_pattern, low)
            if match:
                cln_nq_number = match.group(1)
                dprint("Found CLN-NQ number", cln_nq_number)
        # Holder's name appears shortly after "is awarded to".
        if 'awarded to' in low:
            for t in take_within(L, i, 3):
                if t and not any(k in t.lower() for k in ['awarded', 'certificate', 'valid', 'for having']):
                    # Collapse the periods and extra spaces OCR tends to insert.
                    cleaned_name = ' '.join(t.replace('.', ' ').split())
                    full_name = cap_words(cleaned_name)
                    parts = full_name.split()
                    if len(parts) >= 2:
                        last_name_txt = parts[-1]
                        first_name_txt = ' '.join(parts[:-1])
                    dprint("Found full name", full_name)
                    break
        # Qualification Level: require "nc" as a whole word ("NC II") —
        # a bare substring test fires inside ordinary words ("once", "agency").
        if qualification_level is None and ('national certificate' in low or re.search(r'\bnc\b', low)):
            qualification_level = cap_words(line)
            dprint("Found qualification level", qualification_level)
        # Qualification/Specialization: "... in <QUALIFICATION>" with the
        # qualification itself on the next line. Match "in" as a whole word;
        # the substring appears in far too many words ("training", "until").
        if qualification is None and re.search(r'\bin\b', low) and len(line.split()) > 1:
            if i + 1 < len(L):
                qualification = cap_words(L[i + 1])
                dprint("Found qualification", qualification)
        # Issued Date (month-name form; format_date normalizes to ISO).
        if issued_date is None and ('issued' in low):
            date_pattern = r'(?:issued\s*(?:on|:)?\s*)?([A-Za-z]+\s+\d{1,2},?\s*\d{4})'
            match = re.search(date_pattern, low)
            if match:
                issued_date = format_date(match.group(1))
                dprint("Found issued date", issued_date)
        # Valid Until Date
        if valid_until is None and ('valid' in low):
            date_pattern = r'(?:valid\s*until\s*)?([A-Za-z]+\s+\d{1,2},?\s*\d{4})'
            match = re.search(date_pattern, low)
            if match:
                valid_until = format_date(match.group(1))
                dprint("Found valid until date", valid_until)
        i += 1
    # Fallback: compose the name from whatever pieces were collected.
    if full_name is None:
        full_name = normalize_name_from_parts(last_name_txt, first_name_txt)
    # Convenience prefixes/suffixes of the certificate number for callers.
    cert_first_four = certificate_number[:4] if certificate_number else None
    cert_last_four = certificate_number[-4:] if certificate_number else None
    result = {
        "id_type": "tesda",
        "certificate_number": certificate_number,
        "cert_first_four": cert_first_four,
        "cert_last_four": cert_last_four,
        "uli_number": uli_number,
        "cln_nq_number": cln_nq_number,
        "full_name": full_name,
        "first_name": first_name_txt,
        "last_name": last_name_txt,
        "qualification_level": qualification_level,
        "qualification": qualification,
        "issued_date": issued_date,
        "valid_until": valid_until
    }
    dprint("Final result", result)
    return result
def extract_ocr_lines(image_path):
    """Run PaddleOCR on *image_path* and parse TESDA fields from the text.

    Returns the dict built by extract_tesda_info, or a same-schema dict
    with every field None when OCR produced no usable text.
    """
    os.makedirs("output", exist_ok=True)
    dprint("Initializing PaddleOCR")
    # PaddleOCR is chatty on stdout; route everything to stderr so the
    # final JSON on stdout stays machine-readable.
    with redirect_stdout(sys.stderr), redirect_stderr(sys.stderr):
        ocr = PaddleOCR(
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
            use_textline_orientation=False,
            lang='en',
            show_log=False
        )
        dprint("OCR initialized")
        dprint("Running OCR", image_path)
        results = ocr.ocr(image_path)
    # PaddleOCR can return None when nothing is detected; len(None) would raise.
    dprint("OCR done, results_count", len(results) if results else 0)
    all_text = []
    try:
        # Classic PaddleOCR result layout: [[ [box, (text, confidence)], ... ]]
        lines = results[0] if results and isinstance(results[0], list) else results
        for item in lines or []:
            if isinstance(item, (list, tuple)) and len(item) >= 2:
                meta = item[1]
                if isinstance(meta, (list, tuple)) and len(meta) >= 1:
                    all_text.append(str(meta[0]))
    except Exception as e:
        dprint("Error processing OCR results", str(e))
    dprint("All direct texts", all_text)
    if all_text:
        return extract_tesda_info(all_text)
    # No text recognized: return the extract_tesda_info schema, all-None.
    empty = {"id_type": "tesda"}
    empty.update(dict.fromkeys([
        "certificate_number", "cert_first_four", "cert_last_four",
        "uli_number", "cln_nq_number", "full_name", "first_name",
        "last_name", "qualification_level", "qualification",
        "issued_date", "valid_until"
    ]))
    return empty
# ---- Script entry point: expects the image URL as the sole CLI argument ----
if len(sys.argv) < 2:
    # Restore real stdout so the error JSON is the only stdout output.
    sys.stdout = original_stdout
    print(json.dumps({"success": False, "error": "No image URL provided"}))
    sys.exit(1)
image_url = sys.argv[1]
dprint("Processing image URL", image_url)
try:
    image_path = download_image(image_url)
    dprint("Image downloaded to", image_path)
    ocr_results = extract_ocr_lines(image_path)
    dprint("OCR results ready", ocr_results)
    # Create the response object
    response = {
        "success": True,
        "ocr_results": ocr_results
    }
    # Restore stdout and print only the JSON response
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps(response))
    sys.stdout.flush()
except Exception as e:
    dprint("Exception", str(e))
    # Restore stdout for error JSON
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps({"success": False, "error": str(e)}))
    sys.stdout.flush()
    sys.exit(1)
finally:
    # Clean up temp files regardless of outcome; best-effort, so any
    # cleanup failure is deliberately ignored here.
    try:
        clean_cache()
    except:
        pass