# Uploaded by amirali1985 (commit f1573b6: "More fixes.")
import os
import glob
from pathlib import Path
import gradio as gr
import jpype
import jpype.imports
import pandas as pd
from graphviz import Digraph
from jpype import JClass, getDefaultJVMPath
from pslpython.partition import Partition
def _find_psl_jars() -> list[str]:
"""
Priority:
1) Any *.jar inside PSL_JARS_DIR (if set)
2) Any *.jar in ./jars next to this script
3) Installed pslpython runtime jar
"""
jars: list[str] = []
# 2) Local ./jars
this_dir = Path(__file__).resolve().parent
jars_dir = f"{this_dir}/.jars"
jars.extend(glob.glob(f"{jars_dir}/*.jar"))
# Deduplicate while preserving order
dedup = []
seen = set()
for j in jars:
if j not in seen and Path(j).is_file():
seen.add(j)
dedup.append(j)
return dedup
def start_psl_jvm(verbose: bool = True) -> list[str]:
    """
    Start a JVM with a classpath that includes the PSL jars.

    Args:
        verbose: When True, print what was started and which jars were used.

    Returns:
        The list of jars placed on the classpath, or an empty list when a JVM
        was already running (in that case nothing is started).

    Raises:
        RuntimeError: If `_find_psl_jars()` returns no jars.
    """
    if jpype.isJVMStarted():
        if verbose:
            print("[PSL] JVM already started.")
        return []

    jars = _find_psl_jars()
    if not jars:
        # Name a directory that is actually searched ('.jars' beside this file).
        raise RuntimeError(
            "No PSL jars found. Place jars under .jars/ next to this script."
        )

    classpath = os.pathsep.join(jars)
    # If getDefaultJVMPath() fails on your platform, set JAVA_HOME first.
    jvm_path = getDefaultJVMPath()

    # Start JVM
    jpype.startJVM(
        jvm_path,
        f"-Djava.class.path={classpath}",
        # Optional: tighten memory or logging here
        # "-Xms256m", "-Xmx1024m",
    )

    # Sanity check that PSL classes are visible on the new classpath
    # (presumably JClass raises if the class cannot be loaded; verify).
    ground_atom_cls = JClass("org.linqs.psl.model.atom.GroundAtom")

    if verbose:
        print(f"[PSL] JVM started with {len(jars)} jars.")
        for j in jars:
            print(f" - {j}")
        print(f"[PSL] Sanity check: loaded {ground_atom_cls}")
    return jars
