The Rig object is injected into every test function by the framework. Test code never constructs a Rig — it only receives one.
async def test_brake_pressure_response(rig: Rig) -> None:
await rig.can.send(message="BrakeDemand", fields={"Value": 80.0})
result = await rig.can.expect(
signal="BrakeStatus.Pressure",
condition=lambda v: v >= 75.0,
timeout=2.0,
)
assert result.passed, result.fail_msg
Signal naming
All signals are identified as "MessageName.SignalName" — the exact names from the DBC file. This convention works identically for CAN and Ethernet signals.
# CAN signal
await rig.can.expect(signal="EngineData.RPM", ...)
# Ethernet signal (SOME/IP event)
await rig.can.expect(signal="BrakeDemand.Value", ...)
No interface names, no bus numbers, no backend-specific strings in test code.
rig.can — CAN bus API
rig.can.send()
Send a DBC-encoded message by name.
await rig.can.send(
message="EngineControl",
fields={"throttle": 50, "gear": 3},
)
Parameters:
| Param | Type | Description |
|---|
message | str | DBC message name |
fields | dict[str, Any] | Signal name → physical value |
interface | str | None | CAN interface key from TOML. Defaults to primary. |
Raises: BlockedError if DBC is not loaded or message is unknown.
rig.can.send_raw()
Send a raw CAN frame without DBC encoding.
await rig.can.send_raw(arb_id=0x100, data=bytes([0x01, 0x02, 0x03, 0x04]))
rig.can.expect()
Wait for a signal to satisfy a condition.
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v > 800,
timeout=2.0,
)
assert result.passed, result.fail_msg
Parameters:
| Param | Type | Description |
|---|
signal | str | "MessageName.SignalName" |
condition | Callable[[float], bool] | Lambda returning True when the assertion is satisfied |
timeout | float | Seconds before the assertion fails |
Returns: ExpectResult — see Result model below.
rig.can.receive()
Wait for a specific CAN frame by arbitration ID.
frame = await rig.can.receive(arb_id=0x100, timeout=1.0)
print(frame.fields) # decoded signal values if DBC is loaded
Raises: CANTimeoutError if no matching frame arrives within the timeout.
rig.can.bus_load()
Measure current CAN bus load as a percentage.
load = await rig.can.bus_load(window=1.0) # measure over 1 second
print(f"Bus load: {load.percent:.1f}%")
rig.sim — Bus Simulation Engine control
The rig.sim namespace controls the virtual Bus Simulation Engine (BSE). It is useful when testing the ECU’s response to simulated bus traffic.
rig.sim.set()
Set a signal value in the simulation. Takes effect on the next scheduled frame.
await rig.sim.set("VehicleSpeed.Speed", 120.0)
rig.sim.start_scheduling()
Start periodic transmission of a DBC message.
await rig.sim.start_scheduling("EngineData") # message name
rig.sim.stop_scheduling()
Stop periodic transmission of a message.
await rig.sim.stop_scheduling("EngineData")
rig.sim.override() (context manager)
Temporarily override a signal value. Always restores the original value, even on exception (Rule R8).
async with rig.sim.override("EngineData.RPM", 0.0):
# Signal is forced to 0.0 inside this block
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v == 0.0,
timeout=0.5,
)
assert result.passed
# RPM is restored to its pre-override value here
rig.sim.status()
Get current simulation state — which messages are active, signal count.
status = await rig.sim.status()
print(f"Active messages: {status.active_count}")
rig.fault — Fault injection
Fault methods return FaultDescriptor objects — they do not execute anything. The fault is activated only when passed to rig.fault.inject().
Fault methods return descriptors, never coroutines. Do not await them directly. Always use async with rig.fault.inject(...).
# CORRECT
async with rig.fault.inject(rig.fault.can_dropout(arb_id=0x100, duration=2.0)):
await asyncio.sleep(2.0)
# Fault removed here
# WRONG — can_dropout() does not return a coroutine
await rig.fault.can_dropout(arb_id=0x100, duration=2.0) # TypeError
rig.fault.inject() (context manager)
Activate a fault for the duration of the async with block. Always cleans up on exit, even on exception.
async with rig.fault.inject(descriptor):
# Fault is active here
...
# Fault is removed here
rig.fault.can_dropout()
Suppress TX and RX of a specific CAN arbitration ID.
async with rig.fault.inject(
rig.fault.can_dropout(arb_id=0x200, duration=2.0)
):
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v == 0.0,
timeout=2.5,
)
assert result.passed, result.fail_msg
| Param | Type | Description |
|---|
arb_id | int | CAN arbitration ID to suppress |
duration | float | Duration in seconds |
interface | str | None | CAN interface key. Defaults to primary. |
rig.fault.can_noise()
Inject random bit errors into a specific CAN message.
async with rig.fault.inject(
rig.fault.can_noise(arb_id=0x100, bit_error_rate=0.05)
):
await asyncio.sleep(1.0)
rig.fault.power_cycle()
Power-cycle a rail — off for off_duration seconds, then back on.
async with rig.fault.inject(
rig.fault.power_cycle(rail="ecu_main", off_duration=0.5)
):
# ECU is off during this block
await asyncio.sleep(1.0)
# ECU is back on here
rig.fault.gpio_stuck()
Hold a GPIO output pin at a fixed value.
async with rig.fault.inject(
rig.fault.gpio_stuck(pin="ignition_enable", value=False, duration=1.0)
):
await asyncio.sleep(1.0)
rig.ecu — ECU management
The rig.ecu namespace provides access to ECUs defined in [rig.ecus.*] in the TOML.
# Access an ECU by its TOML key
ecu = rig.ecu["engine_ecu"]
await ecu.power_on() # turn on the power rail
await ecu.wait_for_boot(timeout=8.0) # wait for DoIP routing activation
await ecu.power_off() # turn off the power rail
Result model
ExpectResult
Returned by rig.can.expect().
result = await rig.can.expect(signal="EngineData.RPM", condition=lambda v: v > 800, timeout=2.0)
result.passed # bool — True if condition was satisfied within timeout
result.fail_msg # str — human-readable failure description (empty string if passed)
result.value # float | None — the signal value that satisfied the condition (or last value)
result.elapsed # float — seconds elapsed from call to resolution
Always use assert result.passed, result.fail_msg — this gives useful output in test reports:
# Good — error message is surfaced in the report
assert result.passed, result.fail_msg
# Bad — no useful error message
assert result.passed
BlockedError vs fail
CruciHiL distinguishes between rig problems and firmware problems:
| Condition | What to raise | Status | Counts against pass rate? |
|---|
| Firmware assertion failed | assert result.passed, result.fail_msg | fail | Yes |
| Rig precondition not met | raise BlockedError("message") | blocked | No |
Use BlockedError for infrastructure problems — missing hardware, DBC not loaded, backend not connected. These should never count as firmware regressions.
from crucihil.hal.models.exceptions import BlockedError
async def test_engine_startup(rig: Rig) -> None:
if not rig.config.definitions.can_dbc:
raise BlockedError("No DBC loaded — cannot run signal assertions")
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v > 800,
timeout=2.0,
)
assert result.passed, result.fail_msg
Import reference
from crucihil.hal.rig import Rig
from crucihil.hal.models.exceptions import BlockedError, CANTimeoutError
from crucihil.hal.models.results import ExpectResult, FaultDescriptor
Full test example
"""Engine validation test functions."""
from __future__ import annotations
import asyncio
from crucihil.hal.rig import Rig
from crucihil.hal.models.exceptions import BlockedError
async def test_engine_startup(rig: Rig, expected_rpm: float = 800.0, startup_timeout: float = 2.0) -> None:
"""ECU reaches idle RPM within startup_timeout seconds of ignition."""
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v > expected_rpm,
timeout=startup_timeout,
)
assert result.passed, result.fail_msg
async def test_can_heartbeat(rig: Rig) -> None:
"""EngineData frames are present on the CAN bus."""
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v is not None,
timeout=1.0,
)
assert result.passed, result.fail_msg
async def test_can_dropout_recovery(rig: Rig) -> None:
"""ECU recovers within 500ms after EngineData dropout."""
async with rig.fault.inject(
rig.fault.can_dropout(arb_id=0x100, duration=2.0)
):
await asyncio.sleep(2.0)
# ECU should recover within 500ms of bus restoration
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v > 0,
timeout=0.5,
)
assert result.passed, result.fail_msg
async def test_sim_override(rig: Rig) -> None:
"""sim.override restores signal on exit."""
original_rpm = 1500.0
await rig.sim.set("EngineData.RPM", original_rpm)
async with rig.sim.override("EngineData.RPM", 0.0):
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v == 0.0,
timeout=0.5,
)
assert result.passed, result.fail_msg
# Original value should be restored
result = await rig.can.expect(
signal="EngineData.RPM",
condition=lambda v: v == original_rpm,
timeout=0.5,
)
assert result.passed, result.fail_msg
See also