handyhome-ocr-api / extract_tesda_ocr.py
takomattyy's picture
Upload 20 files
db10255 verified
raw
history blame
11.6 kB
import sys, json, os, glob, re, requests
from PIL import Image
from io import BytesIO
from datetime import datetime
from contextlib import redirect_stdout, redirect_stderr
# Send everything printed from here on to stderr. The real stdout is kept in
# original_stdout so the script can restore it and write its final JSON there.
original_stdout = sys.stdout
sys.stdout = sys.stderr
# These environment variables are set before paddleocr is imported below.
# NOTE(review): PADDLEOCR_LOG_LEVEL is presumably read by PaddleOCR to limit
# its logging - confirm the installed version honours it.
os.environ['PADDLEOCR_LOG_LEVEL'] = 'ERROR'
# NOTE(review): an offscreen Qt platform and a fixed X display look like
# headless-server settings - confirm they are still required.
os.environ['QT_QPA_PLATFORM'] = 'offscreen'
os.environ['DISPLAY'] = ':99'
# Import PaddleOCR only after the environment variables above are in place.
from paddleocr import PaddleOCR
def dprint(msg, obj=None):
    """Write a ``DEBUG:`` line to stderr, optionally followed by ``obj``.

    The line is ``DEBUG: <msg>`` or, when ``obj`` is not ``None``,
    ``DEBUG: <msg>: <obj>``. Any exception raised while formatting or
    printing is swallowed so debug output can never break the caller.
    """
    try:
        suffix = "" if obj is None else f": {obj}"
        print(f"DEBUG: {msg}{suffix}", file=sys.stderr)
    except Exception:
        pass
def clean_cache():
    """Delete temporary files and the ``output`` directory left by earlier runs.

    Each known temp file in the current working directory is removed if it
    exists, then the ``output`` directory tree is removed if it exists. Every
    removal is reported through ``dprint``.
    """
    leftovers = (
        'temp_image.jpg',
        'temp_image_ocr_res_img.jpg',
        'temp_image_preprocessed_img.jpg',
        'temp_image_res.json',
    )
    for leftover in leftovers:
        if not os.path.exists(leftover):
            continue
        os.remove(leftover)
        dprint("Removed cache file", leftover)

    if not os.path.exists("output"):
        return
    import shutil
    shutil.rmtree("output")
    dprint("Removed output directory")
def download_image(url, output_path='temp_image.jpg', timeout=30):
    """Download an image and save it locally as an RGB JPEG.

    Args:
        url: Address of the image to fetch.
        output_path: Where the converted JPEG is written.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.
            Without one, requests waits indefinitely on a stalled server.

    Returns:
        ``output_path``, once the file has been written.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx status (via ``raise_for_status``).
        OSError: If the payload cannot be opened or saved as an image.
    """
    dprint("Starting download", url)
    clean_cache()
    r = requests.get(url, timeout=timeout)
    dprint("HTTP status", r.status_code)
    r.raise_for_status()
    img = Image.open(BytesIO(r.content))

    # JPEG has no alpha channel. Composite anything transparent onto white so
    # see-through regions do not turn into arbitrary dark pixels.
    has_alpha = img.mode in ('RGBA', 'LA') or (
        img.mode == 'P' and 'transparency' in img.info
    )
    if has_alpha:
        rgba = img.convert('RGBA')
        bg = Image.new('RGB', rgba.size, (255, 255, 255))
        bg.paste(rgba, mask=rgba.split()[-1])
        img = bg
    elif img.mode != 'RGB':
        img = img.convert('RGB')

    img.save(output_path, 'JPEG', quality=95)
    dprint("Saved image", output_path)
    return output_path
