Multi-Commodity Flow Environment
This environment is based on the classical multi-commodity flow problem from combinatorial optimization [1], formulated in a partially observable, decentralized multi-agent reinforcement learning setting.
In this environment, each agent controls a node in a directed graph and decides how to dispatch incoming flows to its outgoing edges. The network supports multiple commodities (i.e., types or classes of flow), and the objective is to minimize the total cost of flow circulation while satisfying physical constraints like edge capacities and flow conservation.
This problem is challenging due to:
The combinatorial structure of routing multiple flows.
The decentralized nature of control (one agent per node).
Integer flow constraints.
Partial observability (agents only observe their own local neighborhood).
Figure: the multi-commodity flow problem from the perspective of a single agent. Each agent chooses how to dispatch incoming commodities to its outgoing edges at each step; the objective is to minimize the cost of operating the network.
Overview
The environment supports two main settings:
Circulation problems: All nodes are circulation nodes with initial flows, no explicit sources or sinks.
Source-sink flow problems: Flows must start at designated source nodes and terminate at sink nodes.
Constraints
The environment enforces the following constraints at each timestep:
Edge Capacity: The total flow on edge \((i, j)\) cannot exceed its capacity \(\rho_{ij}\).
Flow Conservation at Circulation Nodes: For each circulation node, the total incoming flow must equal the total outgoing flow.
Source/Sink Consistency: Each flow must fully exit its source node and fully enter its sink node over the episode.
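Written out, with \(f^c_{ij}(t)\) denoting the flow of commodity \(c\) on edge \((i, j)\) at time \(t\) (notation introduced here for clarity), the per-step constraints read:
\[
\sum_{c=1}^{k} f^c_{ij}(t) \le \rho_{ij} \quad \text{for every edge } (i, j), \qquad \sum_{j \in \mathcal{N}^{\text{in}}_i} f^c_{ji}(t) = \sum_{j \in \mathcal{N}^{\text{out}}_i} f^c_{ij}(t) \quad \text{for every commodity } c \text{ and circulation node } i,
\]
where \(\mathcal{N}^{\text{in}}_i\) and \(\mathcal{N}^{\text{out}}_i\) are the in- and out-neighborhoods of node \(i\).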
Let \(\rho_{\text{max}} = \max_{(i,j)} \rho_{ij}\) denote the maximum edge capacity in the network.
State Space
Let:
\(k\) be the number of commodities (flow classes),
\(N\) the number of controllable nodes (agents),
\(E\) the number of directed edges.
The global state at time \(t\) consists of the flow values on each of the \(E\) edges for all commodities.
Each agent observes only the flows arriving on its own incoming edges, which may include flows from any commodity. There is no access to global state or the actions of other agents.
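For intuition, here is a sketch of what one agent's local observation looks like; the shapes follow the get_obs description in the API reference below, while the exact ordering and the example values are assumptions:

import numpy as np

k = 3     # number of commodities
n_in = 2  # incoming edges of this agent (arbitrary example values)

stock = np.array([4, 0, 7])          # agent's current stock per commodity
incoming = np.array([[1, 0, 2],      # per-commodity flow on incoming edge 1
                     [0, 3, 1]])     # per-commodity flow on incoming edge 2

# Local observation: own stock and incoming edge flows, concatenated.
obs = np.concatenate([stock, incoming.ravel()])
assert obs.shape == (k + k * n_in,)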
Action Space
Each agent \(i\) controls the dispatch of incoming flows across its outgoing edges.
The action of agent \(i\) is a vector (or matrix) with \(k \times n^{\text{out}}_i\) entries, where \(n^{\text{out}}_i\) is the number of outgoing edges from node \(i\).
The values represent integer flows assigned to each edge per commodity.
Internally, the policy outputs a continuous distribution over possible flow allocations, which is discretized into integer flow values.
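This discretization is the largest-remainder scheme documented for _split_integers_by_distributions in the API reference below: proportional amounts are floored, and the leftover units go to the entries with the highest fractional parts. A minimal sketch of that scheme (not the library's code):

import numpy as np

def split_integer(stock: int, proportions: np.ndarray) -> np.ndarray:
    """Split stock into integer parts proportional to proportions."""
    raw = stock * proportions / proportions.sum()
    parts = np.floor(raw).astype(int)
    # Hand the leftover units to the entries with the highest fractional parts.
    order = np.argsort(-(raw - parts))
    parts[order[: stock - parts.sum()]] += 1
    return parts

print(split_integer(12, np.array([0.5, 0.3, 0.2])))  # [6 4 2]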
Reward and Objective
The goal is to minimize the total cost of flow circulation over the episode horizon \(T\).
Each commodity and each edge has an associated cost.
The cost incurred at time \(t\) is computed based on the flows sent through each edge and their respective costs.
The total episode reward is the negative sum of these flow costs, i.e., a cost-minimization objective.
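With \(c^c_{ij}\) denoting the per-unit cost of commodity \(c\) on edge \((i, j)\) (notation assumed here, matching the flow notation above), the objective is to maximize
\[
R = -\sum_{t=1}^{T} \sum_{(i, j)} \sum_{c=1}^{k} c^c_{ij}\, f^c_{ij}(t).
\]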
This environment is a benchmark for multi-agent coordination, flow control under partial observability, and cooperative optimization in graph-structured settings. It is particularly well-suited for studying distributed reinforcement learning algorithms on networked systems.
Environment
- class cognac.env.MultiCommodityFlow.env.MultiCommodityFlowEnvironment(adjacency_matrix: ~numpy.ndarray, n_commodities: int = 3, max_capacity: int = 100, max_steps: int = 20, reward_class: type = <class 'cognac.env.MultiCommodityFlow.rewards.MCFWithOverflowPenaltyReward'>, is_global_reward: bool = False)
Bases: ParallelEnv
Multi-Commodity Flow environment based on a directed graph in which agents are nodes controlling the flow of multiple commodities through edges.
This environment models a circulation network where agents redistribute commodities along outgoing edges, subject to capacity constraints. It supports multi-agent reinforcement learning via the PettingZoo ParallelEnv interface.
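A minimal interaction loop based on the documented ParallelEnv interface; the 4-node chain graph below is an arbitrary example (it contains a source, a sink, and two circulation nodes), and random actions stand in for a learned policy:

import numpy as np
from cognac.env.MultiCommodityFlow.env import MultiCommodityFlowEnvironment

# Arbitrary example graph: a directed chain 0 -> 1 -> 2 -> 3.
adj = np.array([
    [0, 1, 0, 0],
    [0, 0, 1, 0],
    [0, 0, 0, 1],
    [0, 0, 0, 0],
], dtype=float)

env = MultiCommodityFlowEnvironment(adjacency_matrix=adj, n_commodities=3)
observations, infos = env.reset(seed=0)

done = False
while not done:
    # Sample a random dispatch (proportions over outgoing edges) per agent.
    actions = {agent: env.action_space(agent).sample()
               for agent in env.possible_agents}
    observations, rewards, terminations, truncations, infos = env.step(actions)
    done = all(terminations.values()) or all(truncations.values())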
Parameters
- adjacency_matrix : np.ndarray
Square matrix describing the influence-graph adjacency between agents. Positive/negative entries define the direction and strength of influence.
- n_commodities : int, optional
Number of commodity types flowing in the network (default is 3).
- max_capacity : int, optional
Maximum capacity of each edge or node (default is 100).
- max_steps : int, optional
Maximum number of steps per episode before termination (default is 20).
- reward_class : type, optional
Class used to compute the reward at each step (default is MCFWithOverflowPenaltyReward).
- is_global_reward : bool, optional
Whether to use a global reward shared across all agents or individual rewards (default is False).
Attributes
- adjacency_matrix : np.ndarray
The input adjacency matrix of the network.
- n_agents : int
Number of agents/nodes in the network.
- possible_agents : list of int
List of agent indices representing nodes.
- max_capacity : int
Maximum capacity of edges/nodes.
- network : networkx.DiGraph
Directed graph representing the network topology and flows.
- timestep : int
Current time step in the episode.
- state : object
Current environment state (custom structure).
- reward : object
Reward function instance for computing step rewards.
- influence_activation : np.ndarray
Boolean matrix indicating active influences between agents.
- influence_sgn : np.ndarray
Matrix indicating the sign (+/-) of influences.
- adjacency_matrix_prob : np.ndarray
Absolute values of the adjacency matrix entries, interpreted as probabilities.
Methods
- reset(seed=None, options=None)
Reset the environment to initial state and sample initial flows.
- step(actions)
Perform one environment step applying agent actions and updating flows.
- get_obs()
Return observations for all agents.
- observation_space(agent)
Return the observation space for a given agent.
- action_space(agent)
Return the action space for a given agent.
- _check_influence_graph() → None
Validate the influence graph properties.
Checks that the diagonal of the adjacency probability matrix (adjacency_matrix_prob) is zero, ensuring no self-influence, and verifies that all entries lie within [0, 1].
Raises
- AssertionError
If the diagonal entries are not zero or if any entry is out of bounds.
Warning
Internal use only. Used to ensure network consistency.
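The two checks map onto simple NumPy assertions; a standalone sketch (hypothetical helper, not the method itself):

import numpy as np

def check_influence_graph(adjacency_matrix_prob: np.ndarray) -> None:
    # No self-influence: the diagonal must be zero.
    assert np.all(np.diag(adjacency_matrix_prob) == 0)
    # Entries are interpreted as probabilities, so they must lie in [0, 1].
    assert np.all((adjacency_matrix_prob >= 0) & (adjacency_matrix_prob <= 1))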
- _init_type_node() → None
Initialize node types in the network graph based on connectivity.
Node types:
- "source": no predecessors (input node)
- "sink": no successors (output node)
- "circulation": has both predecessors and successors
- "unconnected": isolated node (no predecessors or successors)
Sets the "type" attribute on each node in the network graph.
Raises
- AssertionError
If the network does not contain at least one source and one sink node, or if it contains unsupported types.
Warning
Internal use only. This method is intended for internal environment setup.
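The classification rule maps directly onto degree predicates of the networkx graph; a standalone sketch (hypothetical helper, not the actual implementation):

import networkx as nx

def classify_node(network: nx.DiGraph, node) -> str:
    has_pred = network.in_degree(node) > 0
    has_succ = network.out_degree(node) > 0
    if not has_pred and not has_succ:
        return "unconnected"
    if not has_pred:
        return "source"
    if not has_succ:
        return "sink"
    return "circulation"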
- _normalize_act(act)
Normalize a NumPy array to a row-wise stochastic matrix.
Each row is normalized so that the sum of its elements is 1. Rows that sum to 0 remain all zeros to avoid division by zero.
Parameters
- act : np.ndarray
A 2D NumPy array to normalize.
Returns
- np.ndarray
A row-wise stochastic version of the input matrix.
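A sketch of the described behavior, with zero-sum rows left untouched (assumed equivalent, not the library's code):

import numpy as np

def normalize_rows(act: np.ndarray) -> np.ndarray:
    row_sums = act.sum(axis=1, keepdims=True)
    # Rows summing to zero stay all zeros to avoid division by zero.
    return np.divide(act, row_sums,
                     out=np.zeros_like(act, dtype=float),
                     where=row_sums != 0)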
- _split_integers_by_distributions(stocks: ndarray, distributions: ndarray) → ndarray
Split each integer stock into parts proportional to the corresponding distribution. Each returned row sums exactly to its stock value: proportional amounts are floored, and the remainder is distributed according to the highest fractional parts.
Parameters
- stocks : np.ndarray
Array of integer stocks (e.g., [12, 5, 7]).
- distributions : np.ndarray
Row-wise distributions, each summing to 1 (normalized internally otherwise).
Returns
- np.ndarray
Array of shape (len(stocks), len(distributions[0])) with integer parts, where each row sums to the corresponding stock value.
- action_space(agent: int) → Box
Return the action space specification for a given agent.
If the agent has no outgoing edges, returns a zero-dimensional Box. Otherwise, returns a Box space with shape equal to the number of outgoing edges, with each action value in [0.0, 1.0], representing proportions.
Parameters
- agent : int
Agent index.
Returns
- Box
Gymnasium Box space defining valid actions for the agent.
- get_obs() → dict
Get current observations for all agents.
Observation consists of the agent's current commodity stock and the flow values of all incoming edges, concatenated into a single NumPy array.
Returns
- dict
Mapping from agent ID to numpy array representing the observation.
- metadata: dict[str, Any] = {'name': 'multicommodity_flow_environment_v0'}
- observation_space(agent: int) → MultiDiscrete
Return the observation space specification for a given agent.
The space describes the agent's current commodity stock together with the flow values of its incoming edges, concatenated into a single array (see get_obs).
Parameters
- agent : int
Agent index.
Returns
- MultiDiscrete
Gymnasium MultiDiscrete space defining valid observations for the agent.
- reset(seed: int = None, options: dict = None) → tuple
Reset the environment to the initial state.
Resets all agent states, network flows, and commodities to initial random values subject to capacity constraints.
Parameters
- seed : int, optional
Seed for random number generators to ensure reproducibility.
- options : dict, optional
Additional options for environment reset.
Returns
- tuple
A tuple containing:
- observations (dict): Mapping from agent ID to initial observation.
- infos (dict): Mapping from agent ID to info dict (empty by default).
- step(actions: dict) → tuple
Execute a step of the environment using the provided agent actions.
Each agent redistributes its commodity stock along outgoing edges according to the action distribution. The environment updates flow values, applies reward computation, and checks termination conditions.
Parameters
- actions : dict
Mapping from agent ID to a list or array of dispatch values for outgoing edges.
Returns
- tuple
A 5-tuple containing:
- observations (dict): Agent observations after the step.
- rewards (dict): Reward values for each agent.
- terminations (dict): Boolean flags indicating episode termination per agent.
- truncations (dict): Boolean flags indicating episode truncation per agent.
- infos (dict): Additional info dictionaries per agent.
Rewards
- class cognac.env.MultiCommodityFlow.rewards.DefaultMCFReward
Bases: BaseReward
Default reward function for the Multi-Commodity Flow environment.
The reward is the negative of the total cost of the flow.
- class cognac.env.MultiCommodityFlow.rewards.MCFWithOverflowPenaltyReward(overflow_penalty_coef=1.0)
Bases: BaseReward
Reward function for the Multi-Commodity Flow environment with an overflow penalty.
The reward is the negative of the total cost of the flow, plus a penalty when total commodity flow on an edge exceeds its capacity.
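In symbols, with \(\lambda\) the overflow_penalty_coef and the exact penalty form assumed from the description above:
\[
r_t = -\sum_{(i, j)} \sum_{c=1}^{k} c^c_{ij}\, f^c_{ij}(t) \;-\; \lambda \sum_{(i, j)} \max\Big(0,\ \sum_{c=1}^{k} f^c_{ij}(t) - \rho_{ij}\Big).
\]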