# =========================
# Static PSL rules + graph app
# =========================
import os
import io
import gradio as gr
import pandas as pd
from graphviz import Digraph
import pslpython
from pslpython.model import Model
from pslpython.predicate import Predicate
from pslpython.rule import Rule
# Default model name used by build_model().
MODEL_NAME = 'minimal-circuit'
# Extra options passed as `psl_options` to model.learn() / model.infer().
ADDITIONAL_PSL_OPTIONS = {
    'runtime.log.level': 'INFO',
    # 'runtime.db.type': 'Postgres',
    # 'runtime.db.pg.name': 'psl',
}
# --- Master rule list (single source of truth) ---
# Each entry is a dict; the PSL text, the graph, and the commentary are all
# generated from it.
# Conventions:
# - name: unique identifier used in labels and summaries
# - body: list of literals (allow leading '!' for negation)
# - head: single literal string; '!' means negated head
# - weight: float for soft rules
# - hard: True -> hard constraint (no weight, trailing ".")
# - squared: True -> append '^2'
# - comment: human-readable explanation shown in the UI
# NOTE(review): HC1 has the same body and head as R6, once as a soft rule
# and once as a hard constraint; confirm that both are intended.
RULES = [
    {"name":"R1_perfdrop_implies_critical", "weight":4.0,
     "body":["InCircuit(E,C)", "PerfDrop(E,C)"], "head":"Critical(E,C)", "squared":True,
     "comment":"High performance drop on removal implies the edge is critical (necessary) for C."},
    {"name":"R2_safe_implies_removable", "weight":1.0,
     "body":["InCircuit(E,C)", "SafeToRemove(E,C)"], "head":"Removable(E,C)", "squared":True,
     "comment":"If an edge is in C and appears safe to remove, infer it’s removable."},
    {"name":"R3_removable_excludes_critical", "weight":0.6,
     "body":["Removable(E,C)"], "head":"!Critical(E,C)", "squared":True,
     "comment":"Edges inferred removable should not be marked critical (soft mutual exclusion)."},
    {"name":"R4_minimal_excludes_removable", "weight":1.0,
     "body":["Minimal(C)", "InCircuit(E,C)"], "head":"!Removable(E,C)", "squared":True,
     "comment":"A minimal circuit should not contain removable edges."},
    {"name":"R5_remmasshi_neg_minimal", "weight":4.5,
     "body":["RemMassHi(C)"], "head":"!Minimal(C)", "squared":True,
     "comment":"High total removable mass argues against minimality."},
    {"name":"R6_dominates_neg_minimal", "weight":8.0,
     "body":["Dominates(C2,C)"], "head":"!Minimal(C)", "squared":True,
     "comment":"If a cheaper sufficient rival dominates C, C is not minimal."},
    {"name":"R7_insufficient_neg_minimal", "weight":8.0,
     "body":["!Sufficient(C)"], "head":"!Minimal(C)", "squared":True,
     "comment":"Minimality applies only to sufficient circuits."},
    {"name":"R8_rand_circuit_better_neg_minimal", "weight":4.0,
     "body":["RandCircuitBetter(C,Cr)"], "head":"!Minimal(C)", "squared":True,
     "comment":"If random comparable circuits often outperform C, it is unlikely minimal."},
    {"name":"R9_tie_uncovered_neg_minimal", "weight":1.2,
     "body":["Tie(E1,E2,C)", "!TieCovered(E1,E2,C)"], "head":"!Minimal(C)", "squared":True,
     "comment":"Near-ties must be explored; uncovered ties weaken claims of minimality."},
    {"name":"R10_multiple_startseed_minimal", "weight":0.5,
     "body":["FromMultipleStartSeeds(C)"], "head":"Minimal(C)", "squared":True,
     "comment":"Consistent convergence to the same C from multiple starts nudges minimality upward."},
    # Hard constraint example (no weight, trailing period)
    {"name":"HC1_dominates_implies_not_minimal", "hard":True,
     "body":["Dominates(C2,C)"], "head":"!Minimal(C)",
     "comment":"HARD: Whenever C2 dominates C, C cannot be minimal."},
]
# --- Helpers to build PSL text and the graph ---
def _literal_to_psl(lit: str) -> str:
return lit
def _rule_to_psl(rule: dict) -> str:
    """
    Render one RULES-style entry as a PSL rule string.

    Hard rules become "<body> -> <head> ."; soft rules become
    "<weight>: <body> -> <head>" with " ^2" appended when `squared` is set.
    """
    antecedent = " & ".join(map(_literal_to_psl, rule["body"]))
    consequent = _literal_to_psl(rule["head"])
    implication = f"{antecedent} -> {consequent}"

    # Hard constraints carry no weight and end with a period.
    if rule.get("hard", False):
        return f"{implication} ."

    rule_weight = rule["weight"]
    suffix = " ^2" if rule.get("squared", False) else ""
    return f"{rule_weight}: {implication}{suffix}"
def add_rules(model: Model, rules=RULES):
    """
    Add every entry of `rules` to `model` as a pslpython Rule.

    NOTE: a second `add_rules` defined further down in this file replaces
    this one at import time.
    """
    psl_lines = map(_rule_to_psl, rules)
    for psl_line in psl_lines:
        model.add_rule(Rule(psl_line))