def format_date(s):
    """Normalise a date string to ``YYYY-MM-DD`` where possible.

    Recognised inputs:
      * ``YYYY-MM-DD`` / ``YYYY/MM/DD`` (``.`` and ``\\`` count as ``/``)
      * ``MM/DD/YYYY`` with 1-2 digit month and day (``.`` and ``\\`` count
        as ``/``); values that are not a real calendar date are not converted
      * month-name dates such as ``July 22, 2022``, ``Jul 22 2022`` and
        ``july 22,2022``

    Args:
        s: Raw date text, or ``None``.

    Returns:
        The ISO date string, ``None`` for empty or whitespace-only input, or
        the whitespace-normalised input when no format matches.
    """
    if not s:
        return None

    # Force exactly one space after each comma ("july 22,2022", "July 22 , 2022")
    # and collapse every other whitespace run, so strptime and the fallback
    # return value both see a clean string.
    raw = ' '.join(re.sub(r'\s*,\s*', ', ', s).split())
    if not raw:
        return None

    # Separator-normalised form used by the purely numeric layouts.
    t = raw.replace(' ', '').replace('\\', '/').replace('.', '/')

    if re.fullmatch(r'\d{4}[-/]\d{2}[-/]\d{2}', t):
        return t.replace('/', '-')

    # mm/dd/yyyy style, matched on the normalised form so "07.22.2022" works too.
    numeric = re.fullmatch(r'(\d{1,2})/(\d{1,2})/(\d{4})', t)
    if numeric:
        month, day, year = (int(part) for part in numeric.groups())
        try:
            datetime(year, month, day)  # validation only
        except ValueError:
            pass  # e.g. dd/mm/yyyy input; fall through instead of emitting 2022-22-07
        else:
            return f"{year:04d}-{month:02d}-{day:02d}"

    # Month-name variants, with and without the comma.
    for fmt in ("%B %d, %Y", "%b %d, %Y", "%B %d %Y", "%b %d %Y"):
        try:
            return datetime.strptime(raw, fmt).date().isoformat()
        except ValueError:
            continue
    return raw
def cap_words(name):
    """Capitalise each whitespace-separated word of ``name``.

    Words are re-joined with single spaces. Returns ``None`` when ``name`` is
    empty or ``None``.
    """
    if not name:
        return None
    words = [word.capitalize() for word in name.split()]
    return ' '.join(words)
def normalize_name_from_parts(last, first_block):
    """Build ``"<given names> <last>"`` from separately captured name pieces.

    At most the first two space-separated tokens of ``first_block`` are kept
    as given names. The result is passed through ``cap_words``. Returns
    ``None`` when nothing remains after stripping.
    """
    surname = (last or '').strip()
    given = [piece for piece in (first_block or '').strip().split(' ') if piece][:2]
    full = ' '.join([*given, surname]).strip()
    if not full:
        return None
    return cap_words(full)
def take_within(lines, i, k=5):
    """Return the non-empty, stripped text of up to ``k`` lines after index ``i``.

    Positions past the end of ``lines`` are ignored. Each entry is converted
    with ``str`` before stripping.
    """
    total = len(lines)
    following = (
        str(lines[pos]).strip()
        for pos in range(i + 1, i + k + 1)
        if pos < total
    )
    return [text for text in following if text]
def extract_number_from_text(text):
    """Return only the digit characters of ``text``, in their original order."""
    return ''.join(filter(str.isdigit, text))
def _search_from_keyword(pattern, text, keyword):
    """Search ``pattern`` in ``text`` starting at ``keyword``, else in all of ``text``."""
    start = text.find(keyword)
    if start > 0:
        match = re.search(pattern, text[start:])
        if match:
            return match
    return re.search(pattern, text)


