πŸ” Multi-Commodity Flow Environment

This environment is based on the classical multi-commodity flow problem from combinatorial optimization [1], formulated in a partially observable, decentralized multi-agent reinforcement learning setting.

In this environment, each agent controls a node in a directed graph and decides how to dispatch incoming flows to its outgoing edges. The network supports multiple commodities (i.e., types or classes of flow), and the objective is to minimize the total cost of flow circulation while satisfying physical constraints like edge capacities and flow conservation.

This problem is challenging due to:

  • The combinatorial structure of routing multiple flows.

  • The decentralized nature of control (one agent per node).

  • Integer flow constraints.

  • Partial observability (agents only observe their own local neighborhood).


Illustration of the multi-commodity flow problem from the perspective of a single agent. The objective is to minimize the cost of operating the network; at each step, each agent chooses how to dispatch incoming commodities to its outgoing edges.

Overview

The environment supports two main settings:

  • Circulation problems: All nodes are circulation nodes with initial flows, no explicit sources or sinks.

  • Source-sink flow problems: Flows must start at designated source nodes and terminate at sink nodes.

Constraints

The environment enforces the following constraints at each timestep:

  • Edge Capacity: The total flow on edge \((i, j)\) cannot exceed its capacity \(\rho_{ij}\).

  • Flow Conservation at Circulation Nodes: For each circulation node, the total incoming flow must equal the total outgoing flow.

  • Source/Sink Consistency: Each flow must fully exit its source node and fully enter its sink node over the episode.

Let \(\rho_{\text{max}} = \max_{(i,j)} \rho_{ij}\) denote the maximum edge capacity in the network.
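The constraints above can be checked mechanically. The sketch below assumes flows are stored as an array of shape \((E, k)\) over a hypothetical edge list; the environment's actual internal representation may differ:

```python
import numpy as np

def check_constraints(edges, flows, capacities, circulation_nodes):
    """Check edge-capacity and flow-conservation constraints.

    edges: list of (i, j) node pairs, one per directed edge.
    flows: array of shape (E, k) -- flow of each commodity on each edge.
    capacities: array of shape (E,) -- capacity rho_ij of each edge.
    circulation_nodes: node ids that must conserve flow.
    """
    flows = np.asarray(flows)
    # Edge capacity: total flow over all commodities on each edge <= rho_ij.
    capacity_ok = bool(np.all(flows.sum(axis=1) <= np.asarray(capacities)))

    # Flow conservation: incoming == outgoing per commodity at each node.
    conservation_ok = True
    for n in circulation_nodes:
        inflow = flows[[e for e, (i, j) in enumerate(edges) if j == n]].sum(axis=0)
        outflow = flows[[e for e, (i, j) in enumerate(edges) if i == n]].sum(axis=0)
        conservation_ok &= bool(np.array_equal(inflow, outflow))
    return capacity_ok and conservation_ok
```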

State Space

Let:

  • \(k\) be the number of commodities (flow classes),

  • \(N\) the number of controllable nodes (agents),

  • \(E\) the number of directed edges.

The global state at time \(t\) consists of the flow values on each of the \(E\) edges for all commodities.

Each agent observes only the flows arriving on its own incoming edges, which may include flows from any commodity. There is no access to global state or the actions of other agents.

Action Space

Each agent \(i\) controls the dispatch of incoming flows across its outgoing edges.

  • The action of agent \(i\) is a vector (or matrix) with \(k \times n^{\text{out}}_i\) entries, where \(n^{\text{out}}_i\) is the number of outgoing edges from node \(i\).

  • The values represent integer flows assigned to each edge per commodity.

  • Internally, the policy outputs a continuous distribution over possible flow allocations, which is discretized into integer flow values.

Reward and Objective

The goal is to minimize the total cost of flow circulation over the episode horizon \(T\).

  • Each commodity and each edge has an associated cost.

  • The cost incurred at time \(t\) is computed based on the flows sent through each edge and their respective costs.

  • The total episode reward is the negative sum of these flow costs, i.e., a cost-minimization objective.
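As a sketch of the per-step objective, assuming flows and per-unit costs are both arrays of shape \((E, k)\) (hypothetical shapes, not necessarily the environment's internals):

```python
import numpy as np

def flow_reward(flows, costs):
    """Negative total flow cost (cost minimization as reward maximization).

    flows: array (E, k) -- flow of commodity c on edge e at this step.
    costs: array (E, k) -- per-unit cost of commodity c on edge e.
    """
    return -float(np.sum(np.asarray(flows) * np.asarray(costs)))
```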

This environment is a benchmark for multi-agent coordination, flow control under partial observability, and cooperative optimization in graph-structured settings. It is particularly well-suited for studying distributed reinforcement learning algorithms on networked systems.

Environment

class cognac.env.MultiCommodityFlow.env.MultiCommodityFlowEnvironment(adjacency_matrix: ~numpy.ndarray, n_commodities: int = 3, max_capacity: int = 100, max_steps: int = 20, reward_class: type = <class 'cognac.env.MultiCommodityFlow.rewards.MCFWithOverflowPenaltyReward'>, is_global_reward: bool = False)

Bases: ParallelEnv

Multi-Commodity Flow Environment based on a directed graph in which each node is an agent controlling the flow of multiple commodities along its outgoing edges.

This environment models a circulation network where agents redistribute commodities along outgoing edges, subject to capacity constraints. It supports multi-agent reinforcement learning via the PettingZoo ParallelEnv interface.

Parameters

adjacency_matrix : np.ndarray

Square matrix describing the influence-graph adjacency between agents. Positive/negative entries define direction and influence strength.

n_commodities : int, optional

Number of commodity types flowing in the network (default is 3).

max_capacity : int, optional

Maximum capacity of each edge or node (default is 100).

max_steps : int, optional

Maximum number of steps per episode before termination (default is 20).

reward_class : type, optional

Class used to compute the reward at each step (default is MCFWithOverflowPenaltyReward).

is_global_reward : bool, optional

Whether to use a global reward shared across all agents or individual rewards (default is False).

Attributes

adjacency_matrix : np.ndarray

The input adjacency matrix of the network.

n_agents : int

Number of agents/nodes in the network.

possible_agents : list of int

List of agent indices representing nodes.

max_capacity : int

Maximum capacity of edges/nodes.

network : networkx.DiGraph

Directed graph representing the network topology and flows.

timestep : int

Current time step in the episode.

state : object

Current environment state (custom structure).

reward : object

Reward function instance for computing step rewards.

influence_activation : np.ndarray

Boolean matrix indicating active influences between agents.

influence_sgn : np.ndarray

Matrix indicating the sign (+/-) of each influence.

adjacency_matrix_prob : np.ndarray

Absolute values of the adjacency matrix entries, interpreted as probabilities.

Methods

reset(seed=None, options=None)

Reset the environment to initial state and sample initial flows.

step(actions)

Perform one environment step applying agent actions and updating flows.

get_obs()

Return observations for all agents.

observation_space(agent)

Return the observation space for a given agent.

action_space(agent)

Return the action space for a given agent.

_check_influence_graph() None

Validate the influence graph properties.

Checks that the diagonal of the adjacency probability matrix is zero (no self-influence) and that all entries lie within [0, 1].

Raises

AssertionError

If the diagonal entries are not zero or if any entry is out of bounds.

Warning

Internal use only. Used to ensure network consistency.

_init_type_node() None

Initialize node types in the network graph based on connectivity.

Node types:

  • 'source': no predecessors (input node)

  • 'sink': no successors (output node)

  • 'circulation': has both predecessors and successors

  • 'unconnected': isolated node (no predecessors or successors)

Sets the "type" attribute on each node in the network graph.

Raises

AssertionError

If the network does not contain at least one source and one sink node, or if it contains unsupported types.

Warning

Internal use only. This method is intended for internal environment setup.

_normalize_act(act)

Normalize a NumPy array into a row-wise stochastic matrix.

Each row is normalized so that its elements sum to 1. Rows that sum to 0 remain all zeros to avoid division by zero.

Parameters:

act : np.ndarray

A 2D NumPy array to normalize.

Returns:

np.ndarray

A row-wise stochastic version of the input matrix.
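The normalization described above can be sketched with NumPy as follows (a hypothetical `normalize_rows` helper, not necessarily the method's exact implementation):

```python
import numpy as np

def normalize_rows(act):
    """Row-wise stochastic normalization; all-zero rows stay all zeros."""
    act = np.asarray(act, dtype=float)
    row_sums = act.sum(axis=1, keepdims=True)
    # Divide only where the row sum is nonzero; zero rows keep the zeros
    # from `out`, avoiding division by zero.
    return np.divide(act, row_sums, out=np.zeros_like(act), where=row_sums != 0)
```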

_split_integers_by_distributions(stocks: ndarray, distributions: ndarray) ndarray

Split each integer stock into parts proportional to the corresponding distribution.

Parameters

stocks : np.ndarray

Array of integer stocks (e.g., [12, 5, 7]).

distributions : np.ndarray

2D array of row-wise distributions, each row summing to 1 (or normalized internally).

Returns

np.ndarray

Array of shape (len(stocks), len(distributions[0])) with integer parts, where each row sums to the corresponding stock value.
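A plausible way to implement such a proportional integer split is the largest-remainder method. The sketch below (a hypothetical `split_integer` helper handling one stock at a time) floors the proportional shares and hands the leftover units to the entries with the largest fractional parts, so the result always sums exactly to the stock:

```python
import numpy as np

def split_integer(stock, distribution):
    """Split integer `stock` into parts proportional to `distribution`."""
    p = np.asarray(distribution, dtype=float)
    p = p / p.sum()  # normalize in case proportions do not sum to 1
    raw = stock * p
    parts = np.floor(raw).astype(int)
    remainder = stock - parts.sum()
    # Give one extra unit to the `remainder` largest fractional parts.
    order = np.argsort(-(raw - parts))
    parts[order[:remainder]] += 1
    return parts
```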

action_space(agent: int) Box

Return the action space specification for a given agent.

If the agent has no outgoing edges, returns a zero-dimensional Box. Otherwise, returns a Box space with shape equal to the number of outgoing edges, with each action value in [0.0, 1.0], representing dispatch proportions.

Parameters

agent : int

Agent index.

Returns

Box

Gymnasium Box space defining valid actions for the agent.

get_obs() dict

Get current observations for all agents.

Observation consists of the agent’s current commodity stock and the flow values of all incoming edges concatenated into a single numpy array.

Returns

dict

Mapping from agent ID to numpy array representing the observation.
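Assembling such an observation can be sketched as a simple concatenation (a hypothetical `build_observation` helper; the environment's exact ordering may differ):

```python
import numpy as np

def build_observation(stock, incoming_flows):
    """Concatenate an agent's commodity stock with its incoming edge flows.

    stock: array (k,) -- current stock per commodity at the node.
    incoming_flows: array (n_in, k) -- per-commodity flow on each incoming edge.
    """
    return np.concatenate([np.asarray(stock).ravel(),
                           np.asarray(incoming_flows).ravel()])
```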

metadata: dict[str, Any] = {'name': 'multicommodity_flow_environment_v0'}
observation_space(agent: int) MultiDiscrete

Return the observation space for a given agent.

The space covers the agent's commodity stock and the flow values on each of its incoming edges, each bounded by the maximum capacity.

Parameters

agent : int

Agent index.

Returns

MultiDiscrete

Gymnasium MultiDiscrete space defining valid observations for the agent.

reset(seed: int = None, options: dict = None) tuple

Reset the environment to the initial state.

Resets all agent states, network flows, and commodities to initial random values subject to capacity constraints.

Parameters

seed : int, optional

Seed for random number generators to ensure reproducibility.

options : dict, optional

Additional options for environment reset.

Returns

tuple

A tuple containing:

  • observations (dict): Mapping from agent ID to initial observation.

  • infos (dict): Mapping from agent ID to info dict (empty by default).

step(actions: dict) tuple

Execute a step of the environment using the provided agent actions.

Each agent redistributes its commodity stock along outgoing edges according to the action distribution. The environment updates flow values, applies reward computation, and checks termination conditions.

Parameters

actions : dict

Mapping from agent ID to a list or array of dispatch values for outgoing edges.

Returns

tuple

A 5-tuple containing:

  • observations (dict): Agent observations after the step.

  • rewards (dict): Reward values for each agent.

  • terminations (dict): Boolean flags indicating episode termination per agent.

  • truncations (dict): Boolean flags indicating episode truncation per agent.

  • infos (dict): Additional info dictionaries per agent.

Rewards

class cognac.env.MultiCommodityFlow.rewards.DefaultMCFReward

Bases: BaseReward

Default reward function for the Multi Commodity Flow environment.

The reward is the negative of the total cost of the flow.

class cognac.env.MultiCommodityFlow.rewards.MCFWithOverflowPenaltyReward(overflow_penalty_coef=1.0)

Bases: BaseReward

Reward function for Multi Commodity Flow with overflow penalty.

The reward is the negative of the total cost of the flow, plus a penalty when total commodity flow on an edge exceeds its capacity.
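Under the same assumed shapes as above (flows and per-unit costs as (E, k) arrays, capacities as an (E,) array), the penalized reward could look like this sketch (not the class's exact implementation):

```python
import numpy as np

def overflow_penalty_reward(flows, costs, capacities, overflow_penalty_coef=1.0):
    """Negative flow cost minus a penalty on per-edge capacity overflow."""
    flows = np.asarray(flows, dtype=float)
    cost = np.sum(flows * np.asarray(costs))
    # Overflow: how far total commodity flow on each edge exceeds capacity.
    overflow = np.maximum(flows.sum(axis=1) - np.asarray(capacities), 0.0)
    return -float(cost + overflow_penalty_coef * overflow.sum())
```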