def _pred_name(lit: str) -> str:
return lit.lstrip('!').split('(')[0].strip()
def _is_negated(lit: str) -> bool:
return lit.strip().startswith('!')
def rules_to_graphviz_file(rules=RULES, basename: str = "rules_graph") -> str:
    """
    Render the rule dependency graph to a PNG on disk and return the filepath.

    One node is drawn per predicate and one edge per (body literal -> head)
    pair of every rule. Edges into a positive head are solid green; edges
    into a negated head are dashed red. Each edge is labelled with the rule
    name and either its weight or 'HARD'.

    Args:
        rules: Rule dicts with 'name', 'body', 'head' and optional
            'weight' / 'hard' keys.
        basename: Output path without extension; 'basename.png' is written.

    Returns:
        Path of the rendered PNG.
    """
    g = Digraph(name="CircuitMinimality", format="png", engine="dot")
    g.attr(rankdir="LR", fontname="Helvetica")
    g.node_attr.update(shape="box", style="rounded,filled", fillcolor="#f8f8f8", fontname="Helvetica")

    # Nodes: every predicate mentioned in any body or head, in sorted order
    preds = {_pred_name(lit) for r in rules for lit in (*r["body"], r["head"])}
    for p in sorted(preds):
        g.node(p)

    # Edges
    for r in rules:
        head_lit = r["head"]
        head_pred = _pred_name(head_lit)
        neg_head = _is_negated(head_lit)
        color = "#d62728" if neg_head else "#2ca02c"
        # Negated heads are dashed so they stay distinguishable without colour.
        style = "dashed" if neg_head else "solid"
        label = f"{r['name']} ({'HARD' if r.get('hard', False) else r.get('weight', '')})".strip()
        for b in r["body"]:
            g.edge(_pred_name(b), head_pred, color=color, fontcolor=color, style=style, penwidth="2", label=label)

    # cleanup=True removes the intermediate DOT source after rendering.
    out = g.render(filename=basename, cleanup=True)
    # Guard in case the returned path lacks the extension.
    png_path = out if out.endswith(".png") else f"{out}.png"
    return png_path
def _rules_commentary_md(rules=RULES) -> str:
    """
    Build a Markdown bullet list describing each rule.

    Every bullet shows the rule name, a HARD / weight badge, the rule's
    comment, and the generated PSL text.
    """
    def bullet(rule: dict) -> str:
        if rule.get("hard", False):
            tag = "**HARD**"
        else:
            tag = f"**w={rule.get('weight','')}**"
        psl_text = _rule_to_psl(rule)
        return f"- **{rule['name']}** ({tag}) \n {rule['comment']} \n ` {psl_text} `"

    return "\n".join(["### Rule Commentary", "", *map(bullet, rules)])
# --- Compute everything up-front (static app) ---
# 1) Start the JVM and record a human-readable status string in JVM_STATUS.
try:
    used_jars = start_psl_jvm(verbose=False)  # returns [] when a JVM was already running
    # An empty jar list together with a running JVM means nothing was started here.
    jvm_already = jpype.isJVMStarted() and (len(used_jars) == 0)
    JVM_STATUS = "✅ JVM already running." if jvm_already else f"✅ JVM started with {len(used_jars)} jar(s)."
    if used_jars:
        # List the jars used, one per line, under the status headline.
        JVM_STATUS += "\n" + "\n".join([f"- {p}" for p in used_jars])
    # Quick class sanity: loading a PSL class fails here if the classpath is wrong.
    _ = JClass("org.linqs.psl.model.atom.GroundAtom")
except Exception as e:
    # Best-effort: keep the module importable and surface the error as text.
    JVM_STATUS = f"❌ JVM start failed: {e}"