def extract_tesda_info(lines):
    """Extract certificate fields from OCR text lines.

    Lines are scanned top to bottom and, for every field, the first line that
    yields a value wins.

    Args:
        lines: Iterable of OCR text lines; falsy entries are treated as "".

    Returns:
        A dict with the keys ``id_type``, ``certificate_number``,
        ``cert_first_four``, ``cert_last_four``, ``uli_number``,
        ``cln_nq_number``, ``full_name``, ``first_name``, ``last_name``,
        ``qualification_level``, ``qualification``, ``issued_date`` and
        ``valid_until``. Fields that were not found are ``None``.
    """
    dprint("Lines to extract", lines)

    certificate_number = None
    uli_number = None
    cln_nq_number = None
    full_name = None
    # Name pieces derived from the full name
    last_name_txt = None
    first_name_txt = None
    issued_date = None
    valid_until = None
    qualification = None
    qualification_level = None

    # Labelled forms first. The bare 14-digit fallback is bounded on both
    # sides so it cannot slice 14 digits out of a longer, unrelated number.
    cert_patterns = (
        r'certificate\s*no\.?\s*(\d{14})',
        r'certificate\s*number[:\s]*(\d{14})',
        r'cert\s*no\.?\s*(\d{14})',
        r'(?<!\d)(\d{14})(?!\d)',
    )
    uli_pattern = r'(?:uli:?)?\s*([a-zA-Z]{3}-\d{2}-\d{3}-\d{5}-\d{3})'
    cln_pattern = r'(?:cln-nq-?)?(\d{7})'
    # "nc" as a word of its own, optionally glued to its level ("NC II",
    # "NCII", "NC2"). A plain substring test also fired on words such as
    # "province" or "NCR".
    nc_level_pattern = r'\bnc(?:\s*(?:iv|i{1,3}|[1-4]))?\b'
    issued_pattern = r'(?:issued\s*(?:on|:)?\s*)?([A-Za-z]+\s+\d{1,2},?\s*\d{4})'
    valid_pattern = r'(?:valid\s*until\s*)?([A-Za-z]+\s+\d{1,2},?\s*\d{4})'
    name_stopwords = ('awarded', 'certificate', 'valid', 'for having')

    rows = [str(x or '').strip() for x in (lines or [])]
    for idx, line in enumerate(rows):
        low = line.lower()
        dprint("Line", {"i": idx, "text": line})

        # Certificate number: current line first, then the next three lines
        if certificate_number is None:
            for pattern in cert_patterns:
                match = re.search(pattern, low)
                if match:
                    certificate_number = match.group(1)
                    dprint("Found certificate number", certificate_number)
                    break
            if certificate_number is None:
                for t in take_within(rows, idx, 3):
                    nums = extract_number_from_text(t)
                    if len(nums) == 14:
                        certificate_number = nums
                        dprint("Found certificate number in next lines", certificate_number)
                        break

        # ULI number
        if uli_number is None and ('uli' in low or 'ops-' in low):
            match = re.search(uli_pattern, low, re.IGNORECASE)
            if match:
                uli_number = match.group(1).upper()
                dprint("Found ULI number", uli_number)

        # CLN-NQ number
        if cln_nq_number is None and ('cln' in low or 'nq' in low):
            match = re.search(cln_pattern, low)
            if match:
                cln_nq_number = match.group(1)
                dprint("Found CLN-NQ number", cln_nq_number)

        # Name appears after "is awarded to"
        if full_name is None and 'awarded to' in low:
            for t in take_within(rows, idx, 3):
                if any(k in t.lower() for k in name_stopwords):
                    continue
                # Periods become spaces; cap_words collapses the whitespace.
                candidate = cap_words(t.replace('.', ' '))
                if not candidate:
                    # OCR noise such as a bare "." leaves nothing to use;
                    # keep looking instead of crashing on None.split().
                    continue
                full_name = candidate
                parts = full_name.split()
                if len(parts) >= 2:
                    last_name_txt = parts[-1]
                    first_name_txt = ' '.join(parts[:-1])
                dprint("Found full name", full_name)
                break

        # Qualification level
        # NOTE(review): cap_words turns roman numerals into "Ii"/"Iii"; left
        # unchanged because consumers may already rely on that spelling.
        if qualification_level is None and (
            'national certificate' in low or re.search(nc_level_pattern, low)
        ):
            qualification_level = cap_words(line)
            dprint("Found qualification level", qualification_level)

        # Qualification/Specialization: introduced by the connector word
        # "in", either on a line of its own or merged into a neighbouring
        # line. Only the whole word counts, so "Philippines" or "having" no
        # longer trigger it.
        if qualification is None:
            connector = re.search(r'\bin\b', line, re.IGNORECASE)
            if connector:
                trailing = line[connector.end():].strip()
                if trailing:
                    qualification = cap_words(trailing)
                elif idx + 1 < len(rows):
                    qualification = cap_words(rows[idx + 1])
                if qualification is not None:
                    dprint("Found qualification", qualification)

        # Issued date: search from the keyword so that, on a line holding
        # both dates, each field picks up the date that follows its own label.
        if issued_date is None and 'issued' in low:
            match = _search_from_keyword(issued_pattern, low, 'issued')
            if match:
                issued_date = format_date(match.group(1))
                dprint("Found issued date", issued_date)

        # Valid-until date
        if valid_until is None and 'valid' in low:
            match = _search_from_keyword(valid_pattern, low, 'valid')
            if match:
                valid_until = format_date(match.group(1))
                dprint("Found valid until date", valid_until)

    # Compose name at the end
    if full_name is None:
        full_name = normalize_name_from_parts(last_name_txt, first_name_txt)

    # First and last 4 digits of the certificate number, if available
    cert_first_four = certificate_number[:4] if certificate_number else None
    cert_last_four = certificate_number[-4:] if certificate_number else None

    result = {
        "id_type": "tesda",
        "certificate_number": certificate_number,
        "cert_first_four": cert_first_four,
        "cert_last_four": cert_last_four,
        "uli_number": uli_number,
        "cln_nq_number": cln_nq_number,
        "full_name": full_name,
        "first_name": first_name_txt,
        "last_name": last_name_txt,
        "qualification_level": qualification_level,
        "qualification": qualification,
        "issued_date": issued_date,
        "valid_until": valid_until
    }
    dprint("Final result", result)
    return result
