Functional
scatter_gather
Sends different sets of arguments/kwargs to a list of modules and collects the responses.
Each callable in to_send receives the positional arguments of
the corresponding tuple in args_list and the named arguments
of the corresponding dict in kwargs_list. If args_list or
kwargs_list are not provided (or are None), the corresponding
callables will be called without positional or named arguments,
respectively, unless an empty list ([]) or empty tuple (())
is provided for a specific item.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
to_send
|
List[Callable]
|
List of callable objects (e.g. functions or |
required |
args_list
|
Optional[List[Tuple[Any, ...]]]
|
Each tuple contains the positional argumentsvfor the corresponding callable
in |
None
|
kwargs_list
|
Optional[List[Dict[str, Any]]]
|
Each dictionary contains the named arguments for the corresponding callable
in |
None
|
timeout
|
Optional[float]
|
Maximum time (in seconds) to wait for responses. |
None
|
Returns:
| Type | Description |
|---|---|
Any
|
Tuple containing the responses for each callable. If an error or |
...
|
timeout occurs for a specific callable, its corresponding response |
Tuple[Any, ...]
|
in the tuple will be |
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
ValueError
|
If the lengths of |
Examples:
def add(x, y): return x + y def multiply(x, y=2): return x * y callables = [add, multiply, add]
Example 1: Using only args_list
args = [ (1, 2), (3,), (10, 20) ] # multiply will use its default y results = F.scatter_gather(callables, args_list=args) print(results) # (3, 6, 30)
Example 2: Using args_list e kwargs_list
args = [ (1,), (), (10,) ] kwargs = [ {'y': 2}, {'x': 3, 'y': 3}, {'y': 20} ] results = F.scatter_gather(callables, args_list=args, kwargs_list=kwargs) print(results) # (3, 9, 30)
Example 3: Using only kwargs_list (useful if functions have
defaults or don't need positional args)
def greet(name="World"): return f"Hello, {name}" def farewell(person_name): return f"Goodbye, {person_name}" funcs = [greet, greet, farewell] kwargs_for_funcs = [ {}, {'name': "Earth"}, {'person_name': "Commander"} ] results = F.scatter_gather(funcs, kwargs_list=kwargs_for_funcs) print(results) # ("Hello, World", "Hello, Earth", "Goodbye, Commander")
Source code in src/msgflux/nn/functional.py
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | |
msg_scatter_gather
Scatter a list of messages to a list of modules and gather the responses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
to_send
|
List[Callable]
|
List of callable objects (e.g. functions or |
required |
messages
|
List[dotdict]
|
List of |
required |
timeout
|
Optional[float]
|
Maximum time (in seconds) to wait for responses. |
None
|
Returns:
| Type | Description |
|---|---|
Tuple[dotdict, ...]
|
Tuple containing the messages updated with the responses. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
Source code in src/msgflux/nn/functional.py
bcast_gather
Broadcasts arguments to multiple callables and gathers the responses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
to_send
|
List[Callable]
|
List of callable objects (e.g. functions or |
required |
*args
|
Positional arguments. |
()
|
|
timeout
|
Optional[float]
|
Maximum time (in seconds) to wait for responses. |
None
|
**kwargs
|
Named arguments. |
{}
|
Returns:
| Type | Description |
|---|---|
Tuple[Any, ...]
|
Tuple containing the responses. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
Examples:
def square(x): return x * x def cube(x): return x * x * x def fail(x): raise ValueError("Intentional error")
Example 1:
results = F.bcast_gather([square, cube], 3) print(results) # (9, 27)
Example 2: Simulate error
results = F.bcast_gather([square, fail, cube], 2) print(results) # (4, None, 8)
Example 3: Timeout
results = F.bcast_gather([square, cube], 4, timeout=0.01) print(results) # (16, 64)
Source code in src/msgflux/nn/functional.py
msg_bcast_gather
Broadcasts a single message to multiple modules and gathers the responses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
to_send
|
List[Callable]
|
List of callable objects (e.g. functions or |
required |
message
|
dotdict
|
Instance of |
required |
timeout
|
Optional[float]
|
Maximum time (in seconds) to wait for responses. |
None
|
Returns:
| Type | Description |
|---|---|
dotdict
|
The original message with the module responses added. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
Source code in src/msgflux/nn/functional.py
wait_for
Wait for a callable execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
to_send
|
Callable
|
A callable object (e.g. functions or |
required |
*args
|
Positional arguments. |
()
|
|
timeout
|
Optional[float]
|
Maximum time (in seconds) to wait for responses. |
None
|
**kwargs
|
Named arguments. |
{}
|
Returns:
| Type | Description |
|---|---|
Any
|
Callable responses. |
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
Examples:
async def f1(x): return x * x
Example 1:
results = F.wait_for(f1, 3) print(results) # 9
Source code in src/msgflux/nn/functional.py
wait_for_event
Waits synchronously for an asyncio.Event to be set.
This function will block until event.set() is called elsewhere.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
Event
|
The asyncio.Event to wait for. |
required |
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
Source code in src/msgflux/nn/functional.py
background_task
Executes a task in the background asynchronously without blocking, using the AsyncExecutorPool. This function is "fire-and-forget".
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
to_send
|
Callable
|
Callable object (function, async function, or module with .acall() method). |
required |
*args
|
Positional arguments. |
()
|
|
**kwargs
|
Named arguments. |
{}
|
Raises:
| Type | Description |
|---|---|
TypeError
|
If |
Examples:
Example 1:
import time def print_message(message: str): time.sleep(1) print(f"[Sync] Message: {message}") F.background_task(print_message, "Hello from sync function")
Example 2:
import asyncio async def async_print_message(message: str): await asyncio.sleep(1) print(f"[Async] Message: {message}") F.background_task(async_print_message, "Hello from async function")
Example 3 (with error):
def failing_task(): raise ValueError("This task failed!") F.background_task(failing_task) # Error will be logged