# 2) Build the Markdown commentary shown in the "Rules & Model" tab.
COMMENTARY_MD = _rules_commentary_md(RULES)
def add_predicates(model):
    """Register every predicate used by the rules on `model`, each with its arity."""
    predicate_arities = (
        # Structure & evaluation
        ("InCircuit", 2),               # (E, C)
        ("Sufficient", 1),              # (C)
        ("PerfDrop", 2),                # (E, C)
        ("SafeToRemove", 2),            # (E, C)
        ("RemMassHi", 1),               # (C)
        # Latents / targets
        ("Removable", 2),               # (E, C)
        ("Critical", 2),                # (E, C)
        ("Minimal", 1),                 # (C)
        # Comparators / alternatives
        ("Dominates", 2),               # (C2, C)
        ("RandCircuitBetter", 2),       # (C, Cr)
        # Search coverage (optional)
        ("Tie", 3),                     # (E1, E2, C)
        ("TieCovered", 3),              # (E1, E2, C)
        ("FromMultipleStartSeeds", 1),  # (C)
    )
    for pred_label, arity in predicate_arities:
        model.add_predicate(Predicate(pred_label, size=arity))
def add_rules(model, rules=RULES, attach_comments: bool = False):
    """
    Add rules to a pslpython model from a list of rule dicts.

    Args:
        model: Model that receives the rules via `model.add_rule`.
        rules: Rule dicts in the RULES format (defaults to RULES).
        attach_comments: When True, print "# <name>: <comment>" for every
            rule that has a non-empty comment.
    """
    for r in rules:
        # _rule_to_psl already emits plain '->' and '!' PSL syntax, so the
        # text is passed through unchanged.
        model.add_rule(Rule(_rule_to_psl(r)))
        if attach_comments and r.get("comment"):
            print(f"# {r['name']}: {r['comment']}")
def infer(model):
    """Run inference on `model` with ADDITIONAL_PSL_OPTIONS and return whatever `model.infer` returns."""
    options = ADDITIONAL_PSL_OPTIONS
    return model.infer(psl_options=options)
def build_model(model_name=MODEL_NAME):
    """Create a fresh PSL model named `model_name` with all predicates and rules added."""
    psl_model = Model(model_name)
    # Predicates are registered before the rules that mention them.
    for populate in (add_predicates, add_rules):
        populate(psl_model)
    return psl_model
# -------------------------
# Build model + summary text
# -------------------------
# This first model (predicates + rules, no data) feeds summarize_model();
# `model` is rebound to a new build before atoms are loaded further below.
model = build_model()
def summarize_model(model: Model):
    """
    Return a Markdown summary of the model.

    Lists the model name, the number of predicates and rules, every
    predicate with its arity, and every RULES entry with its comment.
    Reads the pslpython private attributes `_name` and `_types`.
    """
    predicates = model.get_predicates()
    out = [
        "### PSL Model Build Summary",
        f"- Model name: **{model._name}**",
        f"- Total predicates: {len(predicates)}",
        f"- Total rules: {len(RULES)}",
        "",
        "#### Predicates:",
    ]
    out.extend(
        f"- `{label}` (arity = {len(predicate._types)})"
        for label, predicate in predicates.items()
    )
    out.append("\n#### Rules:")
    for rule in RULES:
        explanation = rule['comment']
        out.append(f"- **{rule['name']}** — {explanation}")
    return "\n".join(out)
