Project Structure

.
├── config/               # Agent configuration files
├── src/
│   ├── actions/          # Agent outputs/actions/capabilities
│   ├── fuser/            # Input fusion logic
│   ├── inputs/           # Input plugins (e.g. VLM, audio)
│   ├── llm/              # LLM integration
│   ├── providers/        # Providers
│   ├── runtime/          # Core runtime system
│   ├── simulators/       # Virtual endpoints such as `WebSim`
│   ├── zenoh_idl/        # Zenoh's Interface Definition Language (IDL)
│   └── run.py            # CLI entry point

The system is built around a loop that runs at a fixed frequency set by self.config.hertz. On each cycle, the loop collects the most recent data from the input sources, fuses it into a prompt, sends that prompt to one or more LLMs, and then dispatches the LLM responses to virtual agents or physical robots.

Architecture Overview

Specific runtime flow:

  1. Input plugins collect sensor data (vision, audio, social media, etc.)
  2. The Fuser combines inputs into a prompt
  3. The LLM generates commands based on the prompt
  4. The ActionOrchestrator executes commands through actions
  5. Connectors map OM1 data/commands to external data buses and data distribution systems such as custom APIs, ROS2, Zenoh, or CycloneDDS (a sketch of this mapping follows below).
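
The connector layer does not appear in the core loop shown below, so here is a minimal sketch of the idea only. Everything in it (the Command dataclass, the BusConnector class, the topic naming) is illustrative and not taken from the OM1 codebase; a real connector would wrap a Zenoh, ROS2, or CycloneDDS publisher instead of the print callable used here.

# Hypothetical connector sketch; none of these names exist in OM1.
import json
from dataclasses import dataclass
from typing import Callable


@dataclass
class Command:
    """Minimal stand-in for a command emitted by the LLM layer."""
    name: str
    arguments: dict


class BusConnector:
    """Maps commands onto an external bus via an injected publish callable."""

    def __init__(self, publish: Callable[[str, bytes], None], topic_prefix: str = "om1"):
        self._publish = publish            # e.g. a Zenoh/ROS2/DDS publisher wrapped in a callable
        self._topic_prefix = topic_prefix

    def send(self, command: Command) -> None:
        topic = f"{self._topic_prefix}/{command.name}"
        payload = json.dumps(command.arguments).encode("utf-8")
        self._publish(topic, payload)


# Print instead of publishing so the sketch runs without any middleware installed.
connector = BusConnector(lambda topic, data: print(topic, data))
connector.send(Command(name="move", arguments={"direction": "forward", "speed": 0.5}))

Injecting the publish callable keeps the mapping logic independent of any particular middleware.
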
Core Runtime System
# /src/runtime/cortex.py
async def _run_cortex_loop(self) -> None:
    while True:
        await asyncio.sleep(1 / self.config.hertz)
        await self._tick()

async def _tick(self) -> None:
    finished_promises, _ = await self.action_orchestrator.flush_promises()
    prompt = self.fuser.fuse(self.config.agent_inputs, finished_promises)
    output = await self.config.cortex_llm.ask(prompt)
    logging.debug("I'm thinking... %s", output)
    await self.action_orchestrator.promise(output.commands)

Code Explanation

The code above defines the asynchronous event loop of the Cortex runtime. It continuously processes inputs, generates LLM responses, and executes the resulting commands.

_run_cortex_loop() – Main Loop

async def _run_cortex_loop(self) -> None:
    while True:
        await asyncio.sleep(1 / self.config.hertz)  # Controls loop frequency
        await self._tick()  # Calls the processing function
  • Runs indefinitely, executing _tick() at the rate defined by self.config.hertz; for example, hertz = 2 yields one tick every 0.5 seconds.
  • Uses asyncio.sleep() to maintain a steady execution rate without blocking other coroutines.

_tick() – Processing Each Cycle

async def _tick(self) -> None:
    finished_promises, _ = await self.action_orchestrator.flush_promises()
  • Flushes completed actions (finished_promises) from the action_orchestrator.
  • Separates finished work from actions still in flight, so only completed results are reported to the rest of the cycle.
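
The orchestrator's internals are not shown in this document. As a mental model only (the SimpleOrchestrator class and its _pending list are assumptions, not OM1 code), "promises" can be pictured as asyncio tasks that flush_promises() splits into finished and still-running work:

# Illustrative-only orchestrator; not the OM1 ActionOrchestrator.
import asyncio


class SimpleOrchestrator:
    def __init__(self) -> None:
        self._pending: list[asyncio.Task] = []

    async def flush_promises(self) -> tuple[list, list]:
        # Split tracked tasks into finished and still-running ones.
        done = [t for t in self._pending if t.done()]
        self._pending = [t for t in self._pending if not t.done()]
        return [t.result() for t in done], self._pending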

Generates a prompt using current inputs.

prompt = self.fuser.fuse(self.config.agent_inputs, finished_promises)
  • Creates a prompt by combining the latest data from agent_inputs with the results in finished_promises, using the Fuser (the input-fusion component in /src/fuser/).
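
As a rough illustration of what fusion means here (the fuse function below is a toy, not the Fuser in /src/fuser/), fusing can be pictured as concatenating the newest reading from each input with the results of recently finished actions:

# Illustrative-only input fusion sketch.
def fuse(agent_inputs: list, finished_promises: list) -> str:
    input_lines = [f"INPUT: {item}" for item in agent_inputs]
    result_lines = [f"RESULT: {item}" for item in finished_promises]
    return "\n".join(input_lines + result_lines + ["What should you do next?"])


print(fuse(["VLM: a person is waving"], ["move(forward) completed"]))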

Sends the prompt to the Cortex LLM for processing.

output = await self.config.cortex_llm.ask(prompt)
  • Sends the fused prompt to the Cortex LLM and awaits its response.
  • The model returns an output object whose commands field holds the actions to execute.
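
To make the interface concrete, here is a hedged sketch of an object that satisfies the ask(prompt) contract and returns something with a commands field. The CortexOutput and EchoLLM names are invented for illustration; a real integration in /src/llm/ would call an actual LLM API and parse its structured output.

# Hypothetical LLM wrapper sketch; not OM1's LLM integration.
import asyncio
from dataclasses import dataclass, field


@dataclass
class CortexOutput:
    commands: list = field(default_factory=list)


class EchoLLM:
    """Stand-in model: just enough to show the interface, not a real LLM client."""

    async def ask(self, prompt: str) -> CortexOutput:
        # A real implementation would send the prompt to an LLM and parse the reply.
        lines = [line for line in prompt.splitlines() if line.startswith("INPUT:")]
        return CortexOutput(commands=lines or ["idle()"])


print(asyncio.run(EchoLLM().ask("INPUT: a person is waving")))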

Receives and logs the output.

logging.debug("I'm thinking... %s", output)
  • Logs the generated output for debugging; the %s placeholder lets the logging module interpolate output lazily.

Executes commands generated by the AI.

await self.action_orchestrator.promise(output.commands)

  • Passes the commands from the LLM response to the action_orchestrator, which schedules them for execution through the configured actions.
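
Continuing the illustrative SimpleOrchestrator from earlier (again an assumption, not OM1's ActionOrchestrator), promise() can be pictured as turning each command into a background asyncio task so the cortex loop never blocks while actions run:

# Illustrative-only scheduling sketch; not the actual ActionOrchestrator.
import asyncio


class SimpleOrchestrator:
    def __init__(self) -> None:
        self._pending: list[asyncio.Task] = []

    async def promise(self, commands: list) -> None:
        # Schedule each command as a background task; the cortex loop keeps ticking.
        for command in commands:
            self._pending.append(asyncio.create_task(self._execute(command)))

    async def _execute(self, command) -> str:
        await asyncio.sleep(0.1)  # stand-in for the real action, e.g. a motor or API call
        return f"{command} completed"

In this picture, the next call to flush_promises() (sketched earlier) picks up the completed results and feeds them back into the following prompt.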