UniCoreFW

flow()

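Create a function that composes the given functions and applies them left to right. The first function receives the caller's arguments; each later function receives the previous function's return value.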

Implementation

Args:
    *functions: Functions to compose

Returns:
    A function that applies all functions in sequence

Example

    add_exclamation = lambda s: s + "!"
    make_greeting = lambda name: f"Hello {name}"
    greet = flow(make_greeting, add_exclamation)
    greet("World")

Expected output: 'Hello World!'
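To make the left-to-right order concrete, the following is a minimal sketch. It uses a stand-in built on functools.reduce rather than UniCoreFW's own flow, because the import path is not shown on this page; flow_sketch, increment and double are hypothetical names chosen for illustration. Swapping the order of the two functions changes the result.

```python
from functools import reduce
from typing import Any, Callable


def flow_sketch(*functions: Callable) -> Callable:
    """Hypothetical stand-in for flow(): apply functions left to right."""
    def composed(value: Any) -> Any:
        # Feed the running result through each function in the order given.
        return reduce(lambda acc, func: func(acc), functions, value)
    return composed


def increment(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


increment_then_double = flow_sketch(increment, double)
double_then_increment = flow_sketch(double, increment)

print(increment_then_double(3))  # (3 + 1) * 2 = 8
print(double_then_increment(3))  # (3 * 2) + 1 = 7
```

Unlike the listed implementation, this stand-in accepts only a single argument; it is meant to show the ordering, not to mirror every detail.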

Source Code

    def flow(*functions: Callable) -> Callable:
        for func in functions:
            _validate_callable(func)

        if not functions:
            return lambda x: x

        def composed(*args, **kwargs):
            result = functions[0](*args, **kwargs)
            for func in functions[1:]:
                result = func(result)
            return result

        # Set _argcount for compatibility with tests
        composed._argcount = functions[0].__code__.co_argcount if functions else 0  # type: ignore
        composed.__name__ = f"flow({', '.join(f.__name__ for f in functions)})"
        return composed
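The listing above implies a few behaviors that the short example does not exercise: only the first function may take several arguments, calling flow() with no functions yields an identity function, and the composed function carries a descriptive __name__ and an _argcount. The sketch below copies the listing so that it runs on its own. The _validate_callable shown here is an assumed stand-in (its real implementation is not shown on this page), and add and square are hypothetical helpers.

```python
from typing import Callable


def _validate_callable(func: object) -> None:
    # Assumed stand-in: the library's real helper is not shown on this page.
    if not callable(func):
        raise TypeError(f"Expected a callable, got {type(func).__name__}")


def flow(*functions: Callable) -> Callable:
    for func in functions:
        _validate_callable(func)

    if not functions:
        return lambda x: x

    def composed(*args, **kwargs):
        # Only the first function sees the original arguments.
        result = functions[0](*args, **kwargs)
        for func in functions[1:]:
            result = func(result)
        return result

    composed._argcount = functions[0].__code__.co_argcount  # type: ignore
    composed.__name__ = f"flow({', '.join(f.__name__ for f in functions)})"
    return composed


def add(a, b):
    return a + b


def square(x):
    return x * x


add_then_square = flow(add, square)

print(add_then_square(2, 3))      # 25
print(add_then_square.__name__)   # flow(add, square)
print(add_then_square._argcount)  # 2
print(flow()("unchanged"))        # unchanged
```

One consequence of reading __code__ and __name__ directly: a built-in such as len has no __code__ attribute, and a functools.partial object has no __name__, so passing either in the relevant position would raise AttributeError with the code as listed. Wrapping such callables in a small def or lambda avoids this.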