# Markdown summary shown in the "Rules & Model" tab.
MODEL_SUMMARY_MD = summarize_model(model)
# -------------------------
# Graphviz image
# -------------------------
# Rendered once at import time; the resulting PNG path is shown in the
# "Dependency graph" tab.
GRAPH_PATH = rules_to_graphviz_file(RULES, basename="rules_graph")
# =========================
# ATOMS (facts) + loader + static summary table
# =========================
# Layout: predicate name -> partition key ("OBS" / "TARGETS" / "TRUTH") ->
# DataFrame with one column per argument plus a VALUE column.
# Partition keys are translated by _PARTITION_MAP; argument columns are
# renamed to 0..k-1 by _rename_cols_for_psl before loading.
# Note that observations and targets may never overlap.
ATOMS = {
    # Unary: Minimal(C)
    "Minimal": {
        "OBS": pd.DataFrame({"C": ["C4", "C5"], "VALUE": [1.0, 0.0]}),
        "TARGETS": pd.DataFrame({"C": ["C1", "C2", "C3"], "VALUE": [0.5, 0.5, 0.5]}),
        "TRUTH": pd.DataFrame({"C": ["C3"], "VALUE": [0.2]}),
    },
    # Unary: Sufficient(C)
    "Sufficient": {
        "OBS": pd.DataFrame({"C": ["C1", "C2", "C3"], "VALUE": [1.0, 1.0, 0.0]}),
        "TARGETS": pd.DataFrame({"C": ["C4"], "VALUE": [0.5]}),
        "TRUTH": pd.DataFrame({"C": ["C1", "C2", "C3", "C4"],"VALUE": [1.0, 1.0, 0.0, 1.0]}),
    },
    # Unary: RemMassHi(C)
    "RemMassHi": {
        "OBS": pd.DataFrame({"C": ["C2"], "VALUE": [1.0]}),
        "TARGETS": pd.DataFrame({"C": ["C1", "C3"], "VALUE": [0.5, 0.5]}),
    },
    # Unary: FromMultipleStartSeeds(C)
    "FromMultipleStartSeeds": {
        "OBS": pd.DataFrame({"C": ["C1", "C2", "C3"], "VALUE": [1.0, 1.0, 0.0]}),
    },
    # Binary: InCircuit(E,C)
    "InCircuit": {
        "OBS": pd.DataFrame({"E": ["e1","e2","e3"], "C": ["C1","C1","C1"], "VALUE": [1.0, 1.0, 1.0]}),
        "TARGETS": pd.DataFrame({"E": ["e4","e5"], "C": ["C2","C2"], "VALUE": [0.5, 0.5]}),
    },
    # Binary: PerfDrop(E,C)
    "PerfDrop": {
        "OBS": pd.DataFrame({"E": ["e1","e2","e3","e4"], "C": ["C1","C1","C1","C2"], "VALUE": [0.40, 0.10, 0.50, 0.20]}),
        "TARGETS": pd.DataFrame({"E": ["e5","e6"], "C": ["C2","C2"], "VALUE": [0.5, 0.5]}),
    },
    # Binary: SafeToRemove(E,C)
    "SafeToRemove": {
        "OBS": pd.DataFrame({"E": ["e2","e4","e5"], "C": ["C1","C2","C2"], "VALUE": [0.90, 0.80, 0.30]}),
        "TARGETS": pd.DataFrame({"E": ["e1","e3"], "C": ["C1","C1"], "VALUE": [0.5, 0.5]}),
    },
    # Binary: Removable(E,C)
    "Removable": {
        "TARGETS": pd.DataFrame({
            "E": ["e1","e2","e3","e5"],
            "C": ["C1","C1","C1","C2"],
            "VALUE": [0.5, 0.5, 0.5, 0.5],
        }),
        "OBS": pd.DataFrame({"E": ["e4"], "C": ["C2"], "VALUE": [0.2]}),
    },
    # Binary: Critical(E,C)
    "Critical": {
        "TARGETS": pd.DataFrame({
            "E": ["e1","e2","e3","e4","e5","e6"],
            "C": ["C1","C1","C1","C2","C2","C2"],
            "VALUE": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5],
        }),
    },
    # Binary: Dominates(C2,C)
    "Dominates": {
        "OBS": pd.DataFrame({"C2": ["C2","C3"], "C": ["C1","C1"], "VALUE": [1.0, 0.0]}),
        "TARGETS": pd.DataFrame({"C2": ["C3"], "C": ["C2"], "VALUE": [0.5]}),
    },
    # Binary: RandCircuitBetter(C,Cr)
    "RandCircuitBetter": {
        "OBS": pd.DataFrame({"C": ["C1","C2"], "Cr": ["R1","R1"], "VALUE": [1.0, 0.0]}),
        "TARGETS": pd.DataFrame({"C": ["C1"], "Cr": ["R2"], "VALUE": [0.5]}),
    },
    # Ternary: Tie(E1,E2,C)
    "Tie": {
        "OBS": pd.DataFrame({"E1": ["e1","e2","e3"], "E2": ["e2","e3","e4"], "C": ["C1","C1","C1"], "VALUE": [1.0, 0.0, 0.2]}),
        "TARGETS": pd.DataFrame({"E1": ["e5","e6"], "E2": ["e6","e7"], "C": ["C2","C2"], "VALUE": [0.5, 0.5]}),
    },
    # Ternary: TieCovered(E1,E2,C)
    "TieCovered": {
        "OBS": pd.DataFrame({"E1": ["e1","e2"], "E2": ["e2","e3"], "C": ["C1","C1"], "VALUE": [1.0, 1.0]}),
        "TARGETS": pd.DataFrame({"E1": ["e3","e5","e6"], "E2": ["e4","e6","e7"], "C": ["C1","C2","C2"], "VALUE": [0.5, 0.5, 0.5]}),
    },
}
# Loader, part 1: translate the partition keys used in ATOMS into pslpython
# Partition members. (Column renaming is handled by _rename_cols_for_psl.)
_PARTITION_MAP = {
    "OBS": Partition.OBSERVATIONS,
    "TARGETS": Partition.TARGETS,
    "TRUTH": Partition.TRUTH,
}
def _rename_cols_for_psl(df: pd.DataFrame) -> pd.DataFrame:
"""
Convert human-friendly columns to PSL's required integer columns:
args -> 0..k-1 in order of appearance, last column = k is VALUE.
"""
cols = list(df.columns)
assert "VALUE" in cols, "Each frame must include a VALUE column."
arg_cols = [c for c in cols if c != "VALUE"]
new_cols = {col: i for i, col in enumerate(arg_cols)}
new_cols["VALUE"] = len(arg_cols)
return df.rename(columns=new_cols)[list(range(len(arg_cols)+1))]
def load_atoms_config(model, atoms=ATOMS):
    """
    Load every frame of an ATOMS-style config into `model`.

    For each predicate name the predicate is looked up on the model, and each
    of its partition frames is added after its columns are renamed for PSL.
    """
    for predicate_label in atoms:
        predicate = model.get_predicate(predicate_label)
        for partition_key, frame in atoms[predicate_label].items():
            predicate.add_data(
                _PARTITION_MAP[partition_key],
                _rename_cols_for_psl(frame),
            )
