Milestone M3: Scheduler¶
EARS Tasks and Git Workflow¶
Branch name: feature/m3-scheduler
EARS Loop¶
- Explore: review readiness signals, priority bands, backpressure cooperation
- Analyze: design runnable queues, tick cadence, and BLOCKED handling
- Implement:
scheduler.pyrun loop,runtime_plan, mutators - Specify checks: unit/integration tests for fairness, priorities, shutdown, backpressure
- Commit after each major step
Git Commands¶
Bash
git checkout -b feature/m3-scheduler
git add -A && git commit -m "feat(scheduler): add Scheduler skeleton and registration"
git add -A && git commit -m "feat(scheduler): implement runnable queues and priority bands"
git add -A && git commit -m "feat(scheduler): cooperative loop with tick cadence and backpressure"
git add -A && git commit -m "feat(scheduler): runtime mutators and graceful shutdown"
git add -A && git commit -m "test(scheduler): fairness, priority bias, shutdown semantics"
git push -u origin feature/m3-scheduler
Overview¶
Implement the cooperative scheduler that drives node lifecycle and message delivery across the graph. The scheduler evaluates node readiness, drains edges with priority bias for control-plane traffic, and invokes on_message/on_tick and start/stop hooks. It integrates non-blocking backpressure semantics introduced in M2 and readies the runtime for observability wiring in M4.
EARS Requirements¶
- The system shall provide a Scheduler with public APIs:
register(unit: Node | Subgraph)-> Nonerun()-> Noneshutdown()-> Noneset_priority(edge_id: str, priority: int)-> Noneset_capacity(edge_id: str, capacity: int)-> None
- When the scheduler starts, the system shall invoke
on_startfor all nodes exactly once. - When a message is available on an input edge for a node, the system shall invoke
on_message(port, msg)respecting type/port contracts. - When the tick cadence elapses, the system shall invoke
on_tickon nodes that are runnable. - If a shutdown is requested, the system shall gracefully stop the graph, invoking
on_stopon each node, and handle draining/flushing of edges according to policy. - The system shall prioritize control-plane edges over data-plane edges where priorities are configured.
- While running, the scheduler shall maintain fairness to avoid starvation across runnable nodes.
- If a node raises an exception in a lifecycle callback, the system shall capture it and surface through error channels (logs/metrics in M4), and apply node error policy (retry/skip/stop stubbed for now).
- The system shall support cooperative backpressure: if an output edge signals BLOCKED, the scheduler shall avoid busy-waiting and reschedule producer/consumer appropriately.
- The system shall remain single-process and asyncio-friendly without requiring async to use it (adapter layer can be added later).
Deliverables¶
src/meridian/core/scheduler.py- Scheduler class and run loop [DONE]
- Registration of graphs/subgraphs with expansion into an internal plan (nodes, edges, ports) [DONE]
- Runnable queues and priority biasing [DONE]
- Tick cadence control [DONE]
- Graceful shutdown and stop semantics [DONE]
- Runtime mutators:
set_priority,set_capacity(with validation and safe application) [DONE] - Backpressure-aware emit routing glue between
Node.emitandEdge.put/try_put[DONE]
src/meridian/core/runtime_plan.py(optional small helper)- Flatten Subgraph into execution plan: topo order, node/edge indices, port maps [DONE - integrated into scheduler]
- Deterministic IDs for edges and addressable endpoints [DONE]
- Minimal integration hooks
- Metrics/tracing placeholders (no-op) called at key points (loop iteration start/end, callbacks, enqueue/dequeue) [DONE]
- Tests (see Testing Strategy) [DONE - comprehensive unit tests]
Scheduling Model¶
-
Cooperative loop: [DONE]
- Maintain two primary readiness signals: [DONE]
- Message-ready: input queues non-empty for a node [DONE]
- Tick-ready: elapsed tick interval for a node (global cadence with per-node hints) [DONE]
- Maintain a runnable structure keyed by
(priority_class, node_id)with round-robin within class [DONE]
- Maintain two primary readiness signals: [DONE]
-
Priorities: [DONE]
- Edge priority integer; map to bands (e.g., control-plane > high > normal) [DONE]
- Node effective class derived from its highest-priority ready input; if none, normal [DONE]
- Bias: service higher bands more frequently with a simple ratio (e.g., 4:2:1) [DONE]
-
Fairness: [DONE]
- Within a band, nodes are scheduled round-robin to avoid starvation [DONE]
- Limit per-iteration work with a budget (e.g., max messages per node per turn) to bound tail latency [DONE]
-
Backpressure handling: [DONE]
Node.emit→Edge.putreturns PutResult: OK, BLOCKED, DROPPED, COALESCED [DONE]- On BLOCKED: mark producing node as yield-required; give consumers a chance; requeue producer later [DONE]
- Avoid busy waits by tracking edges that caused BLOCKED and revisiting when downstream consumption occurs [DONE]
Lifecycle and Semantics¶
-
start():
- Expand subgraphs, allocate runtime plan
- Initialize nodes:
on_startin topo order (producers first); catch exceptions - Prime tick timers
-
run(): [DONE]
- Loop until shutdown flag set and all outstanding work quiesces (subject to policy) [DONE]
-
Each iteration:
- Pull next node from runnable queues according to priority bands and fairness [DONE]
- If message-ready: pop one (or small batch) from input edges and invoke
on_message[DONE] - Else if tick-ready: invoke
on_tick[DONE] - Track work done; update metrics intents [DONE]
-
Idle strategy: brief sleep/yield when no work (configurable) [DONE]
-
shutdown():
- Stop accepting new external inputs (if applicable)
- Drain policy:
- block: attempt graceful draining until timeout; then stop
- drop/latest/coalesce: allow loop to quiesce; then stop
on_stopin reverse topo order (consumers first)
Public APIs¶
register(unit: Node | Subgraph)-> None [DONE]- Accept a Node or Subgraph; multiple calls allowed before
run()[DONE] - Validate names and conflicts across registrations [DONE]
- Accept a Node or Subgraph; multiple calls allowed before
run()-> None [DONE]- Blocking call that runs until shutdown requested [DONE]
- Optional parameters via constructor:
tick_interval,max_batch_per_node,idle_sleep_ms[DONE]
shutdown()-> None [DONE]- Signals loop to exit;
run()returns after graceful stop [DONE]
- Signals loop to exit;
set_priority(edge_id: str, priority: int)-> None [DONE]- Validate edge exists; adjust band assignment atomically [DONE]
set_capacity(edge_id: str, capacity: int)-> None [DONE]- Validate and apply to the underlying Edge safely (may require temporary lock) [DONE]
Configuration¶
- SchedulerConfig
tick_interval_ms: default tick cadence baseline (e.g., 50–100ms)fairness_ratio: e.g., {control: 4, high: 2, normal: 1}max_batch_per_node: to bound per-iteration workidle_sleep_ms: backoff when no runnable nodesshutdown_timeout_s: soft limit for drain on shutdown
- Node hints (optional):
desired_tick_interval_mspriority_overridefor special nodes (discouraged; use edge priorities instead)
Data Structures¶
- Ready queues:
- Per-band deque of
node_ids - Map
node_id-> ReadyState {message_ready,tick_ready,blocked_edges}
- Per-band deque of
- Edge registry:
edge_id-> EdgeRef {capacity,policy,priority_band,src,dst}
- Node registry:
node_id-> NodeRef {instance,inputs,outputs,tick_timer,error_policy}
- Port map:
(node_id, port_name)->edge_idfor inputs/outputs
- Message buffers:
- Per-input-port pending deque (reads from Edge, small batch caching)
Error Handling¶
- Catch exceptions from hooks:
on_message/on_tick/on_start/on_stopwrapped with try/except- Record error intent; apply node error policy (default: skip and continue; configurable later)
- Avoid bringing down the scheduler unless policy demands
- Misconfiguration at runtime mutators:
set_priority/set_capacityvalidate and raise ValueError with context
- Backpressure and drops:
- Respect PutResult; never spin; ensure progress via consumer scheduling
Testing Strategy¶
Unit Tests¶
- Ready queues and fairness:
- Round-robin servicing within band; ratio across bands respected
- Priority bias:
- Control-plane inputs preempt data-plane consistently
- Tick cadence:
- Tick-ready nodes get scheduled at configured intervals
- Backpressure flow:
- Producer with BLOCKED result yields; consumer runs; producer resumes successfully
- Shutdown semantics:
- Graceful stop invokes
on_stopin reverse topo; drain behavior matches policy
- Graceful stop invokes
- Runtime mutators:
set_prioritychanges servicing band immediatelyset_capacityapplied, validated, and reflected in Edge
Integration Tests¶
- Producer → Edge (block) → Slow Consumer:
- Observe producer backpressure and steady-state throughput
- Control-plane kill switch:
- Control edge triggers immediate
on_messageeven under heavy data-plane load
- Control edge triggers immediate
- Mixed policies:
- latest/coalesce edges behave under burst; end-to-end message correctness
- Error propagation:
- Node raising in
on_messagedoesn't crash loop; error counted and flow continues
- Node raising in
Stress and Reliability¶
- High-throughput graph (multiple producers/consumers) to validate scheduler loop latency bounds
- Starvation avoidance under varied ready patterns
- Long-running test (30–60 min) without memory growth
Performance Guidance¶
- Minimize allocations in loop; pre-allocate small objects where possible
- Prefer integer
node_ids/edge_idsand array-backed structures for hot paths - Bound per-iteration work to keep tail latency predictable
- Use simple timing (monotonic) and avoid per-iteration system time calls when possible
Acceptance Criteria¶
- Scheduler supports run, shutdown, registration of at least one Subgraph
- Priority bias demonstrably preempts data-plane under load
- Backpressure semantics validated with block policy
- Tick cadence operates within tolerance (±1 tick interval under light load)
- All unit/integration tests for scheduler pass; coverage for scheduler ≥90%
- Public API conforms to the draft (README/report), or deviations documented
Risks and Mitigations¶
- Over-engineering scheduling policy:
- Mitigation: Keep bias ratios simple; no complex RT algorithms in v1
- Busy waiting under backpressure:
- Mitigation: Explicit BLOCKED handling and consumer-first scheduling
- Priority inversion:
- Mitigation: Band-based ready queues; derive node band from highest-priority ready input
- Complexity in subgraph expansion:
- Mitigation:
runtime_planhelper with deterministic IDs and small surface area
- Mitigation:
- Shutdown hangs:
- Mitigation: Shutdown timeout; policy-aware drain; robust stop ordering
Out of Scope (Deferred)¶
- Async/await-native scheduler APIs (adapter can be added post-v1)
- Multi-process or distributed scheduling
- Advanced error policies (retries with jitter, DLQ routing) beyond placeholders
- Full observability (wired in M4)
Traceability¶
- Implements Technical Blueprint Implementation Plan M3.
- Fulfills EARS requirements for lifecycle orchestration, priorities, backpressure cooperation, fairness, and graceful shutdown.
Checklist¶
-
scheduler.py: run loop, readiness detection, lifecycle calls - Priority bands and fairness ratio implemented
- Backpressure-aware emit routing
- Shutdown semantics with reverse topo
on_stop - Runtime mutators (priority, capacity) with validation
- Unit/integration tests and coverage ≥90% for scheduler