Worker
A worker is the basic unit of computation in OmAgent. It executes tasks and generates outputs.
How to define a worker
The most basic worker can be created like this:
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.utils.registry import registry

@registry.register_worker()
class MyWorker(BaseWorker):
    def _run(self, *args, **kwargs):
        # Implement your business logic here
        return {"result": "some_value"}
By inheriting from BaseWorker, you can define your own worker. The worker is registered under the name of its class. Normally, use @registry.register_worker() to register the worker so that it can be built from configurations. See registry for more details.
1. Parameter Handling
You can define typed parameters that are JSON serializable, and return results in key-value format:
@registry.register_worker()
class ParameterWorker(BaseWorker):
    def _run(self, name: str, age: int):
        # Parameters will be automatically extracted
        return {
            "message": f"Hello {name}, you are {age} years old"
        }
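To make the "automatically extracted" comment concrete, here is a minimal standalone sketch of how entries of a task input could be matched against the parameters of `_run`. It does not import OmAgent: the plain `ParameterWorker` class and the `extract_params` helper are hypothetical stand-ins, and the framework's actual mechanism may differ.

```python
import inspect
import json


class ParameterWorker:
    """Plain stand-in for a worker; not an OmAgent class."""

    def _run(self, name: str, age: int):
        return {"message": f"Hello {name}, you are {age} years old"}


def extract_params(func, task_input: dict) -> dict:
    """Pick the entries of task_input that match func's named parameters."""
    params = inspect.signature(func).parameters
    return {key: task_input[key] for key in params if key in task_input}


worker = ParameterWorker()
task_input = {"name": "Ada", "age": 36, "unused": True}

# Only "name" and "age" are forwarded; "unused" is ignored.
kwargs = extract_params(worker._run, task_input)
output = worker._run(**kwargs)

# The result is a key-value mapping that survives a JSON round trip.
assert json.loads(json.dumps(output)) == output
print(output)
```

Keeping inputs and outputs JSON serializable is what allows them to be passed between workers as plain data.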
2. Integration
You can integrate workers with other libraries to extend their functionality. The most common case is integrating with LLMs. Here is an example:
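from typing import List

from pydantic import Field

# BaseLLMBackend, OpenaiGPTLLM, StrParser and PromptTemplate are provided by
# OmAgent; import them alongside BaseWorker and registry.

@registry.register_worker()
class LLMWorker(BaseLLMBackend, BaseWorker):
    llm: OpenaiGPTLLM
    output_parser: StrParser
    prompts: List[PromptTemplate] = Field(
        default=[
            PromptTemplate.from_template(template="Your prompt here")
        ]
    )

    def _run(self, *args, **kwargs):
        return self.simple_infer()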
3. Configuration Fields
You can configure worker behavior using Pydantic Fields to set default values:
@registry.register_worker()
class ConfigurableWorker(BaseWorker):
    poll_interval: float = Field(default=100)  # Polling interval in milliseconds
    domain: str = Field(default=None)  # Workflow domain
    concurrency: int = Field(default=5)  # Concurrency level
Note: Do not use alias in the field definition.
4. Async Support
Workers can be asynchronous:
import asyncio

@registry.register_worker()
class AsyncWorker(BaseWorker):
    async def _run(self, *args, **kwargs):
        async def count_task(i):
            await asyncio.sleep(1)
            print(f'Task {i} completed!')
            return i

        # Run the ten tasks concurrently and wait for all of them
        tasks = [count_task(i) for i in range(10)]
        results = await asyncio.gather(*tasks)
        return {"result": "async operation completed"}
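The concurrency pattern inside `_run` can be tried outside OmAgent. The sketch below uses only the standard library; `count_task` mirrors the example above with a shorter sleep, and `run_all` is a hypothetical helper that returns the gathered values instead of a fixed message.

```python
import asyncio


async def count_task(i: int) -> int:
    await asyncio.sleep(0.01)  # stands in for an I/O-bound call
    return i


async def run_all(n: int) -> dict:
    # gather schedules the coroutines concurrently and returns their
    # results in the order the awaitables were passed in.
    results = await asyncio.gather(*(count_task(i) for i in range(n)))
    return {"result": results}


output = asyncio.run(run_all(5))
print(output)
```

Because the tasks overlap while sleeping, the total wait is close to one sleep interval, not the sum of all of them.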
Configuration and build
Workers can be configured and built from YAML or JSON configuration files. You can set not only a worker's own parameters but also those of its nested dependencies, recursively.
1. Worker Configuration Structure
Here's the basic structure:
name: LLMWorker
llm:
  name: OpenaiGPTLLM
  model_id: gpt-4o
  api_key: sk-proj-...
  endpoint: https://api.openai.com/v1
  temperature: 0
  vision: true
output_parser:
  name: StrParser
2. Submodule Substitution
You can use ${sub|module_name} to substitute submodules. This is useful when you want to reuse the same submodule in different workers while keeping the configuration clean. The module_name should be the name of the submodule configuration file.
For example, you can define the llm_worker.yaml as follows:
name: LLMWorker
llm: ${sub|gpt}
output_parser:
  name: StrParser
And define the gpt.yaml as follows:
name: OpenaiGPTLLM
model_id: gpt-4o
api_key: sk-proj-...
endpoint: https://api.openai.com/v1
temperature: 0
vision: true
This is equivalent to the previous LLMWorker example.
Note: Do not use alias in the field definition. Do not create circular references.
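As an illustration of what substitution and the circular-reference rule mean, here is a minimal sketch that resolves `${sub|...}` placeholders over already-parsed configurations. `resolve_subs` and the `modules` dictionary (file name mapped to parsed content) are hypothetical and written only for this example; OmAgent's own loader may work differently.

```python
import re

SUB_PATTERN = re.compile(r"^\$\{sub\|\s*([^}]+?)\s*\}$")


def resolve_subs(config, modules: dict, _seen=()):
    """Recursively replace "${sub|name}" strings with the named module config."""
    if isinstance(config, dict):
        return {k: resolve_subs(v, modules, _seen) for k, v in config.items()}
    if isinstance(config, list):
        return [resolve_subs(v, modules, _seen) for v in config]
    if isinstance(config, str):
        match = SUB_PATTERN.match(config)
        if match:
            name = match.group(1)
            if name in _seen:
                chain = " -> ".join(_seen + (name,))
                raise ValueError(f"circular reference: {chain}")
            return resolve_subs(modules[name], modules, _seen + (name,))
    return config


# Parsed stand-ins for gpt.yaml and llm_worker.yaml (credentials omitted).
modules = {
    "gpt": {"name": "OpenaiGPTLLM", "model_id": "gpt-4o", "temperature": 0},
}
llm_worker = {
    "name": "LLMWorker",
    "llm": "${sub|gpt}",
    "output_parser": {"name": "StrParser"},
}

resolved = resolve_subs(llm_worker, modules)
print(resolved["llm"]["name"])
```

Tracking the chain of visited module names is one simple way to report a cycle instead of recursing forever.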
3. Environment Variables
You can use ${env|env_name, default_value} to substitute environment variables. This is useful when you want to set parameters dynamically. The env_name should be the name of the environment variable. The default_value is optional and is used when the environment variable is not set. For example, you can define gpt.yaml as follows:
name: OpenaiGPTLLM
model_id: gpt-4o
api_key: ${env| CUSTOM_OPENAI_KEY}
endpoint: ${env| CUSTOM_OPENAI_ENDPOINT, https://api.openai.com/v1}
temperature: 0
vision: true
The environment variable name is case-sensitive.
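The placeholder syntax can be sketched with a small regular expression. The `resolve_env` helper below is hypothetical and not part of OmAgent. Raising `KeyError` when a variable is unset and has no default is an assumption of this sketch, since the behavior for that case is not specified here. A plain dictionary is passed as the environment so that the case-sensitive lookup behaves the same on every platform.

```python
import os
import re

# ${env| NAME} or ${env| NAME, default}; whitespace around parts is ignored.
ENV_PATTERN = re.compile(r"\$\{env\|\s*([^,}\s]+)\s*(?:,\s*([^}]*?)\s*)?\}")


def resolve_env(value: str, environ=os.environ) -> str:
    """Replace every ${env|...} placeholder in value."""

    def replace(match):
        name, default = match.group(1), match.group(2)
        if name in environ:
            return environ[name]
        if default is not None:
            return default
        # Assumption of this sketch: fail loudly when nothing is available.
        raise KeyError(f"environment variable {name!r} is not set")

    return ENV_PATTERN.sub(replace, value)


env = {"CUSTOM_OPENAI_KEY": "secret"}
api_key = resolve_env("${env| CUSTOM_OPENAI_KEY}", env)
endpoint = resolve_env(
    "${env| CUSTOM_OPENAI_ENDPOINT, https://api.openai.com/v1}", env
)
print(api_key, endpoint)
```

Here the key comes from the environment mapping, while the endpoint falls back to the default because `CUSTOM_OPENAI_ENDPOINT` is not set.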
4. Default Configuration Fields
Workers have several default configuration fields that can be set:
- component_stm: The STM component for the worker. Use any registered component name. Default is the one registered with register_stm. Access it via self.stm. See container and memory for more details.
- component_ltm: The LTM component for the worker. Use any registered component name. Default is the one registered with register_ltm. Access it via self.ltm. See container and memory for more details.
- component_callback: The callback component for the worker. Use any registered component name. Default is the one registered with register_callback. Access it via self.callback. See container and client for more details.
- component_input: The input component for the worker. Use any registered component name. Default is the one registered with register_input. Access it via self.input. See container and client for more details.
- poll_interval: The poll interval for the worker. Default is 100 milliseconds.
- domain: The domain of the workflow. Default is None.
- concurrency: The concurrency of the worker. Default is 5.
5. Build from Configurations
Worker instances can be built from configurations by using the build_from_file function from omagent_core.utils.build. Here's how it works:
from omagent_core.utils.build import build_from_file
# Load worker configs from a directory
worker_config = build_from_file('path/to/config/directory')
Note: You must provide a workers directory in the configuration path which contains all configurations for the workers.
Run workers
OmAgent provides a TaskHandler class to create and manage worker instances. Here's how to use TaskHandler:
from omagent_core.engine.automator.task_handler import TaskHandler
task_handler = TaskHandler(worker_config=worker_config, workers=[MyWorker()])
task_handler.start_processes()
task_handler.stop_processes()
The worker_config parameter accepts a set of worker configurations and launches, for each worker, a number of processes equal to its concurrency attribute.
You can also use the workers parameter to pass in instantiated worker objects directly. These instances are deep-copied according to their concurrency setting. If a worker instance contains objects that cannot be deep-copied, set its concurrency property to 1 and add as many separate instances to the workers list as you need.
Then, use start_processes to start all workers and stop_processes to stop all workers.
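The copy-per-concurrency behavior described above can be pictured with the following standalone sketch. `DemoWorker` and `expand_workers` are hypothetical names used only for illustration; they are not the TaskHandler implementation.

```python
import copy


class DemoWorker:
    """Hypothetical stand-in for a worker instance; not an OmAgent class."""

    def __init__(self, name: str, concurrency: int = 1):
        self.name = name
        self.concurrency = concurrency


def expand_workers(workers: list) -> list:
    """Return one independent deep copy per unit of concurrency."""
    expanded = []
    for worker in workers:
        expanded.extend(copy.deepcopy(worker) for _ in range(worker.concurrency))
    return expanded


# One instance with concurrency 3 becomes three independent copies.
pool = expand_workers([DemoWorker("a", concurrency=3), DemoWorker("b")])
print([w.name for w in pool])

# Workaround for objects that cannot be deep-copied: keep concurrency at 1
# and build the separate instances yourself.
manual = [DemoWorker("c") for _ in range(3)]
print(len(manual))
```

The manual list gives the same degree of parallelism as a single instance with a concurrency of 3, while each object is constructed normally instead of being copied.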
Important Notes
- Always use the @registry.register_worker() decorator to register the worker
- The _run method is mandatory and contains your core logic
- Return values should be a dictionary with serializable values
- Worker behavior can be configured using Fields
- Both synchronous and asynchronous operations are supported
- self.workflow_instance_id is automatically available in the worker context
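One way to catch a non-serializable return value early is to validate it before returning from `_run`. The `check_output` helper below is a hypothetical sketch, not an OmAgent function, and it assumes JSON is the serialization format in use.

```python
import json


def check_output(output) -> dict:
    """Raise if output is not a dict of JSON-serializable values."""
    if not isinstance(output, dict):
        raise TypeError(f"expected a dict, got {type(output).__name__}")
    try:
        json.dumps(output)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"output is not JSON serializable: {exc}") from exc
    return output


good = check_output({"result": [1, 2, 3], "ok": True})
print(good)

try:
    check_output({"result": {1, 2, 3}})  # a set cannot be encoded as JSON
except ValueError as exc:
    print("rejected:", exc)
```

Converting sets, custom objects, and similar values to lists, strings, or nested dictionaries before returning keeps the output portable.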
Best Practices
- Keep workers focused on a single responsibility
- Use proper type hints for better code clarity
- Implement proper error handling
- Document your worker's expected inputs and outputs
- Use configuration fields for flexible behavior
- Consider using async operations for I/O-bound tasks