# --- Build & load once (static) ---
# A fresh model is built here (rebinding `model`) and the ATOMS data is
# loaded into it; learning and inference below run on this instance.
model = build_model()
load_atoms_config(model, ATOMS)
# Make a compact summary table for display (rows=predicate, cols=counts)
import numpy as np
def atoms_summary_table(atoms=ATOMS) -> pd.DataFrame:
    """
    Count the atoms per predicate and partition.

    Returns one row per predicate with the number of rows in its OBS,
    TARGETS and TRUTH frames (0 when a partition is absent), sorted by
    predicate name.
    """
    def count(parts: dict, key: str) -> int:
        return len(parts.get(key, pd.DataFrame()))

    records = [
        {
            "Predicate": label,
            "Observations": count(parts, "OBS"),
            "Targets": count(parts, "TARGETS"),
            "Truth": count(parts, "TRUTH"),
        }
        for label, parts in atoms.items()
    ]
    table = pd.DataFrame(records)
    table = table.sort_values("Predicate")
    return table.reset_index(drop=True)
# Per-predicate atom counts (one row per predicate).
ATOMS_TABLE = atoms_summary_table(ATOMS)
# Markdown explainer for the partitions; note that the "Atoms" tab of the UI
# below assigns ATOMS_COMMENTARY again before displaying it.
ATOMS_COMMENTARY = (
    "### Facts/Atoms Overview\n"
    "- **Observations** are known inputs (fixed evidence).\n"
    "- **Targets** are latent atoms the model will **infer** (e.g., whether `C1`, `C2`, `C3` are `Minimal`).\n"
    "- **Truth** (if provided) is held-out gold used to **evaluate** performance (not used during inference).\n"
    "_Note: the same ground atom must not appear in both Observations and Targets._"
)
# =========================
# Learning + Inference + UI tab
# =========================
def get_rules_and_weights(model):
    """
    Return a dict mapping each rule's body text to its weight.

    Uses pslpython internals (`model._rules`, `rule._rule_body`,
    `rule._weight`). The weight is whatever pslpython stores; presumably
    None for unweighted (hard) rules, to be confirmed against pslpython.

    Rules are visited in model order. When several rules share the same body
    text (for example a soft rule and a hard constraint with identical body
    and head), the first keeps the plain body as its key and later ones get
    " [#2]", " [#3]", ... appended, so no rule is silently dropped.
    """
    weights = {}
    occurrences = {}
    for rule in model._rules:
        body = rule._rule_body
        nth = occurrences.get(body, 0) + 1
        occurrences[body] = nth
        key = body if nth == 1 else f"{body} [#{nth}]"
        weights[key] = rule._weight
    return weights
