Skip to main content
The Rig object is injected into every test function by the framework. Test code never constructs a Rig — it only receives one.
async def test_brake_pressure_response(rig: Rig) -> None:
    """Send a BrakeDemand of 80.0 and expect BrakeStatus.Pressure to reach 75.0.

    The test transmits the DBC message ``BrakeDemand`` with ``Value`` set to
    80.0, then waits up to 2.0 seconds for ``BrakeStatus.Pressure`` to be at
    least 75.0. The failure message from the result is attached to the assert
    so it is surfaced in the test report.
    """

    def pressure_reached(value):
        # Satisfied as soon as the reported pressure is at or above 75.0.
        return value >= 75.0

    demand_fields = {"Value": 80.0}
    await rig.can.send(message="BrakeDemand", fields=demand_fields)

    outcome = await rig.can.expect(
        signal="BrakeStatus.Pressure",
        condition=pressure_reached,
        timeout=2.0,
    )
    assert outcome.passed, outcome.fail_msg

Signal naming

All signals are identified as "MessageName.SignalName" — the exact names from the DBC file. This convention works identically for CAN and Ethernet signals.
# CAN signal
await rig.can.expect(signal="EngineData.RPM", ...)

# Ethernet signal (SOME/IP event)
await rig.can.expect(signal="BrakeDemand.Value", ...)
No interface names, no bus numbers, no backend-specific strings in test code.

rig.can — CAN bus API

rig.can.send()

Send a DBC-encoded message by name.
await rig.can.send(
    message="EngineControl",
    fields={"throttle": 50, "gear": 3},
)
Parameters:
| Param | Type | Description |
| --- | --- | --- |
| message | str | DBC message name |
| fields | dict[str, Any] | Signal name → physical value |
| interface | str \| None | CAN interface key from TOML. Defaults to primary. |
Raises: BlockedError if DBC is not loaded or message is unknown.

rig.can.send_raw()

Send a raw CAN frame without DBC encoding.
await rig.can.send_raw(arb_id=0x100, data=bytes([0x01, 0x02, 0x03, 0x04]))

rig.can.expect()

Wait for a signal to satisfy a condition.
result = await rig.can.expect(
    signal="EngineData.RPM",
    condition=lambda v: v > 800,
    timeout=2.0,
)
assert result.passed, result.fail_msg
Parameters:
| Param | Type | Description |
| --- | --- | --- |
| signal | str | "MessageName.SignalName" |
| condition | Callable[[float], bool] | Lambda returning True when the assertion is satisfied |
| timeout | float | Seconds before the assertion fails |
Returns: ExpectResult — see Result model below.

rig.can.receive()

Wait for a specific CAN frame by arbitration ID.
frame = await rig.can.receive(arb_id=0x100, timeout=1.0)
print(frame.fields)  # decoded signal values if DBC is loaded
Raises: CANTimeoutError if no matching frame arrives within the timeout.

rig.can.bus_load()

Measure current CAN bus load as a percentage.
load = await rig.can.bus_load(window=1.0)  # measure over 1 second
print(f"Bus load: {load.percent:.1f}%")

rig.sim — Bus Simulation Engine control

The rig.sim namespace controls the virtual Bus Simulation Engine (BSE). It is useful when testing the ECU’s response to simulated bus traffic.

rig.sim.set()

Set a signal value in the simulation. Takes effect on the next scheduled frame.
await rig.sim.set("VehicleSpeed.Speed", 120.0)

rig.sim.start_scheduling()

Start periodic transmission of a DBC message.
await rig.sim.start_scheduling("EngineData")  # message name

rig.sim.stop_scheduling()

Stop periodic transmission of a message.
await rig.sim.stop_scheduling("EngineData")

rig.sim.override() (context manager)

Temporarily override a signal value. Always restores the original value, even on exception (Rule R8).
async with rig.sim.override("EngineData.RPM", 0.0):
    # Signal is forced to 0.0 inside this block
    result = await rig.can.expect(
        signal="EngineData.RPM",
        condition=lambda v: v == 0.0,
        timeout=0.5,
    )
    assert result.passed

# RPM is restored to its pre-override value here

rig.sim.status()

Get current simulation state — which messages are active, signal count.
status = await rig.sim.status()
print(f"Active messages: {status.active_count}")

