Skip to content

solidworks_mcp.ui.server

solidworks_mcp.ui.server

FastAPI backend for the Prefab CAD assistant dashboard.

REST API endpoints for dashboard actions:

- GET /api/ui/state - Hydrate UI with current session state
- POST /api/ui/brief/approve - Accept user goal
- POST /api/ui/clarify - Call GitHub Copilot to generate Q&A
- POST /api/ui/family/inspect - Call GitHub Copilot to classify design family
- POST /api/ui/family/accept - Accept proposed family
- POST /api/ui/checkpoints/execute-next - Execute next checkpoint (adapter-backed; unsupported tools marked MOCKED)
- POST /api/ui/preview/refresh - Sync 3D view from SolidWorks export_image
- POST /api/ui/manual-sync/reconcile - Detect manual edits via snapshot diff
- GET /previews/* - Serve static PNG preview images
- GET /docs - FastAPI OpenAPI UI for local endpoint inspection

LLM Integration:

- Clarify button: GitHub Copilot (default: github:openai/gpt-4.1)
- Inspect button: GitHub Copilot family classification
- Requires: GH_TOKEN or GITHUB_API_KEY environment variable with models:read scope

Attributes

DEFAULT_API_ORIGIN module-attribute

DEFAULT_API_ORIGIN = getenv('SOLIDWORKS_UI_API_ORIGIN', 'http://127.0.0.1:8766')

DEFAULT_SESSION_ID module-attribute

DEFAULT_SESSION_ID = 'prefab-dashboard'

UI_HTTP_LOG_FILE module-attribute

UI_HTTP_LOG_FILE = UI_LOG_DIR / 'ui_http.log'

UI_LOG_DIR module-attribute

UI_LOG_DIR = Path('.solidworks_mcp') / 'ui_logs'

_UI_FILE_SINK_ID module-attribute

_UI_FILE_SINK_ID: int | None = None

app module-attribute

app = FastAPI(title='SolidWorks Prefab UI Server', version='0.1.0', summary='FastAPI backend for the interactive SolidWorks Prefab dashboard.')

Functions

_configure_ui_file_logging

_configure_ui_file_logging() -> None

Attach the UI HTTP log file sink to the logger; repeat calls after the first are no-ops.

Returns:

Name Type Description
None None

None.

Source code in src/solidworks_mcp/ui/server.py
def _configure_ui_file_logging() -> None:
    """Register the UI HTTP log file sink on ``logger`` the first time it is called.

    The sink id returned by ``logger.add`` is kept in the module-level
    ``_UI_FILE_SINK_ID``; while that value is set, later calls return without
    adding another sink. Only records whose message contains ``"[ui."`` are
    written to the file.

    Returns:
        None: None.
    """

    global _UI_FILE_SINK_ID

    def _is_ui_record(record) -> bool:
        # UI log lines carry a "[ui.<area>]" marker somewhere in the message.
        return "[ui." in record["message"]

    if _UI_FILE_SINK_ID is None:
        # The log directory may not exist yet on a fresh checkout.
        UI_LOG_DIR.mkdir(parents=True, exist_ok=True)
        sink_options = {
            "level": "INFO",
            "rotation": "10 MB",
            "retention": "14 days",
            "encoding": "utf-8",
            "enqueue": True,
            "filter": _is_ui_record,
            "format": "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level:<8} | {message}",
        }
        _UI_FILE_SINK_ID = logger.add(str(UI_HTTP_LOG_FILE), **sink_options)
        logger.info(
            "[ui.logging] file sink enabled path={}", str(UI_HTTP_LOG_FILE.resolve())
        )

_decode_request_body

_decode_request_body(body: bytes) -> Any

Decode a raw request body into a log-safe value: sanitized parsed JSON when possible, otherwise UTF-8 text.

Parameters:

Name Type Description Default
body bytes

Raw HTTP request body bytes; may be empty.

required

Returns:

Name Type Description
Any Any

None for an empty body, the sanitized parsed JSON value, or the body decoded as text when it is not valid JSON.

Source code in src/solidworks_mcp/ui/server.py
def _decode_request_body(body: bytes) -> Any:
    """Build internal decode request body.

    Args:
        body (bytes): The body value.

    Returns:
        Any: The result produced by the operation.
    """

    if not body:
        return None
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return body.decode("utf-8", errors="replace")
    return _sanitize_log_payload(parsed)

_sanitize_log_payload

_sanitize_log_payload(value: Any) -> Any

Recursively copy a payload for logging, replacing string values stored under a "data" key with a length-only placeholder.

Parameters:

Name Type Description Default
value Any

Payload to sanitize (a dict, a list, or any other value).

required

Returns:

Name Type Description
Any Any

A sanitized copy of dicts and lists; any other value is returned unchanged.

Source code in src/solidworks_mcp/ui/server.py
def _sanitize_log_payload(value: Any) -> Any:
    """Build internal sanitize log payload.

    Args:
        value (Any): The value value.

    Returns:
        Any: The result produced by the operation.
    """

    if isinstance(value, dict):
        sanitized: dict[str, Any] = {}
        for key, item in value.items():
            if key == "data" and isinstance(item, str):
                sanitized[key] = f"<omitted base64 payload len={len(item)}>"
            elif key == "uploaded_files" and isinstance(item, list):
                sanitized[key] = [_sanitize_log_payload(entry) for entry in item]
            else:
                sanitized[key] = _sanitize_log_payload(item)
        return sanitized
    if isinstance(value, list):
        return [_sanitize_log_payload(item) for item in value]
    return value

_should_log_request

_should_log_request(path: str) -> bool

Return whether a request path is one of the routes covered by request logging.

Parameters:

Name Type Description Default
path str

URL path of the incoming request.

required

Returns:

Name Type Description
bool bool

True if should log request, otherwise False.

Source code in src/solidworks_mcp/ui/server.py
def _should_log_request(path: str) -> bool:
    """Build internal should log request.

    Args:
        path (str): Filesystem path for the operation.

    Returns:
        bool: True if should log request, otherwise False.
    """

    return path == "/api/health" or path.startswith("/api/ui/")

_startup_ingest_design_knowledge async

_startup_ingest_design_knowledge() -> None

Auto-ingest bundled design knowledge markdown files into FAISS on first startup.

Returns:

Name Type Description
None None

None.

Source code in src/solidworks_mcp/ui/server.py
@app.on_event("startup")
async def _startup_ingest_design_knowledge() -> None:
    """Seed the design-knowledge vector index from bundled markdown at startup.

    Looks for ``*.md`` files in the ``agents/design_knowledge`` directory next to
    this package and, when the ``solidworks-design-knowledge`` index holds no
    chunks, ingests every file and saves the index. Errors raised while loading
    or filling the index are logged and swallowed.

    Returns:
        None: None.
    """
    import logging as _logging

    _log = _logging.getLogger(__name__)
    docs_root = FilePath(__file__).parent.parent / "agents" / "design_knowledge"
    markdown_paths = sorted(docs_root.glob("*.md")) if docs_root.is_dir() else []
    if not markdown_paths:
        return

    namespace = "solidworks-design-knowledge"
    try:
        from ..agents.vector_rag import VectorRAGIndex  # noqa: PLC0415

        index = VectorRAGIndex.load(
            namespace=namespace, rag_dir=FilePath(".solidworks_mcp") / "rag"
        )
        # A non-empty index is left untouched; only an empty one is filled.
        if index.chunk_count != 0:
            _log.debug(
                "FAISS namespace '%s' already loaded (%d chunks) — skipping auto-ingest",
                namespace,
                index.chunk_count,
            )
            return
        for doc_path in markdown_paths:
            index.ingest_text(
                doc_path.read_text(encoding="utf-8"),
                source=doc_path.name,
                tags=[namespace, doc_path.stem],
            )
        index.save()
        _log.info(
            "Auto-ingested %d design knowledge files into FAISS namespace '%s'",
            len(markdown_paths),
            namespace,
        )
    except ImportError:
        _log.debug(
            "faiss/sentence-transformers not installed — skipping design knowledge auto-ingest"
        )
    except Exception as exc:
        # Best-effort step: the failure is reported but not re-raised.
        _log.warning("Design knowledge auto-ingest failed (non-fatal): %s", exc)

accept_family_choice

accept_family_choice(session_id: str, family: str | None = None, *, db_path: Path | None = None) -> dict[str, Any]

Accept the proposed family classification and advance session status.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
family str | None

Family name to accept; falls back to proposed_family in metadata.

None
db_path Path | None

Optional override for the SQLite database path.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/session_service.py
def accept_family_choice(
    session_id: str,
    family: str | None = None,
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Record the accepted design family and move the session to ``planned``.

    Args:
        session_id: Dashboard session identifier.
        family: Family name to accept; when empty, the ``proposed_family`` stored
            in the session metadata is used, and ``"unknown"`` if that is also
            missing.
        db_path: Optional override for the SQLite database path.

    Returns:
        Full dashboard state payload.
    """
    row = ensure_dashboard_session(session_id, db_path=db_path)
    stored_metadata = parse_json_blob(row.get("metadata_json"))
    chosen_family = family or stored_metadata.get("proposed_family") or "unknown"

    # Rewrite the session record with the accepted family, carrying the other
    # stored fields over (or their defaults when they are empty).
    session_fields = {
        "session_id": session_id,
        "user_goal": row.get("user_goal") or DEFAULT_USER_GOAL,
        "source_mode": row.get("source_mode") or DEFAULT_SOURCE_MODE,
        "accepted_family": chosen_family,
        "status": "planned",
        "current_checkpoint_index": row.get("current_checkpoint_index") or 0,
        "metadata_json": row.get("metadata_json"),
    }
    upsert_design_session(**session_fields, db_path=db_path)

    # Log the UI action and clear any previous error/remediation text.
    metadata_changes = {
        "accepted_family": chosen_family,
        "latest_message": f"Family accepted: {chosen_family}.",
        "latest_error_text": "",
        "remediation_hint": "",
    }
    persist_ui_action(
        session_id,
        tool_name="ui.accept_family",
        db_path=db_path,
        metadata_updates=metadata_changes,
        input_payload={"family": chosen_family},
    )
    return build_dashboard_state(session_id, db_path=db_path)

approve_design_brief

approve_design_brief(session_id: str, user_goal: str, *, db_path: Path | None = None) -> dict[str, Any]

Persist the accepted design goal and return updated dashboard state.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
user_goal str

Design goal text approved by the user.

required
db_path Path | None

Optional override for the SQLite database path.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/session_service.py
def approve_design_brief(
    session_id: str,
    user_goal: str,
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Store the approved design goal and return the refreshed dashboard state.

    Args:
        session_id: Dashboard session identifier.
        user_goal: Design goal text approved by the user.
        db_path: Optional override for the SQLite database path.

    Returns:
        Full dashboard state payload.
    """
    ensure_dashboard_session(session_id, user_goal=user_goal, db_path=db_path)

    # The approved goal is also stored as the normalized brief, and any earlier
    # error/remediation text is cleared.
    metadata_changes = {
        "normalized_brief": user_goal,
        "latest_message": "Brief accepted.",
        "latest_error_text": "",
        "remediation_hint": "",
    }
    persist_ui_action(
        session_id,
        tool_name="ui.approve_brief",
        db_path=db_path,
        user_goal=user_goal,
        metadata_updates=metadata_changes,
        input_payload={"user_goal": user_goal},
        output_metadata=True,
    )
    return build_dashboard_state(session_id, db_path=db_path)

build_dashboard_state

build_dashboard_state(session_id: str = DEFAULT_SESSION_ID, *, db_path: Path | None = None, api_origin: str = DEFAULT_API_ORIGIN) -> dict[str, Any]

Assemble the complete dashboard payload consumed by the Prefab UI renderer.

This function is the single read-path for all UI state: it reads the session database, merges every sub-component (checkpoints, evidence, snapshots, preview URLs, provider readiness), and returns the DashboardUIState model as a dict.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

DEFAULT_SESSION_ID
db_path Path | None

Optional override for the SQLite database path.

None
api_origin str

Base URL used to construct preview and viewer URLs.

DEFAULT_API_ORIGIN

Returns:

Type Description
dict[str, Any]

DashboardUIState model dumped to a plain dict.

Source code in src/solidworks_mcp/ui/services/session_service.py
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
def build_dashboard_state(
    session_id: str = DEFAULT_SESSION_ID,
    *,
    db_path: Path | None = None,
    api_origin: str = DEFAULT_API_ORIGIN,
) -> dict[str, Any]:
    """Assemble the complete dashboard payload consumed by the Prefab UI renderer.

    This function is the single read-path for all UI state: it reads the session
    database, merges every sub-component (checkpoints, evidence, snapshots, preview
    URLs, provider readiness), and returns the ``DashboardUIState`` model as a dict.

    Most text fields are read from the session metadata and fall back to a
    placeholder string when the stored value is missing or empty.

    Args:
        session_id: Dashboard session identifier.
        db_path: Optional override for the SQLite database path.
        api_origin: Base URL used to construct preview and viewer URLs.

    Returns:
        ``DashboardUIState`` model dumped to a plain dict.
    """
    import os  # local import

    session_row = ensure_dashboard_session(session_id, db_path=db_path)
    metadata = parse_json_blob(session_row.get("metadata_json"))
    db_ready = bool(session_row)
    workflow_mode = normalize_workflow_mode(metadata.get("workflow_mode"))
    active_model_path = sanitize_model_path_text(metadata.get("active_model_path"))
    # A "clean" new design has no model attached yet; the row builders below
    # receive this flag.
    is_new_design_clean = workflow_mode == "new_design" and not active_model_path

    # --- Checkpoints ---
    checkpoints = _build_checkpoint_rows(
        session_id, db_path=db_path, is_new_design_clean=is_new_design_clean
    )
    structured_rendering_enabled = bool(checkpoints)
    checkpoints_text = (
        " | ".join(
            f"{item['step']}. {item['goal']} [{item['status']}] via {item['tools']}"
            for item in checkpoints
        )
        if checkpoints
        else "No checkpoints available yet."
    )

    # --- Evidence rows ---
    evidence_rows = _build_evidence_rows(
        session_id,
        db_path=db_path,
        active_model_path=active_model_path,
        is_new_design_clean=is_new_design_clean,
    )
    evidence_rows_text = (
        " | ".join(
            f"{item['source']}: {item['detail']} (score {item['score']})"
            for item in evidence_rows
        )
        if evidence_rows
        else "No evidence links captured yet."
    )

    # --- Tool history ---
    tool_history = list_tool_call_records(session_id, db_path=db_path)
    latest_tool = tool_history[-1]["tool_name"] if tool_history else "waiting"
    # Only the last 20 records are rendered into the history text.
    tool_history_text = trace_json(trace_tool_records(tool_history[-20:]))

    # --- Preview URL ---
    preview_url = ""
    preview_status = "No preview captured yet."
    snapshots = list_model_state_snapshots(session_id, db_path=db_path)
    # NOTE(review): assumes the first snapshot is the most recent one; confirm
    # the ordering returned by list_model_state_snapshots.
    latest_snapshot_path = snapshots[0].get("screenshot_path") if snapshots else None
    if latest_snapshot_path:
        preview_path = Path(latest_snapshot_path)
        if preview_path.exists():
            # The file mtime is appended as a query parameter so the URL changes
            # whenever the image file is rewritten.
            ts = int(preview_path.stat().st_mtime)
            preview_url = f"{api_origin}/previews/{preview_path.name}?ts={ts}"
            preview_status = (
                f"Synced from SolidWorks current view. Last file: {preview_path.name}"
            )

    # --- Feature tree ---
    selected_feature_name = str(metadata.get("selected_feature_name") or "")
    feature_tree_items = _build_feature_tree(
        session_id,
        db_path=db_path,
        is_new_design_clean=is_new_design_clean,
        selected_feature_name=selected_feature_name,
    )

    # --- 3D viewer URL ---
    preview_viewer_url = sanitize_preview_viewer_url(
        metadata.get("preview_viewer_url"),
        session_id=session_id,
        api_origin=api_origin,
    )
    # Fall back to the built-in viewer route when no usable URL is stored but an
    # STL preview is flagged ready for a known model path.
    if (
        not preview_viewer_url
        and bool(metadata.get("preview_stl_ready"))
        and metadata.get("active_model_path")
    ):
        preview_viewer_url = (
            f"{api_origin}/api/ui/viewer/{session_id}?session_id={session_id}&t=0"
        )

    preview_status = sanitize_ui_text(metadata.get("preview_status"), preview_status)

    # --- Family / clarification ---
    family = (
        session_row.get("accepted_family")
        or metadata.get("proposed_family")
        or "unclassified"
    )
    confidence = metadata.get("family_confidence", "pending")
    # ``or []`` covers a key stored as JSON null (dict.get's default only applies
    # to missing keys), and str() keeps non-string entries from breaking join.
    evidence_text = (
        " | ".join(str(item) for item in (metadata.get("family_evidence") or []))
        or "No family evidence yet."
    )
    warning_text = (
        " | ".join(str(item) for item in (metadata.get("family_warnings") or []))
        or "No blocking warnings."
    )
    questions = metadata.get("clarifying_questions", [])
    question_text = (
        "\n".join(f"- {item}" for item in questions)
        if questions
        else "No outstanding clarification questions."
    )

    # --- Model / provider ---
    model_name = sanitize_ui_text(
        metadata.get("model_name"),
        os.getenv("SOLIDWORKS_UI_MODEL", "github:openai/gpt-4.1"),
    )
    model_provider = str(
        metadata.get("model_provider") or provider_from_model_name(model_name)
    )
    model_profile = str(metadata.get("model_profile") or "balanced")

    # --- Active model status ---
    active_model_status = sanitize_ui_text(metadata.get("active_model_status"), "")
    if active_model_path and not active_model_status:
        active_model_status = (
            f"Model path set: {Path(active_model_path).name} (connect pending)."
        )
    if not active_model_path and not active_model_status:
        active_model_status = "No active model connected yet."

    # --- Workflow copy ---
    workflow_label, workflow_guidance_text, flow_header_text = workflow_copy(
        workflow_mode, active_model_path
    )

    # --- Local model ---
    local_endpoint = sanitize_ui_text(
        metadata.get("local_endpoint"),
        os.getenv("SOLIDWORKS_UI_LOCAL_ENDPOINT", "http://127.0.0.1:11434/v1"),
    )

    # --- Readiness ---
    readiness = _compute_readiness(metadata, db_ready=db_ready)

    # --- Context text ---
    active_model_name = Path(active_model_path).name if active_model_path else "<none>"
    preview_views = metadata.get("preview_view_urls") or {}
    model_context_lines = [
        f"Model file: {active_model_name}",
        f"Absolute path: {active_model_path or '<none>'}",
        f"Model type: {str(metadata.get('active_model_type') or '<unknown>')}",
        f"Configuration: {str(metadata.get('active_model_configuration') or '<unknown>')}",
        f"Feature tree rows: {len(feature_tree_items)}",
        f"Selected feature: {selected_feature_name or '<none>'}",
        f"Feature targets: {str(metadata.get('feature_target_text') or '<none>')}",
        f"Preview views captured: {', '.join(sorted(preview_views.keys())) or '<none>'}",
        f"Latest preview status: {preview_status}",
    ]
    model_context_text = "\n".join(model_context_lines)
    context_summary = (
        f"{active_model_name} | {str(metadata.get('active_model_type') or 'unknown')}"
        f" | config {str(metadata.get('active_model_configuration') or '<unknown>')}"
        f" | features {len(feature_tree_items)}"
    )

    fg_warning = feature_grounding_warning_text(
        active_model_path=active_model_path,
        feature_target_text=str(metadata.get("feature_target_text") or ""),
        feature_tree_count=len(feature_tree_items),
    )

    canonical_prompt_text = "\n".join(
        [
            f"Goal: {session_row.get('user_goal') or DEFAULT_USER_GOAL}",
            f"Assumptions: {sanitize_ui_text(metadata.get('assumptions_text'), '') or '<none>'}",
            f"Active model path: {active_model_path or '<none>'}",
            f"Active model status: {active_model_status}",
            f"Feature targets: {str(metadata.get('feature_target_text') or '<none>')}",
            f"Feature target status: {str(metadata.get('feature_target_status') or '<none>')}",
            f"Accepted/proposed family: {session_row.get('accepted_family') or metadata.get('proposed_family') or '<none>'}",
            f"RAG provenance: {str(metadata.get('rag_provenance_text') or '<none>')}",
            f"Docs context: {str(metadata.get('docs_context_text') or '<none>')}",
            f"Engineering notes: {str(metadata.get('notes_text') or '<none>')}",
        ]
    )

    state = DashboardUIState(
        session_id=session_id,
        workflow_mode=workflow_mode,
        workflow_label=workflow_label,
        workflow_guidance_text=workflow_guidance_text,
        user_goal=session_row.get("user_goal") or DEFAULT_USER_GOAL,
        flow_header_text=flow_header_text,
        assumptions_text=sanitize_ui_text(
            metadata.get("assumptions_text"),
            "Assume PETG, 0.4mm nozzle, 0.2mm layers, and 0.30mm mating clearance unless overridden.",
        ),
        active_model_path=active_model_path,
        active_model_status=active_model_status,
        active_model_type=str(metadata.get("active_model_type") or ""),
        active_model_configuration=str(
            metadata.get("active_model_configuration") or ""
        ),
        feature_target_text=str(metadata.get("feature_target_text") or ""),
        feature_target_status=str(
            metadata.get("feature_target_status")
            or "No grounded feature target selected."
        ),
        feature_grounding_warning_text=fg_warning,
        normalized_brief=(
            metadata.get("normalized_brief")
            or session_row.get("user_goal")
            or DEFAULT_USER_GOAL
        ),
        clarifying_questions_text=question_text,
        proposed_family=family,
        family_confidence=confidence,
        family_evidence_text=evidence_text,
        family_warning_text=warning_text,
        accepted_family=session_row.get("accepted_family") or "",
        checkpoints=checkpoints,
        checkpoints_text=checkpoints_text,
        evidence_rows=evidence_rows,
        evidence_rows_text=evidence_rows_text,
        structured_rendering_enabled=structured_rendering_enabled,
        # NOTE(review): hard-coded here rather than derived from session data.
        manual_sync_ready=False,
        preview_url=preview_url,
        preview_status=preview_status,
        preview_orientation=metadata.get(
            "preview_orientation", DEFAULT_PREVIEW_ORIENTATION
        ),
        latest_message=metadata.get("latest_message", "Ready."),
        latest_tool=latest_tool,
        latest_error_text=str(metadata.get("latest_error_text") or ""),
        remediation_hint=str(metadata.get("remediation_hint") or ""),
        model_provider=model_provider,
        model_name=model_name,
        model_profile=model_profile,
        local_endpoint=local_endpoint,
        local_model_status_text=str(
            metadata.get("local_model_status_text") or "Local model controls idle."
        ),
        local_model_busy=bool(metadata.get("local_model_busy") or False),
        local_model_available=bool(metadata.get("local_model_available") or False),
        local_model_recommended_tier=str(
            metadata.get("local_model_recommended_tier") or ""
        ),
        local_model_recommended_ollama_model=str(
            metadata.get("local_model_recommended_ollama_model") or ""
        ),
        local_model_pull_command=str(metadata.get("local_model_pull_command") or ""),
        local_model_label=str(metadata.get("local_model_label") or ""),
        rag_source_path=str(metadata.get("rag_source_path") or ""),
        rag_namespace=str(metadata.get("rag_namespace") or "engineering-reference"),
        rag_status=str(
            metadata.get("rag_status") or "No retrieval source ingested yet."
        ),
        rag_index_path=str(metadata.get("rag_index_path") or ""),
        rag_chunk_count=int(metadata.get("rag_chunk_count") or 0),
        rag_provenance_text=str(
            metadata.get("rag_provenance_text")
            or "No retrieval provenance available yet."
        ),
        docs_query=str(metadata.get("docs_query") or "SolidWorks MCP endpoints"),
        docs_context_text=str(
            metadata.get("docs_context_text") or "No docs context loaded yet."
        ),
        notes_text=str(metadata.get("notes_text") or ""),
        orchestration_status=str(metadata.get("orchestration_status") or "Ready."),
        context_save_status=str(metadata.get("context_save_status") or ""),
        context_load_status=str(metadata.get("context_load_status") or ""),
        context_name_input=str(metadata.get("context_name_input") or session_id),
        context_file_input=str(metadata.get("last_context_file") or ""),
        readiness_provider_configured=readiness["readiness_provider_configured"],
        readiness_adapter_mode=readiness["readiness_adapter_mode"],
        readiness_preview_ready=readiness["readiness_preview_ready"],
        readiness_db_ready=readiness["readiness_db_ready"],
        readiness_summary=readiness["readiness_summary"],
        # NOTE(review): fixed value, not computed from the session.
        context_used_pct=38,
        context_text=context_summary,
        model_context_text=model_context_text,
        canonical_prompt_text=canonical_prompt_text,
        tool_history_text=tool_history_text,
        api_origin=api_origin,
        preview_viewer_url=preview_viewer_url,
        preview_view_urls=preview_views,
        user_clarification_answer=str(metadata.get("user_clarification_answer") or ""),
        mocked_tools_text=(
            "MOCKED tools: " + ", ".join(metadata.get("mocked_tools", []))
            if metadata.get("mocked_tools")
            else ""
        ),
        feature_tree_items=feature_tree_items,
        selected_feature_name=selected_feature_name,
    ).model_dump()

    logger.debug(
        "[ui.trace.state] session_id={} model_path={} selected={} feature_rows={} preview_views={} latest_tool={}",
        session_id,
        state.get("active_model_path") or "<none>",
        state.get("selected_feature_name") or "<none>",
        len(state.get("feature_tree_items") or []),
        list((state.get("preview_view_urls") or {}).keys()),
        state.get("latest_tool") or "waiting",
    )
    return state

build_dashboard_trace_payload

build_dashboard_trace_payload(session_id: str = DEFAULT_SESSION_ID, *, db_path: Path | None = None, api_origin: str = DEFAULT_API_ORIGIN) -> dict[str, Any]

Assemble the verbose debug/trace payload for the operator trace panel.

Includes the raw session row, full metadata, complete tool-call history, and the assembled DashboardUIState — all serialised to both Python dicts and pretty-printed JSON strings for easy inspection in the UI.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

DEFAULT_SESSION_ID
db_path Path | None

Optional override for the SQLite database path.

None
api_origin str

API origin used for URL generation.

DEFAULT_API_ORIGIN

Returns:

Type Description
dict[str, Any]

Dict with session_row, metadata, state, and tool_records sections.

Source code in src/solidworks_mcp/ui/services/session_service.py
def build_dashboard_trace_payload(
    session_id: str = DEFAULT_SESSION_ID,
    *,
    db_path: Path | None = None,
    api_origin: str = DEFAULT_API_ORIGIN,
) -> dict[str, Any]:
    """Build the verbose debug payload shown in the operator trace panel.

    Bundles the session row, its parsed metadata, the tool-call records, and the
    assembled ``DashboardUIState``. Each section appears twice: once as a Python
    object and once as the text produced by ``trace_json``.

    Args:
        session_id: Dashboard session identifier.
        db_path: Optional override for the SQLite database path.
        api_origin: API origin used for URL generation.

    Returns:
        Dict with ``session_row``, ``metadata``, ``state``, and ``tool_records``
        sections, their ``*_text`` counterparts, and a one-line ``debug_summary``.
    """
    # Make sure the session exists before reading it back.
    ensure_dashboard_session(session_id, db_path=db_path)
    raw_row = get_design_session(session_id, db_path=db_path) or {}
    metadata = parse_json_blob(raw_row.get("metadata_json"))
    state = build_dashboard_state(session_id, db_path=db_path, api_origin=api_origin)
    call_history = list_tool_call_records(session_id, db_path=db_path)
    tool_records = trace_tool_records(call_history)
    session_row_payload = trace_session_row(raw_row)

    workflow_mode = state.get("workflow_mode", DEFAULT_WORKFLOW_MODE)
    model_path_label = state.get("active_model_path", "") or "<none>"
    summary_parts = [
        f"workflow={workflow_mode}",
        f"model_path={model_path_label}",
        f"latest_tool={state.get('latest_tool', 'waiting')}",
        f"tool_records={len(tool_records)}",
    ]

    payload = {
        "session_id": session_id,
        "workflow_mode": workflow_mode,
        "latest_message": state.get("latest_message", "Ready."),
        "latest_error_text": state.get("latest_error_text", ""),
        "debug_summary": " | ".join(summary_parts),
        "session_row": session_row_payload,
        "session_row_text": trace_json(session_row_payload),
        "metadata": metadata,
        "metadata_text": trace_json(metadata),
        "state": state,
        "state_text": trace_json(state),
        "tool_records": tool_records,
        "tool_records_text": trace_json(tool_records),
    }
    logger.debug(
        "[ui.trace.snapshot] session_id={} model_path={} selected={} tool_records={} preview_views={}",
        session_id,
        model_path_label,
        state.get("selected_feature_name") or "<none>",
        len(tool_records),
        list((state.get("preview_view_urls") or {}).keys()),
    )
    return payload

connect_target_model async

connect_target_model(session_id: str, *, model_path: str | None = None, uploaded_files: list[dict[str, Any]] | None = None, feature_target_text: str | None = None, db_path: Path | None = None, api_origin: str = DEFAULT_API_ORIGIN) -> dict[str, Any]

Open a target model, inspect its feature tree, and trigger a preview refresh.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
model_path str | None

Absolute path to the SolidWorks model file.

None
uploaded_files list[dict[str, Any]] | None

List of base64-encoded file upload dicts.

None
feature_target_text str | None

Optional comma-separated feature target references.

None
db_path Path | None

Optional SQLite path override.

None
api_origin str

Base URL of the running FastAPI server.

DEFAULT_API_ORIGIN

Returns:

Type Description
dict[str, Any]

Full dashboard state payload (includes preview state after the refresh).

Source code in src/solidworks_mcp/ui/services/model_service.py
async def connect_target_model(
    session_id: str,
    *,
    model_path: str | None = None,
    uploaded_files: list[dict[str, Any]] | None = None,
    feature_target_text: str | None = None,
    db_path: Path | None = None,
    api_origin: str = DEFAULT_API_ORIGIN,
) -> dict[str, Any]:
    """Open a target model, inspect its feature tree, and trigger a preview refresh.

    The attach outcome (success or failure) is merged into the session metadata
    and recorded as a ``ui.connect_target_model`` tool call. Exceptions raised
    while attaching are logged and recorded instead of being re-raised. The
    adapter created for this call is disconnected before returning on every
    path, including the successful preview-refresh path.

    Args:
        session_id: Dashboard session identifier.
        model_path: Absolute path to the SolidWorks model file.
        uploaded_files: List of base64-encoded file upload dicts.
        feature_target_text: Optional comma-separated feature target references.
        db_path: Optional SQLite path override.
        api_origin: Base URL of the running FastAPI server.

    Returns:
        The payload returned by ``refresh_preview`` when both the attach and the
        preview refresh succeed; otherwise the payload produced by
        ``build_dashboard_state``.
    """
    from .session_service import build_dashboard_state, ensure_dashboard_session  # noqa: PLC0415
    from .preview_service import refresh_preview  # noqa: PLC0415

    ensure_dashboard_session(session_id, db_path=db_path)

    logger.info(
        "[ui.connect_target_model] session_id={} model_path={} uploaded_files={} feature_targets={}",
        session_id,
        model_path,
        len(uploaded_files) if uploaded_files else 0,
        feature_target_text or "",
    )

    resolved_path = _resolve_model_path(
        session_id,
        model_path=model_path,
        uploaded_files=uploaded_files,
        feature_target_text=feature_target_text,
        db_path=db_path,
        api_origin=api_origin,
    )
    if resolved_path is None:
        # No usable path was resolved; return the current state unchanged.
        return build_dashboard_state(session_id, db_path=db_path, api_origin=api_origin)

    config = load_config()
    adapter = await create_adapter(config)
    model_info: dict[str, Any] = {}
    features: list[dict[str, Any]] = []
    attach_succeeded = False
    # Resolve once; the same absolute path string is reused for every record below.
    resolved_model_path = str(resolved_path.resolve())
    tool_input = {
        "model_path": resolved_model_path,
        "uploaded_file_name": uploaded_files[0].get("name") if uploaded_files else None,
        "feature_target_text": feature_target_text or "",
    }
    try:
        await adapter.connect()
        open_result = await adapter.open_model(resolved_model_path)
        if not open_result.is_success:
            raise RuntimeError(open_result.error or "Failed to open target model.")

        # Model info and feature listing are optional adapter capabilities.
        if hasattr(adapter, "get_model_info"):
            info_result = await adapter.get_model_info()
            if info_result.is_success and isinstance(info_result.data, dict):
                model_info = info_result.data

        if hasattr(adapter, "list_features"):
            feature_result = await adapter.list_features(include_suppressed=True)
            if feature_result.is_success and isinstance(feature_result.data, list):
                features = [
                    item for item in feature_result.data if isinstance(item, dict)
                ]

        from ._utils import feature_target_status  # noqa: PLC0415

        classification = classify_feature_tree_snapshot(model_info, features)
        target_status, matched_targets, missing_targets = feature_target_status(
            features, feature_target_text
        )

        snapshot_id = insert_model_state_snapshot(
            session_id=session_id,
            model_path=resolved_model_path,
            feature_tree_json=json.dumps(features, ensure_ascii=True),
            # Fingerprint combines the absolute path with the file's mtime (ns).
            state_fingerprint=f"{resolved_model_path}::{resolved_path.stat().st_mtime_ns}",
            db_path=db_path,
        )
        # One evidence link per classifier evidence line.
        for evidence_line in classification.get("evidence", []):
            insert_evidence_link(
                session_id=session_id,
                source_type="active_model",
                source_id=resolved_model_path,
                relevance_score=0.96,
                rationale=str(evidence_line),
                payload_json=json.dumps(classification, ensure_ascii=True),
                db_path=db_path,
            )

        if matched_targets or missing_targets:
            insert_evidence_link(
                session_id=session_id,
                source_type="feature_target",
                source_id=resolved_model_path,
                relevance_score=0.9 if matched_targets else 0.4,
                rationale=target_status,
                payload_json=json.dumps(
                    {"matched": matched_targets, "missing": missing_targets},
                    ensure_ascii=True,
                ),
                db_path=db_path,
            )

        metadata = merge_metadata(
            session_id,
            db_path=db_path,
            workflow_mode="edit_existing",
            active_model_path=resolved_model_path,
            active_model_status=(
                f"Attached model: {resolved_path.name}"
                f" | type={model_info.get('type', 'unknown')}"
                f" | features={len(features)}"
            ),
            active_model_type=str(model_info.get("type") or ""),
            active_model_configuration=str(
                model_info.get("configuration") or "Default"
            ),
            feature_target_text=feature_target_text or "",
            feature_target_status=target_status,
            proposed_family=classification.get("family") or "unknown",
            family_confidence=classification.get("confidence") or "low",
            family_evidence=classification.get("evidence") or [],
            family_warnings=classification.get("warnings") or [],
            latest_message=(
                f"Opened target model {resolved_path.name}. Generating preview views..."
            ),
            preview_status="Generating preview views from attached model...",
            preview_stl_ready=False,
            preview_png_ready=False,
            preview_viewer_url="",
            latest_error_text="",
            remediation_hint="",
            latest_snapshot_id=snapshot_id,
        )
        insert_tool_call_record(
            session_id=session_id,
            tool_name="ui.connect_target_model",
            input_json=json.dumps(tool_input, ensure_ascii=True),
            output_json=json.dumps(metadata, ensure_ascii=True),
            success=True,
            db_path=db_path,
        )
        attach_succeeded = True
    except Exception as exc:
        # Attach failures are surfaced through session metadata and the tool
        # call log rather than propagated to the caller.
        logger.exception(
            "[ui.connect_target_model] failed session_id={} path={} error={}",
            session_id,
            resolved_model_path,
            exc,
        )
        merge_metadata(
            session_id,
            db_path=db_path,
            workflow_mode="edit_existing",
            active_model_path=resolved_model_path,
            feature_target_text=feature_target_text or "",
            latest_message="Failed to attach target model.",
            preview_status="Preview generation failed while attaching model.",
            latest_error_text=str(exc),
            remediation_hint="Open SolidWorks, verify COM access, and retry with a valid .sldprt/.sldasm path.",
        )
        insert_tool_call_record(
            session_id=session_id,
            tool_name="ui.connect_target_model",
            input_json=json.dumps(tool_input, ensure_ascii=True),
            output_json=json.dumps({"error": str(exc)}, ensure_ascii=True),
            success=False,
            db_path=db_path,
        )

    try:
        if attach_succeeded:
            try:
                # Reuse the already-connected adapter for the preview refresh.
                return await refresh_preview(
                    session_id,
                    orientation=DEFAULT_PREVIEW_ORIENTATION,
                    db_path=db_path,
                    preview_dir=ensure_preview_dir(),
                    api_origin=api_origin,
                    adapter_override=adapter,
                    active_model_path_override=resolved_model_path,
                    reopen_active_model=False,
                )
            except Exception as refresh_exc:
                logger.warning(
                    "[ui.connect_target_model] post-attach preview refresh failed: {}",
                    str(refresh_exc),
                )
    finally:
        # Always release the adapter, including when the preview refresh
        # returns successfully (previously that path skipped the disconnect).
        if adapter is not None:
            try:
                await adapter.disconnect()
            except Exception:
                logger.debug("Adapter disconnect failed during target-model cleanup")

    return build_dashboard_state(session_id, db_path=db_path, api_origin=api_origin)

ensure_preview_dir

ensure_preview_dir(preview_dir: Path | None = None) -> Path

Create and return the preview image directory.

Parameters:

Name Type Description Default
preview_dir Path | None

Override directory; defaults to .solidworks_mcp/ui_previews.

None

Returns:

Type Description
Path

Resolved Path that is guaranteed to exist.

Source code in src/solidworks_mcp/ui/services/_utils.py
def ensure_preview_dir(preview_dir: Path | None = None) -> Path:
    """Create and return the preview image directory.

    Args:
        preview_dir: Override directory; defaults to ``.solidworks_mcp/ui_previews``.

    Returns:
        Resolved ``Path`` that is guaranteed to exist.
    """
    resolved = preview_dir or _DEFAULT_PREVIEW_DIR
    resolved.mkdir(parents=True, exist_ok=True)
    return resolved

execute_next_checkpoint async

execute_next_checkpoint(session_id: str = DEFAULT_SESSION_ID, *, db_path: Path | None = None) -> dict[str, Any]

Find and execute the next un-executed checkpoint in the session plan.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

DEFAULT_SESSION_ID
db_path Path | None

Optional SQLite path override.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/checkpoint_service.py
async def execute_next_checkpoint(
    session_id: str = DEFAULT_SESSION_ID,
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Run the first plan checkpoint that has not been executed yet.

    The checkpoint row, one tool-call record per tool run, the design session
    row, and the session metadata are all updated with the outcome before the
    dashboard state is rebuilt.

    Args:
        session_id: Dashboard session identifier.
        db_path: Optional SQLite path override.

    Returns:
        Full dashboard state payload.
    """
    from .session_service import build_dashboard_state, ensure_dashboard_session  # noqa: PLC0415

    session_row = ensure_dashboard_session(session_id, db_path=db_path)

    # Select the first checkpoint row whose "executed" flag is falsy.
    pending = None
    for row in list_plan_checkpoints(session_id, db_path=db_path):
        if not row["executed"]:
            pending = row
            break

    if pending is None:
        merge_metadata(
            session_id,
            db_path=db_path,
            latest_message="All checkpoints have already been executed.",
        )
        return build_dashboard_state(session_id, db_path=db_path)

    planned = parse_json_blob(pending["planned_action_json"])
    session_meta = parse_json_blob(session_row.get("metadata_json"))

    needs_empty_part = _should_open_empty_part(session_meta, planned)
    if needs_empty_part:
        await _open_empty_part_before_checkpoint(
            session_id=session_id,
            session_row=session_row,
            db_path=db_path,
            api_origin=DEFAULT_API_ORIGIN,
        )

    raw_index = pending["checkpoint_index"]
    raw_title = pending["title"]
    run_summary = await _run_checkpoint_tools(
        planned,
        session_id=session_id,
        checkpoint_index=int(raw_index),
        checkpoint_title=str(raw_title),
        repair_context={
            "active_sketch_plane": session_meta.get("active_sketch_plane"),
            "last_sketch_name": session_meta.get("last_sketch_name"),
            "user_goal": session_row.get("user_goal") or DEFAULT_USER_GOAL,
        },
    )
    failed_tools = run_summary["failed_tools"]
    tool_runs = run_summary["tool_runs"]
    mocked_tools = run_summary.get("mocked_tools", [])
    # The checkpoint counts as executed only when no tool failed.
    executed = not failed_tools

    if failed_tools:
        failed_list = ", ".join(failed_tools)
        message = f"Checkpoint {raw_index} failed on tools: {failed_list}."
    elif mocked_tools and all(run.get("status") != "success" for run in tool_runs):
        # Every run was mocked (none reported "success"), so say so explicitly.
        mocked_list = ", ".join(mocked_tools)
        message = f"Checkpoint {raw_index} executed (tools MOCKED: {mocked_list})."
    else:
        message = f"Executed checkpoint {raw_index}: {raw_title}."

    outcome = {
        "status": "success" if executed else "error",
        "message": message,
        "tools": _planned_tools(planned),
        "tool_runs": tool_runs,
        "failed_tools": failed_tools,
        "script_path": run_summary.get("script_path", ""),
        "script_text": run_summary.get("script_text", ""),
        # Keep only the last 4000 characters of each captured stream.
        "stdout_tail": str(run_summary.get("stdout_text", ""))[-4000:],
        "stderr_tail": str(run_summary.get("stderr_text", ""))[-4000:],
        "validation_failures": run_summary.get("validation_failures", []),
    }
    result_json = json.dumps(outcome, ensure_ascii=True)
    update_plan_checkpoint(
        int(pending["id"]),
        approved_by_user=True,
        executed=executed,
        result_json=result_json,
        db_path=db_path,
    )

    for tool_run in tool_runs:
        insert_tool_call_record(
            session_id=session_id,
            checkpoint_id=int(pending["id"]),
            tool_name=tool_run["tool"],
            input_json=json.dumps(planned, ensure_ascii=True),
            output_json=json.dumps(tool_run, ensure_ascii=True),
            success=tool_run["status"] == "success",
            db_path=db_path,
        )

    # Advance the session's checkpoint pointer only on success.
    if executed:
        next_index = raw_index
        session_status = "executing"
    else:
        next_index = session_row.get("current_checkpoint_index") or 0
        session_status = "error"

    upsert_design_session(
        session_id=session_id,
        user_goal=session_row.get("user_goal") or DEFAULT_USER_GOAL,
        source_mode=session_row.get("source_mode") or DEFAULT_SOURCE_MODE,
        accepted_family=session_row.get("accepted_family"),
        status=session_status,
        current_checkpoint_index=next_index,
        metadata_json=session_row.get("metadata_json"),
        db_path=db_path,
    )

    if failed_tools:
        error_text = message
        hint = "Open the generated checkpoint script, repair planned parameters, then rerun this checkpoint."
    else:
        error_text = ""
        hint = ""
    merge_metadata(
        session_id,
        db_path=db_path,
        latest_message=message,
        latest_error_text=error_text,
        remediation_hint=hint,
    )
    return build_dashboard_state(session_id, db_path=db_path)

health async

health() -> dict[str, str]

Health check endpoint.

Source code in src/solidworks_mcp/ui/server.py
@app.get("/api/health")
async def health() -> dict[str, str]:
    """Report that the server is up by returning a fixed ``status`` payload."""
    status_payload = {"status": "ok"}
    return status_payload

ingest_reference_source

ingest_reference_source(session_id: str, *, source_path: str, namespace: str, chunk_size: int = 1200, overlap: int = 200, db_path: Path | None = None) -> dict[str, Any]

Ingest a local file or URL into the simple local retrieval index.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
source_path str

Absolute file path or http/https URL.

required
namespace str

Namespace key that isolates this index from others.

required
chunk_size int

Maximum characters per chunk.

1200
overlap int

Overlapping characters between adjacent chunks.

200
db_path Path | None

Optional override for the SQLite database path.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/docs_service.py
def ingest_reference_source(
    session_id: str,
    *,
    source_path: str,
    namespace: str,
    chunk_size: int = 1200,
    overlap: int = 200,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Chunk a local file or URL and store it in the local retrieval index.

    The chunks are written to ``<namespace>.json`` under ``DEFAULT_RAG_DIR``. A
    vector index update is attempted afterwards on a best-effort basis. Both
    success and failure are written to session metadata and the tool-call log;
    exceptions are recorded rather than re-raised.

    Args:
        session_id: Dashboard session identifier.
        source_path: Absolute file path or http/https URL.
        namespace: Namespace key that isolates this index from others.
        chunk_size: Maximum characters per chunk.
        overlap: Overlapping characters between adjacent chunks.
        db_path: Optional override for the SQLite database path.

    Returns:
        Full dashboard state payload.
    """
    from .session_service import build_dashboard_state, ensure_dashboard_session  # noqa: PLC0415

    ensure_dashboard_session(session_id, db_path=db_path)

    fallback_namespace = "engineering-reference"
    source_reference = (source_path or "").strip()
    # Fall back when the namespace is missing or blank after stripping.
    resolved_namespace = (namespace or fallback_namespace).strip() or fallback_namespace

    try:
        if is_url_reference(source_reference):
            source_identifier = source_reference
            source_text, source_label = read_reference_url(source_reference)
        else:
            local_source = Path(source_reference).expanduser()
            if not local_source.exists():
                # Missing file: record the problem and return without ingesting.
                merge_metadata(
                    session_id,
                    db_path=db_path,
                    rag_source_path=str(local_source),
                    rag_namespace=resolved_namespace,
                    rag_status="Reference source path was not found.",
                    latest_error_text=f"Missing reference source: {local_source}",
                    remediation_hint=(
                        "Provide an absolute path or an http/https URL for a PDF, "
                        "markdown, text, or HTML source."
                    ),
                )
                return build_dashboard_state(session_id, db_path=db_path)
            source_identifier = str(local_source.resolve())
            source_label = local_source.name
            source_text = read_reference_source(local_source)

        chunks = _chunk_text(source_text, chunk_size=chunk_size, overlap=overlap)
        chunk_total = len(chunks)

        output_path = DEFAULT_RAG_DIR / f"{resolved_namespace}.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Chunk ids are 1-based: "<namespace>-1", "<namespace>-2", ...
        chunk_records = []
        for position, chunk_text in enumerate(chunks, start=1):
            chunk_records.append(
                {
                    "id": f"{resolved_namespace}-{position}",
                    "source": source_identifier,
                    "text": chunk_text,
                }
            )
        payload = {
            "version": "1.0",
            "namespace": resolved_namespace,
            "source_location": source_identifier,
            "chunk_count": chunk_total,
            "chunks": chunk_records,
        }
        serialized_payload = json.dumps(payload, indent=2, ensure_ascii=True)
        output_path.write_text(serialized_payload, encoding="utf-8")

        # Vector index update is optional: a missing dependency or any indexing
        # error is logged and ingestion continues.
        try:
            from ...agents.vector_rag import VectorRAGIndex  # noqa: PLC0415

            vector_index = VectorRAGIndex.load(
                namespace=resolved_namespace, rag_dir=DEFAULT_RAG_DIR
            )
            for record in payload["chunks"]:
                vector_index.ingest_text(
                    record["text"], source=source_identifier, tags=[resolved_namespace]
                )
            vector_index.save()
            logger.info(
                "[ui.ingest_reference_source] FAISS index updated namespace={} chunks={}",
                resolved_namespace,
                chunk_total,
            )
        except ImportError:
            logger.debug(
                "[ui.ingest_reference_source] FAISS not available; skipping vector index"
            )
        except Exception as faiss_exc:
            logger.warning(
                "[ui.ingest_reference_source] FAISS indexing failed (non-fatal): {}",
                faiss_exc,
            )

        index_path_text = str(output_path.resolve())

        evidence_payload = {
            "namespace": resolved_namespace,
            "index_path": index_path_text,
            "chunk_count": chunk_total,
        }
        insert_evidence_link(
            session_id=session_id,
            source_type="rag_ingest",
            source_id=source_identifier,
            relevance_score=0.88,
            rationale=f"Ingested {chunk_total} chunk(s) into namespace '{resolved_namespace}'.",
            payload_json=json.dumps(evidence_payload, ensure_ascii=True),
            db_path=db_path,
        )
        merge_metadata(
            session_id,
            db_path=db_path,
            rag_source_path=source_identifier,
            rag_namespace=resolved_namespace,
            rag_status=f"Ingested {chunk_total} chunk(s) from {source_label}.",
            rag_index_path=index_path_text,
            rag_chunk_count=chunk_total,
            rag_provenance_text=(
                f"Namespace {resolved_namespace} | source {source_label} | chunks {chunk_total}"
            ),
            latest_message=f"Reference source {source_label} ingested for retrieval.",
            latest_error_text="",
            remediation_hint="",
        )

        call_input = {
            "source_path": source_identifier,
            "namespace": resolved_namespace,
            "chunk_size": chunk_size,
            "overlap": overlap,
        }
        call_output = {"index_path": index_path_text, "chunk_count": chunk_total}
        insert_tool_call_record(
            session_id=session_id,
            tool_name="ui.ingest_reference_source",
            input_json=json.dumps(call_input, ensure_ascii=True),
            output_json=json.dumps(call_output, ensure_ascii=True),
            success=True,
            db_path=db_path,
        )
    except Exception as exc:
        # Any failure is reported through metadata and the tool-call log.
        error_text = str(exc)
        merge_metadata(
            session_id,
            db_path=db_path,
            rag_source_path=source_reference,
            rag_namespace=resolved_namespace,
            rag_status="Reference ingestion failed.",
            latest_error_text=error_text,
            remediation_hint=(
                "Use a readable local file or http/https URL and ensure optional "
                "PDF dependencies are installed."
            ),
        )
        failed_input = {"source_path": source_reference, "namespace": resolved_namespace}
        insert_tool_call_record(
            session_id=session_id,
            tool_name="ui.ingest_reference_source",
            input_json=json.dumps(failed_input, ensure_ascii=True),
            output_json=json.dumps({"error": str(exc)}, ensure_ascii=True),
            success=False,
            db_path=db_path,
        )

    return build_dashboard_state(session_id, db_path=db_path)

inspect_family async

inspect_family(session_id: str, user_goal: str, *, db_path: Path | None = None, model_name: str | None = None) -> dict[str, Any]

Run LLM-backed family classification and suggested checkpoints.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
user_goal str

Free-text description of the design intent.

required
db_path Path | None

Optional SQLite path override.

None
model_name str | None

Optional provider-qualified model override.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/llm_service.py
async def inspect_family(
    session_id: str,
    user_goal: str,
    *,
    db_path: Path | None = None,
    model_name: str | None = None,
) -> dict[str, Any]:
    """Run LLM-backed family classification and suggested checkpoints.

    Builds a prompt from the user goal and the session metadata, asks the
    structured agent for a ``FamilyInspection``, and persists the outcome:
    evidence links, replacement plan checkpoints (when any are returned),
    session metadata, and a ``ui.inspect_family`` tool-call record.

    If the agent returns a ``RecoverableFailure`` and the goal contains an
    explicit feature order, a fallback ``extrude`` inspection is built from
    that order. Without an explicit feature order, the failure is recorded
    and the current dashboard state is returned.

    Args:
        session_id: Dashboard session identifier.
        user_goal: Free-text description of the design intent.
        db_path: Optional SQLite path override.
        model_name: Optional provider-qualified model override.

    Returns:
        Full dashboard state payload.
    """
    from .session_service import build_dashboard_state, ensure_dashboard_session  # noqa: PLC0415

    ensure_dashboard_session(session_id, user_goal=user_goal, db_path=db_path)
    session_row = get_design_session(session_id, db_path=db_path) or {}
    meta = _parse_json_blob(session_row.get("metadata_json"))
    resolved_model = normalize_model_name_for_provider(
        model_name or meta.get("model_name"),
        provider=sanitize_ui_text(meta.get("model_provider"), "github"),
        profile=sanitize_ui_text(meta.get("model_profile"), "balanced"),
    )
    resolved_endpoint = sanitize_ui_text(
        meta.get("local_endpoint"),
        os.getenv("SOLIDWORKS_UI_LOCAL_ENDPOINT", "http://127.0.0.1:11434/v1"),
    )

    local_family = meta.get("proposed_family", "") or "<not yet classified>"
    # Stored evidence may be null or contain non-string items; coerce so that
    # building the prompt cannot raise TypeError from str.join.
    stored_evidence = meta.get("family_evidence") or []
    local_evidence = " | ".join(str(item) for item in stored_evidence) or "<none>"
    prompt = (
        "## TASK\n"
        "Apply the Feature-Tree-Reconstruction skill to classify the SolidWorks feature family "
        "and produce a human-reviewable checkpoint plan.\n\n"
        "## DESIGN GOAL\n"
        f"{user_goal}\n\n"
        "## MODEL CONTEXT\n"
        f"Active model path  : {meta.get('active_model_path', '') or '<none>'}\n"
        f"Active model status: {meta.get('active_model_status', '') or '<none>'}\n"
        f"Feature target refs: {meta.get('feature_target_text', '') or '<none>'}\n"
        f"Feature target status: {meta.get('feature_target_status', '') or '<none>'}\n\n"
        "## LOCAL CLASSIFIER EVIDENCE (pre-computed)\n"
        f"Family  : {local_family}\n"
        f"Evidence: {local_evidence}\n\n"
        "## REFERENCE CORPUS\n"
        f"{meta.get('rag_provenance_text', '') or '<none>'}\n\n"
        "## FEATURE-TREE RECONSTRUCTION SKILL\n"
        "Inspection sequence when model is available (use your mcp tools):\n"
        "  open_model → get_model_info → list_features(include_suppressed=True) "
        "→ get_mass_properties → classify_feature_tree\n"
        "Feature families: revolve | extrude | sheet_metal | advanced_solid | assembly | drawing | unknown\n"
        "Delegation rules:\n"
        "  - sheet_metal or advanced_solid → VBA-aware reconstruction path\n"
        "  - simple part family → direct MCP checkpoint plan\n"
        "  - assembly → component-first decomposition, part-level plan per component\n"
        "Guardrail: never reconstruct from silhouette only. "
        "If confidence is low and contradictory evidence exists, propose more inspection steps. "
        "When no model is attached, treat explicit user-supplied dimensions, named sketch phases, feature ordering, and manufacturing constraints as valid user-confirmed evidence for a conservative plan.\n\n"
        "## OUTPUT CONTRACT\n"
        "Return: family, confidence (high/medium/low), evidence[], warnings[], checkpoints[3-6].\n"
        "You must always emit at least 3 checkpoints when the prompt contains enough explicit geometry to start from an empty part, even if confidence is only low or medium.\n"
        "Each checkpoint: title, allowed_tools[] (from MCP tool catalog), rationale."
    )

    result = await _run_structured_agent(
        system_prompt=(
            "## ROLE\n"
            "You are a SolidWorks routing assistant applying the Feature-Tree-Reconstruction skill.\n"
            "Classify the feature family with evidence and confidence, then produce a safe "
            "checkpoint plan for human review.\n\n"
            "## ORCHESTRATION NOTES\n"
            "  - Inspection before planning: never produce a build plan without at least one "
            "evidence item from model inspection or user-confirmed context.\n"
            "  - Explicit user-specified dimensions, feature names, and operation ordering count as user-confirmed context when no model is attached.\n"
            "  - Propose 3-6 conservative checkpoints. Require human confirmation before each "
            "irreversible step.\n"
            "  - For sheet metal or unsupported advanced features, route to VBA-aware planning.\n"
            "  - Surface warnings when evidence is contradictory or confidence is low.\n"
            "  - Prefer 'extrude' for prompt-only parts built from named sketches and base extrusions unless stronger contrary evidence exists."
        ),
        user_prompt=prompt,
        result_type=FamilyInspection,
        model_name=resolved_model,
        local_endpoint=resolved_endpoint,
    )

    if isinstance(result, RecoverableFailure):
        ordered_features = _extract_explicit_feature_order(user_goal)
        if ordered_features:
            # The goal spells out a feature order, so build a fallback plan
            # instead of failing. The agent's failure explanation is kept as
            # a warning on the fallback inspection.
            result = _coerce_explicit_feature_order_plan(
                user_goal,
                FamilyInspection(
                    family="extrude",
                    confidence="medium",
                    evidence=[
                        "Deterministic fallback engaged because model routing failed.",
                        "Goal includes an explicit feature-order sequence.",
                    ],
                    warnings=[
                        result.explanation,
                        "Using explicit feature-order checkpoints until model routing is healthy.",
                    ],
                    checkpoints=[],
                ),
            )
        else:
            # No fallback available: record the failure and return early.
            merge_metadata(
                session_id,
                db_path=db_path,
                latest_message=result.explanation,
                latest_error_text=result.explanation,
                remediation_hint=(
                    result.remediation_steps[0]
                    if result.remediation_steps
                    else "Adjust provider/model settings, then retry inspect."
                ),
            )
            insert_tool_call_record(
                session_id=session_id,
                tool_name="ui.inspect_family",
                input_json=json.dumps({"user_goal": user_goal}, ensure_ascii=True),
                output_json=result.model_dump_json(),
                success=False,
                db_path=db_path,
            )
            return build_dashboard_state(session_id, db_path=db_path)

    # Applied to every result, including the fallback built above.
    result = _coerce_explicit_feature_order_plan(user_goal, result)

    # Persist one evidence link per evidence line (ids are 1-based).
    evidence_payload = []
    for index, line in enumerate(result.evidence, start=1):
        insert_evidence_link(
            session_id=session_id,
            source_type="llm",
            source_id=f"family_evidence_{index}",
            relevance_score=0.85,
            rationale=line,
            payload_json=json.dumps({"family": result.family}, ensure_ascii=True),
            db_path=db_path,
        )
        evidence_payload.append(line)

    # Replace the stored plan only when the inspection produced checkpoints.
    if result.checkpoints:
        replacement_rows: list[dict[str, Any]] = []
        for index, checkpoint in enumerate(result.checkpoints, start=1):
            replacement_rows.append(
                {
                    "checkpoint_index": index,
                    "title": checkpoint.title,
                    "planned_action_json": json.dumps(
                        {
                            "title": checkpoint.title,
                            "goal": checkpoint.title,
                            "tools": checkpoint.allowed_tools,
                            "rationale": checkpoint.rationale,
                            # Keys in checkpoint.execution override the keys above.
                            **checkpoint.execution,
                        },
                        ensure_ascii=True,
                    ),
                    # Only the first checkpoint starts out approved.
                    "approved_by_user": index == 1,
                }
            )

        replace_plan_checkpoints(
            session_id=session_id,
            checkpoints=replacement_rows,
            db_path=db_path,
        )

    metadata = merge_metadata(
        session_id,
        db_path=db_path,
        user_goal=user_goal,
        proposed_family=result.family,
        family_confidence=result.confidence,
        family_evidence=evidence_payload,
        family_warnings=result.warnings,
        latest_message=f"Updated family classification to '{result.family}' from GitHub Copilot.",
        latest_error_text="",
        remediation_hint="",
    )
    insert_tool_call_record(
        session_id=session_id,
        tool_name="ui.inspect_family",
        input_json=json.dumps({"user_goal": user_goal}, ensure_ascii=True),
        output_json=result.model_dump_json(),
        success=True,
        db_path=db_path,
    )
    insert_evidence_link(
        session_id=session_id,
        source_type="llm",
        source_id="family_inspection",
        relevance_score=0.93,
        rationale="LLM family classification and checkpoint suggestions from GitHub Copilot.",
        payload_json=json.dumps(metadata, ensure_ascii=True),
        db_path=db_path,
    )
    return build_dashboard_state(session_id, db_path=db_path)

log_ui_requests async

log_ui_requests(request: Request, call_next)

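Log each dashboard HTTP request and its response status and duration; requests whose path is excluded from logging are passed through untouched.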

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required
call_next Any

Callable that forwards the request to the next handler and returns its response.

required

Returns:

Name Type Description
Any

The response returned by call_next.

Source code in src/solidworks_mcp/ui/server.py
@app.middleware("http")
async def log_ui_requests(request: Request, call_next):
    """Log the request payload and the outcome of selected UI HTTP calls.

    Paths rejected by ``_should_log_request`` are forwarded untouched. For
    every other path the body is read once, logged together with the method,
    path and query parameters, and then replayed to the downstream handler.

    Args:
        request (Request): The incoming HTTP request.
        call_next (Any): Callable that forwards the request downstream and
            returns the response.

    Returns:
        Any: The response produced by the downstream handler.

    Raises:
        Exception: Any error raised downstream is logged and re-raised.
    """
    path = request.url.path
    if not _should_log_request(path):
        return await call_next(request)

    t0 = time.perf_counter()

    def elapsed_ms() -> float:
        """Return milliseconds elapsed since ``t0``, rounded to 2 decimals."""
        return round((time.perf_counter() - t0) * 1000, 2)

    body_bytes = await request.body()
    logger.info(
        "[ui.http.request] payload={}",
        json.dumps(
            {
                "method": request.method,
                "path": path,
                "query": dict(request.query_params),
                "body": _decode_request_body(body_bytes),
            },
            ensure_ascii=True,
            default=str,
        ),
    )

    async def replay_body() -> dict[str, Any]:
        """Return the already-read body as a single ASGI ``http.request`` message."""
        return {"type": "http.request", "body": body_bytes, "more_body": False}

    # The body was consumed above for logging; hand the downstream handler a
    # request whose receive channel serves the captured bytes again.
    replayed = Request(request.scope, replay_body)

    try:
        response = await call_next(replayed)
    except Exception as exc:
        took = elapsed_ms()
        logger.exception(
            "[ui.http.error] method={} path={} duration_ms={} error={}",
            replayed.method,
            replayed.url.path,
            took,
            exc,
        )
        raise
    else:
        took = elapsed_ms()
        logger.info(
            "[ui.http.response] method={} path={} status_code={} duration_ms={}",
            replayed.method,
            replayed.url.path,
            response.status_code,
            took,
        )
        return response

main

main() -> None

Run the dashboard backend locally.

Source code in src/solidworks_mcp/ui/server.py
def main() -> None:
    """Serve ``app`` with uvicorn on the loopback interface, port 8766."""
    bind_options = {"host": "127.0.0.1", "port": 8766}
    uvicorn.run(app, **bind_options)

reconcile_manual_edits

reconcile_manual_edits(session_id: str, *, db_path: Path | None = None) -> dict[str, Any]

Compare the two most-recent snapshots and summarise any detected changes.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
db_path Path | None

Optional override for the SQLite database path.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/session_service.py
def reconcile_manual_edits(
    session_id: str,
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Diff the first two listed snapshots and record a summary message.

    The first two entries returned by ``list_model_state_snapshots`` are
    compared on ``state_fingerprint`` and ``screenshot_path``; presumably the
    listing is newest-first (not visible here). The resulting message is
    persisted as ``latest_message`` via ``persist_ui_action``.

    Args:
        session_id: Dashboard session identifier.
        db_path: Optional override for the SQLite database path.

    Returns:
        Full dashboard state payload.
    """
    ensure_dashboard_session(session_id, db_path=db_path)
    history = list_model_state_snapshots(session_id, db_path=db_path)

    if len(history) >= 2:
        newest, prior = history[0], history[1]
        # A difference in either field counts as a manual change.
        differs = any(
            newest.get(field) != prior.get(field)
            for field in ("state_fingerprint", "screenshot_path")
        )
        if differs:
            message = "Detected manual changes. Options: accept edits, patch toward goal, or rollback."
        else:
            message = "No visual/state change detected since the last accepted snapshot."
    else:
        message = "Not enough snapshots yet. Capture another preview after manual edits."

    persist_ui_action(
        session_id,
        tool_name="ui.reconcile_manual_edits",
        db_path=db_path,
        metadata_updates={"latest_message": message},
        output_payload={"message": message},
    )
    return build_dashboard_state(session_id, db_path=db_path)

refresh_preview async

refresh_preview(session_id: str, *, orientation: str = DEFAULT_PREVIEW_ORIENTATION, db_path: Path | None = None, preview_dir: Path | None = None, api_origin: str = DEFAULT_API_ORIGIN, adapter_override: Any | None = None, active_model_path_override: str | None = None, reopen_active_model: bool = True) -> dict[str, Any]

Export the current SolidWorks viewport to a PNG preview and GLB/STL for the 3D viewer.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
orientation str

View orientation for the PNG export ("front", "top", "right", "isometric", "current").

DEFAULT_PREVIEW_ORIENTATION
db_path Path | None

Optional SQLite path override.

None
preview_dir Path | None

Override for the preview output directory.

None
api_origin str

Base URL of the running FastAPI server.

DEFAULT_API_ORIGIN
adapter_override Any | None

Pre-connected adapter (avoids creating a second connection when called from connect_target_model).

None
active_model_path_override str | None

Override for the model path to reopen.

None
reopen_active_model bool

When True, reopen the persisted active model path before exporting (default).

True

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/preview_service.py
async def refresh_preview(
    session_id: str,
    *,
    orientation: str = DEFAULT_PREVIEW_ORIENTATION,
    db_path: Path | None = None,
    preview_dir: Path | None = None,
    api_origin: str = DEFAULT_API_ORIGIN,
    adapter_override: Any | None = None,
    active_model_path_override: str | None = None,
    reopen_active_model: bool = True,
) -> dict[str, Any]:
    """Export the current SolidWorks viewport to a PNG preview and GLB/STL for the 3D viewer.

    The refresh runs in three steps: (1) export GLB, falling back to STL, for
    the 3D viewer; (2) export the main PNG screenshot and record a snapshot;
    (3) export one thumbnail per standard orientation through a second,
    freshly created adapter. Every failure is caught: it is logged, written to
    the session metadata, and the dashboard state is still returned.

    Adapters connected by this call are always disconnected, including when an
    intermediate step raises. An adapter supplied through ``adapter_override``
    is never connected or disconnected here.

    Args:
        session_id: Dashboard session identifier.
        orientation: View orientation for the PNG export
            (``"front"``, ``"top"``, ``"right"``, ``"isometric"``, ``"current"``).
        db_path: Optional SQLite path override.
        preview_dir: Override for the preview output directory.
        api_origin: Base URL of the running FastAPI server.
        adapter_override: Pre-connected adapter (avoids creating a second connection when
            called from ``connect_target_model``).
        active_model_path_override: Override for the model path to reopen.
        reopen_active_model: When ``True``, reopen the persisted active model path before
            exporting (default).

    Returns:
        Full dashboard state payload.
    """
    from .session_service import build_dashboard_state, ensure_dashboard_session  # noqa: PLC0415

    ensure_dashboard_session(session_id, db_path=db_path)
    logger.info(
        "[ui.refresh_preview] session_id={} orientation={}",
        session_id,
        orientation,
    )
    session_row = get_design_session(session_id, db_path=db_path) or {}
    metadata = parse_json_blob(session_row.get("metadata_json"))
    preview_viewer_url = sanitize_preview_viewer_url(
        metadata.get("preview_viewer_url"),
        session_id=session_id,
        api_origin=api_origin,
    )
    resolved_preview_dir = ensure_preview_dir(preview_dir)
    preview_path = resolved_preview_dir / f"{session_id}.png"
    active_model_path = active_model_path_override or metadata.get("active_model_path")
    adapter = adapter_override
    owns_adapter = adapter_override is None
    # True only while a connection opened by this call is still live; the
    # ``finally`` clause uses it to release the adapter on error paths.
    owned_connection_open = False

    try:
        if not active_model_path:
            raise RuntimeError(
                "No attached model path found for preview refresh. Attach a target model first."
            )

        if adapter is None:
            config = load_config()
            adapter = await create_adapter(config)
        if owns_adapter:
            await adapter.connect()
            owned_connection_open = True
        if reopen_active_model and hasattr(adapter, "open_model"):
            await _reopen_target_model_for_preview(
                adapter, str(active_model_path), context="preview"
            )

        # --- Step 1: Export 3D geometry for the Three.js viewer (GLB preferred) ---
        glb_path = resolved_preview_dir / f"{session_id}.glb"
        stl_path = resolved_preview_dir / f"{session_id}.stl"
        viewer_ts = int(time.time())
        viewer_format = "none"
        try:
            glb_result = await adapter.export_file(str(glb_path.resolve()), "glb")
            if (
                glb_result.is_success
                and glb_path.exists()
                and glb_path.stat().st_size > 0
            ):
                viewer_format = "glb"
                viewer_ts = int(glb_path.stat().st_mtime)
                logger.info(
                    "[ui.refresh_preview] GLB export succeeded path={}",
                    str(glb_path.resolve()),
                )
        except Exception as _glb_exc:
            logger.warning("[ui.refresh_preview] GLB export failed: {}", str(_glb_exc))

        # Fall back to STL only when GLB produced no usable file.
        if viewer_format == "none":
            try:
                stl_result = await adapter.export_file(str(stl_path.resolve()), "stl")
                if (
                    stl_result.is_success
                    and stl_path.exists()
                    and stl_path.stat().st_size > 0
                ):
                    viewer_format = "stl"
                    viewer_ts = int(stl_path.stat().st_mtime)
                    logger.info(
                        "[ui.refresh_preview] STL export succeeded path={}",
                        str(stl_path.resolve()),
                    )
            except Exception:
                logger.debug("[ui.refresh_preview] STL export skipped (adapter error)")

        # The timestamp query parameter changes whenever a new file is exported.
        preview_viewer_url = (
            f"{api_origin}/api/ui/viewer/{session_id}?t={viewer_ts}&fmt={viewer_format}"
        )

        # --- Step 2: Export PNG screenshot (best-effort) ---
        png_payload = {
            "file_path": str(preview_path.resolve()),
            "format_type": "png",
            "width": 1280,
            "height": 720,
            "view_orientation": orientation,
        }
        png_ok = False
        png_error: str = ""
        snapshot_id: int | None = None
        try:
            result = await adapter.export_image(png_payload)
            if result.is_success and preview_path.exists():
                png_ok = True
                logger.info(
                    "[ui.refresh_preview] PNG export succeeded file_path={}",
                    str(preview_path.resolve()),
                )
                snapshot_id = insert_model_state_snapshot(
                    session_id=session_id,
                    screenshot_path=str(preview_path.resolve()),
                    state_fingerprint=f"preview-{preview_path.stat().st_mtime_ns}",
                    db_path=db_path,
                )
                insert_tool_call_record(
                    session_id=session_id,
                    tool_name="export_image",
                    input_json=json.dumps(png_payload, ensure_ascii=True),
                    output_json=json.dumps(result.data or {}, ensure_ascii=True),
                    success=True,
                    db_path=db_path,
                )
            else:
                png_error = result.error or "export_image returned failure"
                logger.warning("[ui.refresh_preview] PNG export failed: {}", png_error)
        except Exception as png_exc:
            png_error = str(png_exc)
            logger.warning("[ui.refresh_preview] PNG export exception: {}", png_exc)

        if owns_adapter:
            # Clear the flag first so a failing disconnect is not retried by
            # the cleanup in ``finally``.
            owned_connection_open = False
            await adapter.disconnect()

        # --- Step 3: Export per-orientation PNG thumbnails ---
        VIEW_ORIENTATIONS = ["isometric", "front", "top", "right"]
        preview_view_urls: dict[str, str] = {}
        try:
            config2 = load_config()
            adapter2 = await create_adapter(config2)
            await adapter2.connect()
            try:
                if hasattr(adapter2, "open_model"):
                    await _reopen_target_model_for_preview(
                        adapter2,
                        str(active_model_path),
                        context="orientation previews",
                    )
                _sel_name = str(
                    metadata.get("selected_feature_selector_name")
                    or metadata.get("selected_feature_name")
                    or ""
                ).strip()
                if _sel_name and hasattr(adapter2, "select_feature"):
                    try:
                        await adapter2.select_feature(_sel_name)
                        logger.info(
                            "[ui.refresh_preview] re-selected '{}' before view screenshots",
                            _sel_name,
                        )
                    except Exception as _sel_exc:
                        logger.debug(
                            "[ui.refresh_preview] re-select '{}' failed (non-fatal): {}",
                            _sel_name,
                            _sel_exc,
                        )
                for view_name in VIEW_ORIENTATIONS:
                    view_path = resolved_preview_dir / f"{session_id}-{view_name}.png"
                    try:
                        # Selection is re-applied before every export.
                        if _sel_name and hasattr(adapter2, "select_feature"):
                            await adapter2.select_feature(_sel_name)
                        view_result = await adapter2.export_image(
                            {
                                "file_path": str(view_path.resolve()),
                                "format_type": "png",
                                "width": 640,
                                "height": 480,
                                "view_orientation": view_name,
                            }
                        )
                        if view_result.is_success and view_path.exists():
                            ts = int(view_path.stat().st_mtime)
                            preview_view_urls[view_name] = (
                                f"{api_origin}/previews/{view_path.name}?ts={ts}"
                            )
                            logger.info(
                                "[ui.refresh_preview] view PNG {} exported", view_name
                            )
                        else:
                            logger.warning(
                                "[ui.refresh_preview] view PNG {} failed: {}",
                                view_name,
                                view_result.error or "no detail",
                            )
                    except Exception as _ve:
                        logger.warning(
                            "[ui.refresh_preview] view PNG {} exception: {}",
                            view_name,
                            str(_ve),
                        )
            finally:
                # Release the second connection even when the reopen or
                # selection setup above raised.
                await adapter2.disconnect()
        except Exception as _views_exc:
            logger.warning(
                "[ui.refresh_preview] multi-view export failed: {}", str(_views_exc)
            )

        # Preserve existing view URLs when a refresh attempt returns no images.
        existing_view_urls = metadata.get("preview_view_urls")
        if isinstance(existing_view_urls, dict):
            if not preview_view_urls:
                preview_view_urls = dict(existing_view_urls)
            else:
                # Fresh URLs win; orientations that failed keep their old URL.
                merged_view_urls = dict(existing_view_urls)
                merged_view_urls.update(preview_view_urls)
                preview_view_urls = merged_view_urls

        viewer_label = (
            f"3D viewer ({viewer_format.upper()})"
            if viewer_format != "none"
            else "3D viewer (no model)"
        )
        png_label = "PNG" if png_ok else f"no PNG ({png_error})"
        status_msg = f"Preview refreshed ({viewer_label}, {png_label})."

        merge_metadata(
            session_id,
            db_path=db_path,
            preview_orientation=orientation,
            latest_message=status_msg,
            preview_status=status_msg,
            latest_snapshot_id=(str(snapshot_id) if snapshot_id is not None else ""),
            preview_viewer_url=preview_viewer_url,
            preview_stl_ready=(viewer_format != "none"),
            preview_png_ready=png_ok,
            preview_view_urls=preview_view_urls,
            latest_error_text="",
            remediation_hint="",
        )
    except Exception as exc:
        logger.exception("[ui.refresh_preview] failed: {}", exc)
        insert_tool_call_record(
            session_id=session_id,
            tool_name="export_image",
            input_json=json.dumps({"orientation": orientation}, ensure_ascii=True),
            output_json=json.dumps({"error": str(exc)}, ensure_ascii=True),
            success=False,
            db_path=db_path,
        )
        merge_metadata(
            session_id,
            db_path=db_path,
            latest_message=f"Preview refresh failed: {exc}",
            preview_status=f"Preview refresh failed: {exc}",
            preview_orientation=orientation,
            latest_error_text=str(exc),
            remediation_hint="Open a model in SolidWorks and retry preview refresh.",
        )
    finally:
        # Only reached with the flag set when a step between connect() and the
        # regular disconnect raised; without this the owned connection leaks.
        if owned_connection_open:
            try:
                await adapter.disconnect()
            except Exception as _cleanup_exc:
                logger.warning(
                    "[ui.refresh_preview] adapter cleanup failed: {}",
                    str(_cleanup_exc),
                )

    return build_dashboard_state(session_id, db_path=db_path, api_origin=api_origin)

request_clarifications async

request_clarifications(session_id: str, user_goal: str, *, user_answer: str = '', db_path: Path | None = None, model_name: str | None = None) -> dict[str, Any]

Generate focused follow-up questions for the current design goal using an LLM.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
user_goal str

Free-text description of the design intent.

required
user_answer str

Any previous answers the user has already provided.

''
db_path Path | None

Optional SQLite path override.

None
model_name str | None

Optional provider-qualified model override.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/llm_service.py
async def request_clarifications(
    session_id: str,
    user_goal: str,
    *,
    user_answer: str = "",
    db_path: Path | None = None,
    model_name: str | None = None,
) -> dict[str, Any]:
    """Ask the configured LLM for a normalized brief and follow-up questions.

    The prompt is assembled from the goal, the stored manufacturing
    assumptions, the stored model context, and any answers already given. A
    ``RecoverableFailure`` result is stored as an error with a remediation
    hint; any other result updates the brief and questions in the session
    metadata and adds an evidence link. Both outcomes write a tool-call record.

    Args:
        session_id: Dashboard session identifier.
        user_goal: Free-text description of the design intent.
        user_answer: Any previous answers the user has already provided.
        db_path: Optional SQLite path override.
        model_name: Optional provider-qualified model override.

    Returns:
        Full dashboard state payload.
    """
    from .session_service import build_dashboard_state, ensure_dashboard_session  # noqa: PLC0415

    ensure_dashboard_session(session_id, user_goal=user_goal, db_path=db_path)
    stored_row = get_design_session(session_id, db_path=db_path) or {}
    meta = _parse_json_blob(stored_row.get("metadata_json"))

    # Resolve which model and endpoint to call; an explicit argument wins over
    # the value stored in the session metadata.
    resolved_model = normalize_model_name_for_provider(
        model_name or meta.get("model_name"),
        provider=sanitize_ui_text(meta.get("model_provider"), "github"),
        profile=sanitize_ui_text(meta.get("model_profile"), "balanced"),
    )
    resolved_endpoint = sanitize_ui_text(
        meta.get("local_endpoint"),
        os.getenv("SOLIDWORKS_UI_LOCAL_ENDPOINT", "http://127.0.0.1:11434/v1"),
    )

    if user_answer:
        answer_section = f"\n## USER ANSWERS / CLARIFICATIONS\n{user_answer}"
    else:
        answer_section = ""
    assumptions_text = sanitize_ui_text(
        meta.get("assumptions_text"), "<none specified>"
    )
    model_path_ref = meta.get("active_model_path", "") or "<none>"
    feature_ref = meta.get("feature_target_text", "") or "<none>"
    corpus_ref = meta.get("rag_provenance_text", "") or "<none>"

    prompt = "".join(
        (
            "## TASK\n",
            "Prepare a SolidWorks design brief using the Printer-Profile-Tolerancing skill.\n",
            "Return a normalized_brief and at most three clarifying_questions that unblock the next modeling step.\n\n",
            "## DESIGN GOAL\n",
            f"{user_goal}\n\n",
            "## MANUFACTURING ASSUMPTIONS\n",
            f"{assumptions_text}\n\n",
            "## MODEL CONTEXT\n",
            f"Active model path : {model_path_ref}\n",
            f"Feature target refs: {feature_ref}\n",
            f"Reference corpus   : {corpus_ref}",
            f"{answer_section}\n\n",
            "## OUTPUT CONTRACT\n",
            "normalized_brief: concise paragraph with explicit dimensions/tolerances where known (≥10 chars).\n",
            "questions       : list of up to 3 highest-leverage questions that unblock modeling.\n",
            "  - Return an empty question list when the goal, assumptions, and user answers already provide enough detail to sketch and dimension the next feature.\n",
            "  - Include material/layer-height/nozzle values if missing from assumptions.\n",
            "  - Include critical fit/clearance targets if unspecified.\n",
            "  - Do not ask questions already answered above.",
        )
    )

    system_prompt = (
        "## ROLE\n"
        "You are a CAD planning assistant applying the Printer-Profile-Tolerancing skill.\n"
        "Normalize goals into manufacturing-ready language with explicit tolerance/clearance "
        "targets (e.g. '0.30 mm mating clearance', '0.2 mm layer height'). "
        "Ask only the highest-leverage questions that unblock the SolidWorks modeling steps. "
        "Always surface material, nozzle size, and orientation constraints when present in the goal. "
        "If the user has already supplied explicit dimensions, wall thickness, fit targets, and feature placement, do not keep restating the same asks; return zero questions instead."
    )

    result = await _run_structured_agent(
        system_prompt=system_prompt,
        user_prompt=prompt,
        result_type=ClarificationResponse,
        model_name=resolved_model,
        local_endpoint=resolved_endpoint,
    )

    failed = isinstance(result, RecoverableFailure)
    if failed:
        steps = result.remediation_steps
        hint = steps[0] if steps else "Configure provider credentials and retry."
        merge_metadata(
            session_id,
            db_path=db_path,
            latest_message=result.explanation,
            clarifying_questions=[],
            latest_error_text=result.explanation,
            remediation_hint=hint,
        )
    else:
        metadata = merge_metadata(
            session_id,
            db_path=db_path,
            user_goal=user_goal,
            normalized_brief=result.normalized_brief,
            clarifying_questions=result.questions,
            user_clarification_answer=user_answer,
            latest_message="Generated clarifying questions from GitHub Copilot.",
            latest_error_text="",
            remediation_hint="",
        )

    # Both outcomes are recorded as a tool call; only the success flag differs.
    insert_tool_call_record(
        session_id=session_id,
        tool_name="ui.request_clarifications",
        input_json=json.dumps({"user_goal": user_goal}, ensure_ascii=True),
        output_json=result.model_dump_json(),
        success=not failed,
        db_path=db_path,
    )

    if not failed:
        insert_evidence_link(
            session_id=session_id,
            source_type="llm",
            source_id="clarification_response",
            relevance_score=0.9,
            rationale="Normalized brief and follow-up questions from GitHub Copilot.",
            payload_json=json.dumps(metadata, ensure_ascii=True),
            db_path=db_path,
        )
    return build_dashboard_state(session_id, db_path=db_path)

select_workflow_mode

select_workflow_mode(session_id: str, *, workflow_mode: str, db_path: Path | None = None) -> dict[str, Any]

Persist the onboarding workflow branch and reset new-design state when switching.

Selecting "new_design" resets all model-attachment, preview, and clarification state so the user starts with a blank slate for the new part.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
workflow_mode str

Target mode ("edit_existing" or "new_design").

required
db_path Path | None

Optional override for the SQLite database path.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/session_service.py
def select_workflow_mode(
    session_id: str,
    *,
    workflow_mode: str,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Store the chosen onboarding workflow and wipe session state for a new design.

    For any mode other than ``"new_design"`` only the mode and status message
    are merged into the metadata. For ``"new_design"`` the model-attachment,
    preview, clarification, family, retrieval and status fields are reset, the
    session row is rewritten with status ``"inspect"`` at checkpoint index 0,
    and every plan checkpoint is marked unapproved and unexecuted.

    Args:
        session_id: Dashboard session identifier.
        workflow_mode: Target mode (``"edit_existing"`` or ``"new_design"``).
        db_path: Optional override for the SQLite database path.

    Returns:
        Full dashboard state payload.
    """
    session_row = ensure_dashboard_session(session_id, db_path=db_path)
    mode = normalize_workflow_mode(workflow_mode)
    label, guidance, _ = workflow_copy(mode)
    selected_message = f"Workflow selected: {label}."

    if mode != "new_design":
        metadata = merge_metadata(
            session_id,
            db_path=db_path,
            workflow_mode=mode,
            latest_message=selected_message,
            latest_error_text="",
            remediation_hint="",
        )
    else:
        new_part_prompt = "Describe the new part you want to design."
        blank_slate = {
            "workflow_mode": mode,
            "active_model_path": "",
            "active_model_status": "No active model connected yet.",
            "active_model_type": "",
            "active_model_configuration": "",
            "feature_target_text": "",
            "feature_target_status": "No grounded feature target selected.",
            "selected_feature_name": "",
            "selected_feature_selector_name": "",
            "preview_viewer_url": "",
            "preview_view_urls": {},
            "preview_status": "No preview captured yet.",
            "preview_stl_ready": False,
            "preview_png_ready": False,
            "clarifying_questions": [],
            "user_clarification_answer": "",
            "proposed_family": "unclassified",
            "family_confidence": "pending",
            "family_evidence": [],
            "family_warnings": [],
            "mocked_tools": [],
            "rag_source_path": "",
            "rag_status": "No retrieval source ingested yet.",
            "rag_index_path": "",
            "rag_chunk_count": 0,
            "rag_provenance_text": "No retrieval provenance available yet.",
            "docs_context_text": "No docs context loaded yet.",
            "notes_text": "",
            "orchestration_status": "Ready.",
            "context_save_status": "",
            "context_load_status": "",
            "latest_message": selected_message,
            "latest_error_text": "",
            "remediation_hint": "",
            "normalized_brief": new_part_prompt,
        }
        # Keys not listed above keep whatever value the session already had.
        metadata = parse_json_blob(session_row.get("metadata_json"))
        metadata.update(blank_slate)
        upsert_design_session(
            session_id=session_id,
            user_goal=new_part_prompt,
            source_mode=session_row.get("source_mode") or DEFAULT_SOURCE_MODE,
            accepted_family=None,
            status="inspect",
            current_checkpoint_index=0,
            metadata_json=json.dumps(metadata, ensure_ascii=True),
            db_path=db_path,
        )
        # Clear approval/execution flags so Execute Next starts from scratch.
        for checkpoint in list_plan_checkpoints(session_id, db_path=db_path):
            checkpoint_id = int(checkpoint.get("id") or 0)
            update_plan_checkpoint(
                checkpoint_id,
                approved_by_user=False,
                executed=False,
                result_json="",
                db_path=db_path,
            )

    action_output = {
        "workflow_mode": mode,
        "workflow_label": label,
        "workflow_guidance_text": guidance,
        "metadata": metadata,
    }
    persist_ui_action(
        session_id,
        tool_name="ui.select_workflow_mode",
        db_path=db_path,
        input_payload={"workflow_mode": mode},
        output_payload=action_output,
    )
    return build_dashboard_state(session_id, db_path=db_path)

update_ui_preferences

update_ui_preferences(session_id: str, *, assumptions_text: str | None = None, model_provider: str | None = None, model_profile: str | None = None, model_name: str | None = None, local_endpoint: str | None = None, db_path: Path | None = None) -> dict[str, Any]

Persist manufacturing assumptions and LLM provider preferences.

Parameters:

Name Type Description Default
session_id str

Dashboard session identifier.

required
assumptions_text str | None

Free-form manufacturing assumption text.

None
model_provider str | None

LLM provider identifier ("github", "openai", etc.).

None
model_profile str | None

Capability tier ("small", "balanced", "large").

None
model_name str | None

Explicit provider-qualified model name override.

None
local_endpoint str | None

Ollama / local API base URL.

None
db_path Path | None

Optional override for the SQLite database path.

None

Returns:

Type Description
dict[str, Any]

Full dashboard state payload.

Source code in src/solidworks_mcp/ui/services/session_service.py
def update_ui_preferences(
    session_id: str,
    *,
    assumptions_text: str | None = None,
    model_provider: str | None = None,
    model_profile: str | None = None,
    model_name: str | None = None,
    local_endpoint: str | None = None,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Save the manufacturing assumptions and LLM provider settings for a session.

    A missing provider falls back to ``"github"`` and a missing profile to
    ``"balanced"``; both are stripped and lower-cased. The endpoint fallback is
    the ``SOLIDWORKS_UI_LOCAL_ENDPOINT`` environment variable, or
    ``http://127.0.0.1:11434/v1`` when that is unset.

    Args:
        session_id: Dashboard session identifier.
        assumptions_text: Free-form manufacturing assumption text.
        model_provider: LLM provider identifier (``"github"``, ``"openai"``, etc.).
        model_profile: Capability tier (``"small"``, ``"balanced"``, ``"large"``).
        model_name: Explicit provider-qualified model name override.
        local_endpoint: Ollama / local API base URL.
        db_path: Optional override for the SQLite database path.

    Returns:
        Full dashboard state payload.
    """
    import os  # local import to keep module-level deps minimal

    ensure_dashboard_session(session_id, db_path=db_path)

    provider = (model_provider or "github").strip().lower()
    profile = (model_profile or "balanced").strip().lower()
    endpoint_fallback = os.getenv(
        "SOLIDWORKS_UI_LOCAL_ENDPOINT", "http://127.0.0.1:11434/v1"
    )

    # Settings shared verbatim by the stored metadata and the logged input.
    llm_settings = {
        "model_provider": provider,
        "model_profile": profile,
        "model_name": normalize_model_name_for_provider(
            model_name,
            provider=provider,
            profile=profile,
        ),
        "local_endpoint": sanitize_ui_text(local_endpoint, endpoint_fallback),
    }

    stored_fields = {
        "assumptions_text": sanitize_ui_text(
            assumptions_text,
            "No assumptions provided yet.",
        ),
        **llm_settings,
        "latest_message": "Updated assumptions and model preferences.",
        "latest_error_text": "",
        "remediation_hint": "",
    }
    # The logged input keeps the raw assumptions text, not the sanitized one.
    logged_input = {"assumptions_text": assumptions_text, **llm_settings}

    persist_ui_action(
        session_id,
        tool_name="ui.update_preferences",
        db_path=db_path,
        metadata_updates=stored_fields,
        input_payload=logged_input,
        output_metadata=True,
    )
    return build_dashboard_state(session_id, db_path=db_path)