# 1) Capture starting weights
start_rules_to_weights = get_rules_and_weights(model)

# 2) Learn weights (supervised), then infer
# Note: learning expects some TRUTH atoms (ATOMS provides them for Minimal and Sufficient).
model.learn(psl_options=ADDITIONAL_PSL_OPTIONS)
results = model.infer(psl_options=ADDITIONAL_PSL_OPTIONS)

# 3) Package results
# `results` is iterated as {Predicate -> DataFrame}; build a name -> DataFrame
# map keyed by the upper-cased predicate name.
named_results = {}
for pred_obj, df in results.items():
    # pslpython exposes the name as .name or .name() depending on version
    name = pred_obj.name() if hasattr(pred_obj, "name") and callable(pred_obj.name) else getattr(pred_obj, "name", str(pred_obj))
    named_results[str(name).upper()] = df

# Minimal(C) inferred scores
minimal_results = named_results.get("MINIMAL", pd.DataFrame())

# Make the Minimal table tidy: columns [Circuit_ID, VALUE].
# The argument column is the integer 0 (not the letter "O"), so that is the
# key that has to be renamed.
if not minimal_results.empty:
    minimal_results_display = (
        minimal_results[[0, "truth"]]
        .sort_values(0)
        .reset_index(drop=True)
        .rename(columns={0: "Circuit_ID", "truth": "VALUE"})
    )
else:
    # Same column names as the populated case so the UI table is consistent.
    minimal_results_display = pd.DataFrame(columns=["Circuit_ID", "VALUE"])

# 4) Capture ending weights (compared against the starting weights below)
end_rules_to_weights = get_rules_and_weights(model)
def weights_comparison_df(start_w: dict, end_w: dict) -> pd.DataFrame:
    """
    Compare rule weights before and after learning.

    Args:
        start_w: Mapping of rule text -> weight before learning.
        end_w: Mapping of rule text -> weight after learning.

    Returns:
        DataFrame with columns Rule, StartWeight, EndWeight, Delta. Rules
        present on only one side get NaN for the missing weight. Delta is
        EndWeight - StartWeight, or NaN when either weight is missing
        (NaN/None). Rows are ordered by |Delta| descending with NaN deltas
        last. Two empty mappings give an empty frame with the same columns.
    """
    columns = ["Rule", "StartWeight", "EndWeight", "Delta"]
    nan = float("nan")

    # Join on rule text; include rules that exist at either time
    rows = []
    for rule_text in sorted(set(start_w) | set(end_w)):
        sw = start_w.get(rule_text, nan)
        ew = end_w.get(rule_text, nan)
        comparable = pd.notna(sw) and pd.notna(ew)
        rows.append({
            "Rule": rule_text,
            "StartWeight": sw,
            "EndWeight": ew,
            "Delta": (ew - sw) if comparable else nan,
        })

    df = pd.DataFrame(rows, columns=columns)
    if df.empty:
        # Nothing to sort; return the empty frame with its columns intact.
        return df
    # Sort: biggest magnitude changes first
    return df.sort_values("Delta", key=lambda s: s.abs(), ascending=False).reset_index(drop=True)