def _collect_ocr_texts(results):
    """Return the recognised strings contained in a PaddleOCR result object."""
    texts = []
    if not results:
        return texts

    # NOTE(review): PaddleOCR 3.x appears to return one dict-like result per
    # image with a 'rec_texts' list - confirm against the installed version.
    dict_pages = [page for page in results if isinstance(page, dict)]
    if dict_pages:
        for page in dict_pages:
            texts.extend(str(text) for text in (page.get('rec_texts') or []))
        return texts

    # Nested-list layout: results[0] holds [box, (text, score)] entries.
    entries = results[0] if isinstance(results[0], list) else results
    for entry in entries:
        if isinstance(entry, (list, tuple)) and len(entry) >= 2:
            meta = entry[1]
            if isinstance(meta, (list, tuple)) and len(meta) >= 1:
                texts.append(str(meta[0]))
    return texts


def extract_ocr_lines(image_path):
    """Run PaddleOCR on ``image_path`` and parse the text into certificate fields.

    Args:
        image_path: Path of the image file to recognise.

    Returns:
        The dict produced by ``extract_tesda_info``. When no text was
        recognised, every field except ``id_type`` is ``None``.
    """
    os.makedirs("output", exist_ok=True)
    dprint("Initializing PaddleOCR")

    init_kwargs = dict(
        use_doc_orientation_classify=False,
        use_doc_unwarping=False,
        use_textline_orientation=False,
        lang='en',
    )
    # Redirect both stdout and stderr during PaddleOCR operations
    with redirect_stdout(sys.stderr), redirect_stderr(sys.stderr):
        try:
            ocr = PaddleOCR(show_log=False, **init_kwargs)
        except (TypeError, ValueError) as init_error:
            # NOTE(review): show_log looks like a 2.x-only option that newer
            # releases reject - retry without it; confirm which version is
            # actually deployed.
            dprint("PaddleOCR rejected show_log, retrying without it", str(init_error))
            ocr = PaddleOCR(**init_kwargs)
        dprint("OCR initialized")
        dprint("Running OCR", image_path)
        results = ocr.ocr(image_path)
        dprint("OCR done, results_count", len(results) if results is not None else 0)

    try:
        all_text = _collect_ocr_texts(results)
    except Exception as e:
        # Best effort: a malformed result is reported and treated as "no text".
        dprint("Error processing OCR results", str(e))
        all_text = []
    dprint("All direct texts", all_text)

    # extract_tesda_info returns the all-None record for an empty list, so the
    # result schema is defined in exactly one place.
    return extract_tesda_info(all_text)
# Script entry: expects the image URL as the first command-line argument and
# writes exactly one JSON document to the real stdout. Exit status is 1 on
# any failure.
if len(sys.argv) < 2:
    sys.stdout = original_stdout
    print(json.dumps({"success": False, "error": "No image URL provided"}))
    sys.exit(1)

image_url = sys.argv[1]
dprint("Processing image URL", image_url)
try:
    image_path = download_image(image_url)
    dprint("Image downloaded to", image_path)
    ocr_results = extract_ocr_lines(image_path)
    dprint("OCR results ready", ocr_results)
    # Create the response object
    response = {
        "success": True,
        "ocr_results": ocr_results
    }
    # Restore stdout and print only the JSON response
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps(response))
    sys.stdout.flush()
except Exception as e:
    dprint("Exception", str(e))
    # Restore stdout for error JSON
    sys.stdout = original_stdout
    sys.stdout.write(json.dumps({"success": False, "error": str(e)}))
    sys.stdout.flush()
    sys.exit(1)
finally:
    # Cleanup is best-effort and must not replace the JSON already written or
    # the exit status. Catch Exception only, so KeyboardInterrupt/SystemExit
    # still propagate, and report the failure instead of hiding it.
    try:
        clean_cache()
    except Exception as cleanup_error:
        dprint("Cache cleanup failed", str(cleanup_error))