rig.fault — Fault injection

Fault methods return FaultDescriptor objects — they do not execute anything. The fault is activated only when passed to rig.fault.inject().
Fault methods return descriptors, never coroutines. Do not await them directly. Always use async with rig.fault.inject(...).
# CORRECT
async with rig.fault.inject(rig.fault.can_dropout(arb_id=0x100, duration=2.0)):
    await asyncio.sleep(2.0)
# Fault removed here

# WRONG — can_dropout() does not return a coroutine
await rig.fault.can_dropout(arb_id=0x100, duration=2.0)  # TypeError

rig.fault.inject() (context manager)

Activate a fault for the duration of the async with block. Always cleans up on exit, even on exception.
async with rig.fault.inject(descriptor):
    # Fault is active here
    ...
# Fault is removed here

rig.fault.can_dropout()

Suppress TX and RX of a specific CAN arbitration ID.
async with rig.fault.inject(
    rig.fault.can_dropout(arb_id=0x200, duration=2.0)
):
    result = await rig.can.expect(
        signal="EngineData.RPM",
        condition=lambda v: v == 0.0,
        timeout=2.5,
    )
    assert result.passed, result.fail_msg
| Param | Type | Description |
| --- | --- | --- |
| arb_id | int | CAN arbitration ID to suppress |
| duration | float | Duration in seconds |
| interface | str \| None | CAN interface key. Defaults to primary. |

rig.fault.can_noise()

Inject random bit errors into a specific CAN message.
async with rig.fault.inject(
    rig.fault.can_noise(arb_id=0x100, bit_error_rate=0.05)
):
    await asyncio.sleep(1.0)

rig.fault.power_cycle()

Power-cycle a rail — off for off_duration seconds, then back on.
async with rig.fault.inject(
    rig.fault.power_cycle(rail="ecu_main", off_duration=0.5)
):
    # ECU is off during this block
    await asyncio.sleep(1.0)
# ECU is back on here

rig.fault.gpio_stuck()

Hold a GPIO output pin at a fixed value.
async with rig.fault.inject(
    rig.fault.gpio_stuck(pin="ignition_enable", value=False, duration=1.0)
):
    await asyncio.sleep(1.0)

rig.ecu — ECU management

The rig.ecu namespace provides access to ECUs defined in [rig.ecus.*] in the TOML.
# Access an ECU by its TOML key
ecu = rig.ecu["engine_ecu"]

await ecu.power_on()                    # turn on the power rail
await ecu.wait_for_boot(timeout=8.0)    # wait for DoIP routing activation
await ecu.power_off()                   # turn off the power rail

Result model

ExpectResult

Returned by rig.can.expect().
result = await rig.can.expect(signal="EngineData.RPM", condition=lambda v: v > 800, timeout=2.0)

result.passed      # bool — True if condition was satisfied within timeout
result.fail_msg    # str — human-readable failure description (empty string if passed)
result.value       # float | None — the signal value that satisfied the condition (or last value)
result.elapsed     # float — seconds elapsed from call to resolution
Always use assert result.passed, result.fail_msg — this gives useful output in test reports:
# Good — error message is surfaced in the report
assert result.passed, result.fail_msg

# Bad — no useful error message
assert result.passed

BlockedError vs fail

CruciHiL distinguishes between rig problems and firmware problems:
| Condition | What to raise | Status | Counts against pass rate? |
| --- | --- | --- | --- |
| Firmware assertion failed | assert result.passed, result.fail_msg | fail | Yes |
| Rig precondition not met | raise BlockedError("message") | blocked | No |
Use BlockedError for infrastructure problems — missing hardware, DBC not loaded, backend not connected. These should never count as firmware regressions.
from crucihil.hal.models.exceptions import BlockedError

async def test_engine_startup(rig: Rig) -> None:
    """Assert EngineData.RPM exceeds 800 within 2.0 seconds, if a DBC is loaded.

    Raises:
        BlockedError: when ``rig.config.definitions.can_dbc`` is falsy, so the
            run is reported as blocked rather than as a firmware failure.
    """
    dbc_available = bool(rig.config.definitions.can_dbc)
    if not dbc_available:
        # Rig precondition, not a firmware defect.
        raise BlockedError("No DBC loaded — cannot run signal assertions")

    def above_idle(rpm):
        return rpm > 800

    outcome = await rig.can.expect(
        signal="EngineData.RPM",
        condition=above_idle,
        timeout=2.0,
    )
    assert outcome.passed, outcome.fail_msg