# Before/after weight table shown in the "Learning & Inference" tab.
WEIGHTS_TABLE = weights_comparison_df(start_rules_to_weights, end_rules_to_weights)

# -------------------------
# Static Gradio UI with Tabs
# -------------------------
# Everything displayed here was computed above at import time; the UI has no
# callbacks.
with gr.Blocks(title="PSL Minimal-Circuit • Static") as demo:
    gr.Markdown("## PSL Minimal-Circuit • Static Overview")
    with gr.Tabs():
        with gr.Tab("Rules & Model"):
            # gr.Markdown(f"**JVM status:**\n\n{JVM_STATUS}")
            gr.Markdown(COMMENTARY_MD)
            gr.Markdown(MODEL_SUMMARY_MD)
        with gr.Tab("Dependency graph"):
            gr.Image(value=GRAPH_PATH, label="Rule Graph (green = positive head, red dashed = negated head)", show_label=False)
        with gr.Tab("Atoms"):
            # Rebinds the module-level ATOMS_COMMENTARY with slightly different wording.
            ATOMS_COMMENTARY = (
                "### Facts/Atoms Overview\n"
                "- **Observations** are known inputs (fixed evidence).\n"
                "- **Targets** are latent atoms the model will **infer** "
                "(e.g., whether `C1`, `C2`, `C3` are `Minimal`).\n"
                "- **Truth** (if provided) is held-out gold used to **evaluate** performance; "
                "it is not used during inference.\n"
                "_Note: the same ground atom must not appear in both Observations and Targets._"
            )
            gr.Markdown(ATOMS_COMMENTARY)

            # Table with one row per predicate; each partition cell lists the
            # atoms themselves as "Pred(args) = value" lines (not counts).
            def _atoms_summary_table(atoms=ATOMS):
                def fmt_atoms(pname, df):
                    # Render every row of `df` as "pname(arg1, arg2, ...) = VALUE".
                    if df is None or len(df) == 0:
                        return ""
                    var_cols = [c for c in df.columns if c != "VALUE"]
                    lines = []
                    for _, r in df.iterrows():
                        args = ", ".join(str(r[c]) for c in var_cols)
                        val = r["VALUE"] if "VALUE" in df.columns else ""
                        lines.append(f"{pname}({args}) = {val}")
                    return "\n".join(lines)

                rows = []
                for pname, parts in atoms.items():
                    rows.append({
                        "Predicate": pname,
                        "Observations": fmt_atoms(pname, parts.get("OBS", pd.DataFrame())),
                        "Targets": fmt_atoms(pname, parts.get("TARGETS", pd.DataFrame())),
                        "Truth": fmt_atoms(pname, parts.get("TRUTH", pd.DataFrame())),
                    })
                return pd.DataFrame(rows)

            gr.Dataframe(
                value=_atoms_summary_table(ATOMS),
                label="Atoms Summary (counts)",
                interactive=False
            )
        with gr.Tab("Learning & Inference"):
            gr.Markdown("### Training & Inference Results")
            gr.Markdown(
                "Below are the **inferred minimality scores** for each circuit `C`, "
                "and a comparison of **rule weights** before and after `model.learn()`."
            )
            gr.Markdown("#### Inferred Minimality: `Minimal(C)`")
            gr.Dataframe(value=minimal_results_display, interactive=False)
            gr.Markdown("#### Rule Weights (Before vs After Learning)")
            gr.Dataframe(value=WEIGHTS_TABLE, interactive=False)
            gr.Markdown(
                "_Notes:_ Learning adjusts **soft rule** weights to better explain the provided TRUTH. "
                "Hard constraints remain fixed. Inference then computes truth values in `[0,1]` for target atoms."
            )

# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()