Import reference

from crucihil.hal.rig import Rig
from crucihil.hal.models.exceptions import BlockedError, CANTimeoutError
from crucihil.hal.models.results import ExpectResult, FaultDescriptor

Full test example

"""Engine validation test functions."""

from __future__ import annotations

import asyncio

from crucihil.hal.rig import Rig
from crucihil.hal.models.exceptions import BlockedError


async def test_engine_startup(
    rig: Rig,
    expected_rpm: float = 800.0,
    startup_timeout: float = 2.0,
) -> None:
    """Assert that EngineData.RPM rises above a threshold within a time limit.

    Args:
        rig: Rig instance supplied to the test.
        expected_rpm: Value that ``EngineData.RPM`` must strictly exceed.
        startup_timeout: Seconds to wait for the condition before failing.
    """

    def rpm_above_threshold(rpm):
        return rpm > expected_rpm

    outcome = await rig.can.expect(
        signal="EngineData.RPM",
        condition=rpm_above_threshold,
        timeout=startup_timeout,
    )
    assert outcome.passed, outcome.fail_msg


async def test_can_heartbeat(rig: Rig) -> None:
    """Assert that an EngineData.RPM value is observed within 1.0 second.

    The condition accepts any value that is not ``None``, so the check is about
    the signal being present rather than about its magnitude.
    """

    def any_value(sample):
        return sample is not None

    outcome = await rig.can.expect(
        signal="EngineData.RPM",
        condition=any_value,
        timeout=1.0,
    )
    assert outcome.passed, outcome.fail_msg


async def test_can_dropout_recovery(
    rig: Rig,
    dropout_duration: float = 2.0,
    recovery_timeout: float = 0.5,
) -> None:
    """EngineData.RPM is positive again shortly after a CAN dropout ends.

    A dropout fault on arbitration ID 0x100 is injected and held for
    ``dropout_duration`` seconds. Once the fault context exits, the test waits
    up to ``recovery_timeout`` seconds for ``EngineData.RPM`` to be above zero.

    Args:
        rig: Rig instance supplied to the test.
        dropout_duration: Seconds the dropout fault is declared for and held.
            Defaults to 2.0.
        recovery_timeout: Seconds allowed for the signal to recover after the
            fault is removed. Defaults to 0.5 (500 ms).
    """
    # One value drives both the fault descriptor and the hold time, so the two
    # cannot drift apart when the duration is changed.
    dropout = rig.fault.can_dropout(arb_id=0x100, duration=dropout_duration)
    async with rig.fault.inject(dropout):
        await asyncio.sleep(dropout_duration)

    # The fault has been removed here; the signal should come back promptly.
    result = await rig.can.expect(
        signal="EngineData.RPM",
        condition=lambda v: v > 0,
        timeout=recovery_timeout,
    )
    assert result.passed, result.fail_msg


async def test_sim_override(rig: Rig) -> None:
    """sim.override forces a signal inside the block and restores it on exit.

    The simulated ``EngineData.RPM`` is first set to 1500.0. Inside the
    override block the signal is expected to read 0.0; after the block it is
    expected to read 1500.0 again. Each expectation has a 0.5 second timeout.
    """
    import math  # local import keeps this example self-contained

    original_rpm = 1500.0
    # Observed values are floats; compare with a small absolute tolerance
    # instead of exact equality so rounding noise cannot fail the test.
    tolerance = 1e-6

    def rpm_is(target: float):
        """Build a condition that accepts values within tolerance of target."""
        return lambda v: math.isclose(v, target, abs_tol=tolerance)

    await rig.sim.set("EngineData.RPM", original_rpm)

    async with rig.sim.override("EngineData.RPM", 0.0):
        result = await rig.can.expect(
            signal="EngineData.RPM",
            condition=rpm_is(0.0),
            timeout=0.5,
        )
        assert result.passed, result.fail_msg

    # Original value should be restored
    result = await rig.can.expect(
        signal="EngineData.RPM",
        condition=rpm_is(original_rpm),
        timeout=0.5,
    )
    assert result.passed, result.fail_msg

See also